You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/01/08 01:29:01 UTC

[2/8] cassandra git commit: Fix race condition in StreamTransferTask that could lead to infinite loops and premature sstable deletion

Fix race condition in StreamTransferTask that could lead to
infinite loops and premature sstable deletion

patch by benedict; reviewed by yukim for CASSANDRA-7704


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eeaa3e01
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eeaa3e01
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eeaa3e01

Branch: refs/heads/trunk
Commit: eeaa3e01235c98421fecc46eaed877b207fb5a33
Parents: 8078a58
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jan 7 19:44:00 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 7 19:44:00 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../cassandra/streaming/StreamTransferTask.java | 73 ++++++++++++--------
 .../streaming/StreamTransferTaskTest.java       | 19 +++--
 3 files changed, 62 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/eeaa3e01/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c1bb28c..9ccbf45 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.0.12:
+ * Fix race condition in StreamTransferTask that could lead to
+   infinite loops and premature sstable deletion (CASSANDRA-7704)
  * Add an extra version check to MigrationTask (CASSANDRA-8462)
  * Ensure SSTableWriter cleans up properly after failure (CASSANDRA-8499)
  * Increase bf true positive count on key cache hit (CASSANDRA-8525)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eeaa3e01/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index a543d01..5b75555 100644
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@ -19,8 +19,10 @@ package org.apache.cassandra.streaming;
 
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
 import org.apache.cassandra.utils.Pair;
@@ -30,13 +32,13 @@ import org.apache.cassandra.utils.Pair;
  */
 public class StreamTransferTask extends StreamTask
 {
-    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
+    private static final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("StreamingTransferTaskTimeouts"));
 
     private final AtomicInteger sequenceNumber = new AtomicInteger(0);
+    private boolean aborted = false;
 
-    private final Map<Integer, OutgoingFileMessage> files = new ConcurrentHashMap<>();
-
-    private final Map<Integer, ScheduledFuture> timeoutTasks = new ConcurrentHashMap<>();
+    private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();
+    private final Map<Integer, ScheduledFuture> timeoutTasks = new HashMap<>();
 
     private long totalSize;
 
@@ -45,7 +47,7 @@ public class StreamTransferTask extends StreamTask
         super(session, cfId);
     }
 
-    public void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections)
+    public synchronized void addTransferFile(SSTableReader sstable, long estimatedKeys, List<Pair<Long, Long>> sections)
     {
         assert sstable != null && cfId.equals(sstable.metadata.cfId);
         OutgoingFileMessage message = new OutgoingFileMessage(sstable, sequenceNumber.getAndIncrement(), estimatedKeys, sections);
@@ -58,31 +60,42 @@ public class StreamTransferTask extends StreamTask
      *
      * @param sequenceNumber sequence number of file
      */
-    public synchronized void complete(int sequenceNumber)
+    public void complete(int sequenceNumber)
     {
-        OutgoingFileMessage file = files.remove(sequenceNumber);
-        if (file != null)
+        boolean signalComplete;
+        synchronized (this)
         {
-            file.sstable.releaseReference();
-            // all file sent, notify session this task is complete.
-            if (files.isEmpty())
-            {
-                timeoutExecutor.shutdownNow();
-                session.taskCompleted(this);
-            }
+            ScheduledFuture timeout = timeoutTasks.remove(sequenceNumber);
+            if (timeout != null)
+                timeout.cancel(false);
+
+            OutgoingFileMessage file = files.remove(sequenceNumber);
+            if (file != null)
+                file.sstable.releaseReference();
+
+            signalComplete = files.isEmpty();
         }
+
+        // all file sent, notify session this task is complete.
+        if (signalComplete)
+            session.taskCompleted(this);
     }
 
-    public void abort()
+    public synchronized void abort()
     {
+        if (aborted)
+            return;
+        aborted = true;
+
+        for (ScheduledFuture future : timeoutTasks.values())
+            future.cancel(false);
+        timeoutTasks.clear();
+
         for (OutgoingFileMessage file : files.values())
-        {
             file.sstable.releaseReference();
-        }
-        timeoutExecutor.shutdownNow();
     }
 
-    public int getTotalNumberOfFiles()
+    public synchronized int getTotalNumberOfFiles()
     {
         return files.size();
     }
@@ -92,17 +105,17 @@ public class StreamTransferTask extends StreamTask
         return totalSize;
     }
 
-    public Collection<OutgoingFileMessage> getFileMessages()
+    public synchronized Collection<OutgoingFileMessage> getFileMessages()
     {
         // We may race between queuing all those messages and the completion of the completion of
-        // the first ones. So copy the values to avoid a ConcurrentModificationException
+        // the first ones. So copy tthe values to avoid a ConcurrentModificationException
         return new ArrayList<>(files.values());
     }
 
     public synchronized OutgoingFileMessage createMessageForRetry(int sequenceNumber)
     {
         // remove previous time out task to be rescheduled later
-        ScheduledFuture future = timeoutTasks.get(sequenceNumber);
+        ScheduledFuture future = timeoutTasks.remove(sequenceNumber);
         if (future != null)
             future.cancel(false);
         return files.get(sequenceNumber);
@@ -120,18 +133,24 @@ public class StreamTransferTask extends StreamTask
      */
     public synchronized ScheduledFuture scheduleTimeout(final int sequenceNumber, long time, TimeUnit unit)
     {
-        if (timeoutExecutor.isShutdown())
+        if (!files.containsKey(sequenceNumber))
             return null;
 
         ScheduledFuture future = timeoutExecutor.schedule(new Runnable()
         {
             public void run()
             {
-                StreamTransferTask.this.complete(sequenceNumber);
-                timeoutTasks.remove(sequenceNumber);
+                synchronized (StreamTransferTask.this)
+                {
+                    // remove so we don't cancel ourselves
+                    timeoutTasks.remove(sequenceNumber);
+                    StreamTransferTask.this.complete(sequenceNumber);
+                }
             }
         }, time, unit);
-        timeoutTasks.put(sequenceNumber, future);
+
+        ScheduledFuture prev = timeoutTasks.put(sequenceNumber, future);
+        assert prev == null;
         return future;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eeaa3e01/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index b51f75b..1c28cbd 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -20,11 +20,14 @@ package org.apache.cassandra.streaming;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 
+import junit.framework.Assert;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
@@ -61,19 +64,25 @@ public class StreamTransferTaskTest extends SchemaLoader
         {
             List<Range<Token>> ranges = new ArrayList<>();
             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
-            task.addTransferFile(sstable, 1, sstable.getPositionsForRanges(ranges));
+            task.addTransferFile(sstable, 1, sstable.getPositionsForRanges(ranges), 0);
         }
         assertEquals(2, task.getTotalNumberOfFiles());
 
         // if file sending completes before timeout then the task should be canceled.
-        ScheduledFuture f = task.scheduleTimeout(0, 1, TimeUnit.SECONDS);
-        task.complete(0);
-        // timeout task may run after complete but it is noop
+        Future f = task.scheduleTimeout(0, 0, TimeUnit.NANOSECONDS);
         f.get();
 
         // when timeout runs on second file, task should be completed
         f = task.scheduleTimeout(1, 1, TimeUnit.MILLISECONDS);
-        f.get();
+        task.complete(1);
+        try
+        {
+            f.get();
+            Assert.assertTrue(false);
+        }
+        catch (CancellationException ex)
+        {
+        }
         assertEquals(StreamSession.State.WAIT_COMPLETE, session.state());
 
         // when all streaming are done, time out task should not be scheduled.