You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2014/08/20 17:10:18 UTC

[04/10] git commit: Make StreamReceiveTask thread safe and gc friendly

Make StreamReceiveTask thread safe and gc friendly

patch by yukim; reviewed by benedict for CASSANDRA-7795


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4fc417c4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4fc417c4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4fc417c4

Branch: refs/heads/trunk
Commit: 4fc417c404aab0713a7d9747d22ce7eceb777859
Parents: eeb0d4c
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Aug 19 13:31:30 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 20 09:49:39 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamReceiveTask.java  | 56 ++++++++++++--------
 2 files changed, 35 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4fc417c4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2b2930e..fe9f4e0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -60,6 +60,7 @@
  * Track max/min timestamps for range tombstones (CASSANDRA-7647)
  * Fix NPE when listing saved caches dir (CASSANDRA-7632)
  * Fix sstableloader unable to connect encrypted node (CASSANDRA-7585)
+ * Make StreamReceiveTask thread safe and gc friendly (CASSANDRA-7795)
 Merged from 1.2:
  * Validate empty cell names from counter updates (CASSANDRA-7798)
  * Improve PasswordAuthenticator default super user setup (CASSANDRA-7788)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4fc417c4/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 9a2568d..223a46e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -21,13 +21,16 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -35,11 +38,17 @@ import org.apache.cassandra.utils.Pair;
  */
 public class StreamReceiveTask extends StreamTask
 {
+    private static final ThreadPoolExecutor executor = DebuggableThreadPoolExecutor.createWithMaximumPoolSize("StreamReceiveTask",
+                                                                                                              FBUtilities.getAvailableProcessors(),
+                                                                                                              60, TimeUnit.SECONDS);
+
     // number of files to receive
     private final int totalFiles;
     // total size of files to receive
     private final long totalSize;
-    private volatile boolean aborted;
+
+    // true if task is done (either completed or aborted)
+    private boolean done = false;
 
     //  holds references to SSTables received
     protected Collection<SSTableWriter> sstables;
@@ -57,14 +66,19 @@ public class StreamReceiveTask extends StreamTask
      *
      * @param sstable SSTable file received.
      */
-    public void received(SSTableWriter sstable)
+    public synchronized void received(SSTableWriter sstable)
     {
+        if (done)
+            return;
+
         assert cfId.equals(sstable.metadata.cfId);
-        assert !aborted;
 
         sstables.add(sstable);
         if (sstables.size() == totalFiles)
-            complete();
+        {
+            done = true;
+            executor.submit(new OnCompletionRunnable(this));
+        }
     }
 
     public int getTotalNumberOfFiles()
@@ -77,12 +91,6 @@ public class StreamReceiveTask extends StreamTask
         return totalSize;
     }
 
-    private void complete()
-    {
-        if (!sstables.isEmpty())
-            StorageService.tasks.submit(new OnCompletionRunnable(this));
-    }
-
     private static class OnCompletionRunnable implements Runnable
     {
         private final StreamReceiveTask task;
@@ -103,6 +111,7 @@ public class StreamReceiveTask extends StreamTask
             for (SSTableWriter writer : task.sstables)
                 readers.add(writer.closeAndOpenReader());
             lockfile.delete();
+            task.sstables.clear();
 
             if (!SSTableReader.acquireReferences(readers))
                 throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred");
@@ -121,17 +130,20 @@ public class StreamReceiveTask extends StreamTask
         }
     }
 
-    public void abort()
+    /**
+     * Abort this task.
+     * If the task already received all files and
+     * {@link org.apache.cassandra.streaming.StreamReceiveTask.OnCompletionRunnable} task is submitted,
+     * then task cannot be aborted.
+     */
+    public synchronized void abort()
     {
-        aborted = true;
-        Runnable r = new Runnable()
-        {
-            public void run()
-            {
-                for (SSTableWriter writer : sstables)
-                    writer.abort();
-            }
-        };
-        StorageService.tasks.submit(r);
+        if (done)
+            return;
+
+        done = true;
+        for (SSTableWriter writer : sstables)
+            writer.abort();
+        sstables.clear();
     }
 }