You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/04/03 23:20:04 UTC
[2/6] git commit: Fix SSTable not released if stream session fails
Fix SSTable not released if stream session fails
patch by yukim; reviewed by Richard Low for CASSANDRA-6818
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/04fd84c3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/04fd84c3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/04fd84c3
Branch: refs/heads/cassandra-2.1
Commit: 04fd84c3d536a82b1e20a6295bc09d9250b76d29
Parents: 123d5bc
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Apr 3 16:17:44 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Apr 3 16:17:44 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamSession.java | 8 +-
.../apache/cassandra/streaming/StreamTask.java | 6 ++
.../cassandra/streaming/StreamTransferTask.java | 61 +++++++++++++--
.../cassandra/streaming/StreamWriter.java | 3 -
.../compress/CompressedStreamWriter.java | 2 -
.../streaming/StreamTransferTaskTest.java | 80 ++++++++++++++++++++
7 files changed, 149 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3200941..38a6c3c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -46,6 +46,7 @@
* Fix map element access in IF (CASSANDRA-6914)
* Avoid costly range calculations for range queries on system keyspaces
(CASSANDRA-6906)
+ * Fix SSTable not released if stream session fails (CASSANDRA-6818)
Merged from 1.2:
* Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
* add extra SSL cipher suites (CASSANDRA-6613)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 7972183..449751d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -21,7 +21,9 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.slf4j.Logger;
@@ -316,8 +318,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
if (finalState == State.FAILED)
{
- for (StreamReceiveTask srt : receivers.values())
- srt.abort();
+ for (StreamTask task : Iterables.concat(receivers.values(), transfers.values()))
+ task.abort();
}
// Note that we shouldn't block on this close because this method is called on the handler
@@ -459,6 +461,8 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
long headerSize = header.size();
StreamingMetrics.totalOutgoingBytes.inc(headerSize);
metrics.outgoingBytes.inc(headerSize);
+ // schedule timeout for receiving ACK
+ transfers.get(header.cfId).scheduleTimeout(header.sequenceNumber, 12, TimeUnit.HOURS);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/StreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTask.java b/src/java/org/apache/cassandra/streaming/StreamTask.java
index 9e9e06f..ac72cff 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTask.java
@@ -46,6 +46,12 @@ public abstract class StreamTask
public abstract long getTotalSize();
/**
+ * Abort the task, e.g. when the owning stream session fails.
+ * Subclasses should release every resource the task still holds.
+ */
+ public abstract void abort();
+
+ /**
* @return StreamSummary that describes this task
*/
public StreamSummary getSummary()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 8e461cc..86f4ee2 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.streaming;
import java.util.*;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.io.sstable.SSTableReader;
@@ -29,9 +30,13 @@ import org.apache.cassandra.utils.Pair;
*/
public class StreamTransferTask extends StreamTask
{
+ private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
+
private final AtomicInteger sequenceNumber = new AtomicInteger(0);
- private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+ private final Map<Integer, OutgoingFileMessage> files = new ConcurrentHashMap<>();
+
+ private final Map<Integer, ScheduledFuture> timeoutTasks = new ConcurrentHashMap<>();
private long totalSize;
@@ -55,10 +60,45 @@ public class StreamTransferTask extends StreamTask
*/
public void complete(int sequenceNumber)
{
- files.remove(sequenceNumber);
- // all file sent, notify session this task is complete.
- if (files.isEmpty())
- session.taskCompleted(this);
+ boolean signalComplete;
+ synchronized (this)
+ {
+ // Drop this sequence's timeout entry. If that timeout still fires later it
+ // finds no file to remove, so it cannot release the reference a second time.
+ timeoutTasks.remove(sequenceNumber);
+ OutgoingFileMessage file = files.remove(sequenceNumber);
+ // Already completed, timed out or aborted: nothing left to release.
+ if (file == null)
+ return;
+ file.sstable.releaseReference();
+ // all files sent: stop pending timeouts and notify the session below.
+ signalComplete = files.isEmpty();
+ if (signalComplete)
+ timeoutExecutor.shutdownNow();
+ }
+ // Checking isEmpty() under the lock guarantees only one caller signals completion;
+ // the session is notified outside the lock.
+ if (signalComplete)
+ session.taskCompleted(this);
+ }
+
+ /**
+ * Abort the task, releasing the reference of every file not completed yet.
+ * Files are removed as they are released, so calling abort() again, or a
+ * complete() arriving afterwards, does not release any reference twice.
+ */
+ @Override
+ public synchronized void abort()
+ {
+ // Cancel pending timeouts first; otherwise they would call complete() later.
+ timeoutExecutor.shutdownNow();
+ timeoutTasks.clear();
+ Iterator<OutgoingFileMessage> it = files.values().iterator();
+ while (it.hasNext())
+ {
+ it.next().sstable.releaseReference();
+ it.remove();
+ }
}
public int getTotalNumberOfFiles()
@@ -80,6 +120,42 @@ public class StreamTransferTask extends StreamTask
public OutgoingFileMessage createMessageForRetry(int sequenceNumber)
{
+ // Drop the previous timeout; a new one is scheduled when the file is re-sent.
+ // The entry may be absent (e.g. the timeout already fired), so check for null.
+ ScheduledFuture<?> future = timeoutTasks.remove(sequenceNumber);
+ if (future != null)
+ future.cancel(false);
return files.get(sequenceNumber);
}
+
+ /**
+ * Schedule timeout task to release reference for file sent.
+ * When not receiving ACK after sending to receiver in given time,
+ * the task will release reference.
+ *
+ * @param sequenceNumber sequence number of file sent.
+ * @param time time to timeout
+ * @param unit unit of given time
+ * @return scheduled future for timeout task, or null if this task has
+ * already completed or been aborted
+ */
+ public synchronized ScheduledFuture<?> scheduleTimeout(final int sequenceNumber, long time, TimeUnit unit)
+ {
+ // The executor is shut down once every file is done or the task is aborted.
+ // Checking under the task lock keeps a concurrent shutdown from making
+ // schedule() throw RejectedExecutionException.
+ if (timeoutExecutor.isShutdown())
+ return null;
+
+ ScheduledFuture<?> future = timeoutExecutor.schedule(new Runnable()
+ {
+ public void run()
+ {
+ // complete() also removes this entry from timeoutTasks.
+ StreamTransferTask.this.complete(sequenceNumber);
+ }
+ }, time, unit);
+ timeoutTasks.put(sequenceNumber, future);
+ return future;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamWriter.java b/src/java/org/apache/cassandra/streaming/StreamWriter.java
index dbc7390..5609f20 100644
--- a/src/java/org/apache/cassandra/streaming/StreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/StreamWriter.java
@@ -115,9 +115,6 @@ public class StreamWriter
FileUtils.closeQuietly(file);
FileUtils.closeQuietly(validator);
}
-
- // release reference only when completed successfully
- sstable.releaseReference();
}
protected long totalSize()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 80fcef5..001c927 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@ -83,8 +83,6 @@ public class CompressedStreamWriter extends StreamWriter
// no matter what happens close file
FileUtils.closeQuietly(file);
}
-
- sstable.releaseReference();
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/04fd84c3/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
new file mode 100644
index 0000000..9b02817
--- /dev/null
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests the ACK-timeout handling of {@link StreamTransferTask}.
+ */
+public class StreamTransferTaskTest extends SchemaLoader
+{
+ @Test
+ public void testScheduleTimeout() throws Exception
+ {
+ String ks = "Keyspace1";
+ String cf = "Standard1";
+
+ StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress());
+ ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
+
+ // create two sstables
+ for (int i = 0; i < 2; i++)
+ {
+ insertData(ks, cf, i, 1);
+ cfs.forceBlockingFlush();
+ }
+
+ // create streaming task that streams those two sstables
+ StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+ for (SSTableReader sstable : cfs.getSSTables())
+ {
+ List<Range<Token>> ranges = new ArrayList<>();
+ ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+ task.addTransferFile(sstable, 1, sstable.getPositionsForRanges(ranges));
+ }
+ assertEquals(2, task.getTotalNumberOfFiles());
+
+ // if file sending completes before the timeout fires, the later timeout is a no-op.
+ ScheduledFuture f = task.scheduleTimeout(0, 1, TimeUnit.SECONDS);
+ task.complete(0);
+ // the timeout still runs after complete(), but the file is already released so it does nothing
+ f.get();
+
+ // when the timeout fires for the second (last) file, it completes the task
+ f = task.scheduleTimeout(1, 1, TimeUnit.MILLISECONDS);
+ f.get();
+ assertEquals(StreamSession.State.WAIT_COMPLETE, session.state());
+
+ // once every file is done, no further timeout can be scheduled.
+ assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
+ }
+}