You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/06/18 05:02:15 UTC

[3/6] cassandra git commit: Make rebuild only run one at a time

Make rebuild only run one at a time

patch by yukim; reviewed by jmckenzie for CASSANDRA-9119


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9966419d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9966419d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9966419d

Branch: refs/heads/trunk
Commit: 9966419dbda995421f41ccc769d3b89d63940c82
Parents: 4c15970
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jun 3 14:44:11 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jun 17 20:41:16 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/service/StorageService.java       | 36 ++++++++++++++------
 2 files changed, 27 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9966419d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8f3f9f0..1d72c9a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Merged from 2.0
  * ArrivalWindow should use primitives (CASSANDRA-9496)
  * Periodically submit background compaction tasks (CASSANDRA-9592)
  * Set HAS_MORE_PAGES flag to false when PagingState is null (CASSANDRA-9571)
+ * Make rebuild only run one at a time (CASSANDRA-9119)
 
 
 2.1.6

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9966419d/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 7c8e424..e063c63 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -50,6 +50,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -237,7 +238,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private boolean isBootstrapMode;
 
     /* we bootstrap but do NOT join the ring unless told to do so */
-    private boolean isSurveyMode= Boolean.parseBoolean(System.getProperty("cassandra.write_survey", "false"));
+    private boolean isSurveyMode = Boolean.parseBoolean(System.getProperty("cassandra.write_survey", "false"));
+    /* true if node is rebuilding and receiving data */
+    private final AtomicBoolean isRebuilding = new AtomicBoolean();
 
     /* when intialized as a client, we shouldn't write to the system keyspace. */
     private boolean isClientMode;
@@ -1023,19 +1026,27 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public void rebuild(String sourceDc)
     {
-        logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc);
-
-        RangeStreamer streamer = new RangeStreamer(tokenMetadata, FBUtilities.getBroadcastAddress(), "Rebuild");
-        streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
-        if (sourceDc != null)
-            streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));
+        // check for an ongoing rebuild; only one may run at a time
+        if (!isRebuilding.compareAndSet(false, true))
+        {
+            throw new IllegalStateException("Node is still rebuilding. Check nodetool netstats.");
+        }
 
-        for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
-            streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName));
+        logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc);
 
         try
         {
-            streamer.fetchAsync().get();
+            RangeStreamer streamer = new RangeStreamer(tokenMetadata, FBUtilities.getBroadcastAddress(), "Rebuild");
+            streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
+            if (sourceDc != null)
+                streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));
+
+            for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
+                streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName));
+
+            StreamResultFuture resultFuture = streamer.fetchAsync();
+            // wait for result
+            resultFuture.get();
         }
         catch (InterruptedException e)
         {
@@ -1047,6 +1058,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             logger.error("Error while rebuilding node", e.getCause());
             throw new RuntimeException("Error while rebuilding node: " + e.getCause().getMessage());
         }
+        finally
+        {
+            // rebuild is done (successfully or not)
+            isRebuilding.set(false);
+        }
     }
 
     public void setStreamThroughputMbPerSec(int value)