You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/08/20 17:10:18 UTC
[04/10] git commit: Make StreamReceiveTask thread safe and gc friendly
Make StreamReceiveTask thread safe and gc friendly
patch by yukim; reviewed by benedict for CASSANDRA-7795
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4fc417c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4fc417c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4fc417c4
Branch: refs/heads/trunk
Commit: 4fc417c404aab0713a7d9747d22ce7eceb777859
Parents: eeb0d4c
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Aug 19 13:31:30 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 20 09:49:39 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/streaming/StreamReceiveTask.java | 56 ++++++++++++--------
2 files changed, 35 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4fc417c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2b2930e..fe9f4e0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -60,6 +60,7 @@
* Track max/min timestamps for range tombstones (CASSANDRA-7647)
* Fix NPE when listing saved caches dir (CASSANDRA-7632)
* Fix sstableloader unable to connect encrypted node (CASSANDRA-7585)
+ * Make StreamReceiveTask thread safe and gc friendly (CASSANDRA-7795)
Merged from 1.2:
* Validate empty cell names from counter updates (CASSANDRA-7798)
* Improve PasswordAuthenticator default super user setup (CASSANDRA-7788)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4fc417c4/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 9a2568d..223a46e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -21,13 +21,16 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
/**
@@ -35,11 +38,17 @@ import org.apache.cassandra.utils.Pair;
*/
public class StreamReceiveTask extends StreamTask
{
+ private static final ThreadPoolExecutor executor = DebuggableThreadPoolExecutor.createWithMaximumPoolSize("StreamReceiveTask",
+ FBUtilities.getAvailableProcessors(),
+ 60, TimeUnit.SECONDS);
+
// number of files to receive
private final int totalFiles;
// total size of files to receive
private final long totalSize;
- private volatile boolean aborted;
+
+ // true if task is done (either completed or aborted)
+ private boolean done = false;
// holds references to SSTables received
protected Collection<SSTableWriter> sstables;
@@ -57,14 +66,19 @@ public class StreamReceiveTask extends StreamTask
*
* @param sstable SSTable file received.
*/
- public void received(SSTableWriter sstable)
+ public synchronized void received(SSTableWriter sstable)
{
+ if (done)
+ return;
+
assert cfId.equals(sstable.metadata.cfId);
- assert !aborted;
sstables.add(sstable);
if (sstables.size() == totalFiles)
- complete();
+ {
+ done = true;
+ executor.submit(new OnCompletionRunnable(this));
+ }
}
public int getTotalNumberOfFiles()
@@ -77,12 +91,6 @@ public class StreamReceiveTask extends StreamTask
return totalSize;
}
- private void complete()
- {
- if (!sstables.isEmpty())
- StorageService.tasks.submit(new OnCompletionRunnable(this));
- }
-
private static class OnCompletionRunnable implements Runnable
{
private final StreamReceiveTask task;
@@ -103,6 +111,7 @@ public class StreamReceiveTask extends StreamTask
for (SSTableWriter writer : task.sstables)
readers.add(writer.closeAndOpenReader());
lockfile.delete();
+ task.sstables.clear();
if (!SSTableReader.acquireReferences(readers))
throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred");
@@ -121,17 +130,20 @@ public class StreamReceiveTask extends StreamTask
}
}
- public void abort()
+ /**
+ * Abort this task.
+ * If the task has already received all files and the
+ * {@link org.apache.cassandra.streaming.StreamReceiveTask.OnCompletionRunnable} has been submitted,
+ * then the task can no longer be aborted.
+ */
+ public synchronized void abort()
{
- aborted = true;
- Runnable r = new Runnable()
- {
- public void run()
- {
- for (SSTableWriter writer : sstables)
- writer.abort();
- }
- };
- StorageService.tasks.submit(r);
+ if (done)
+ return;
+
+ done = true;
+ for (SSTableWriter writer : sstables)
+ writer.abort();
+ sstables.clear();
}
}