You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/06/18 05:02:15 UTC
[3/6] cassandra git commit: Make rebuild only run one at a time
Make rebuild only run one at a time
patch by yukim; reviewed by jmckenzie for CASSANDRA-9119
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9966419d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9966419d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9966419d
Branch: refs/heads/trunk
Commit: 9966419dbda995421f41ccc769d3b89d63940c82
Parents: 4c15970
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jun 3 14:44:11 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jun 17 20:41:16 2015 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/StorageService.java | 36 ++++++++++++++------
2 files changed, 27 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9966419d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8f3f9f0..1d72c9a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@ Merged from 2.0
* ArrivalWindow should use primitives (CASSANDRA-9496)
* Periodically submit background compaction tasks (CASSANDRA-9592)
* Set HAS_MORE_PAGES flag to false when PagingState is null (CASSANDRA-9571)
+ * Make rebuild only run one at a time (CASSANDRA-9119)
2.1.6
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9966419d/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 7c8e424..e063c63 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -50,6 +50,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -237,7 +238,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
private boolean isBootstrapMode;
/* we bootstrap but do NOT join the ring unless told to do so */
- private boolean isSurveyMode= Boolean.parseBoolean(System.getProperty("cassandra.write_survey", "false"));
+ private boolean isSurveyMode = Boolean.parseBoolean(System.getProperty("cassandra.write_survey", "false"));
+ /* true if node is rebuilding and receiving data */
+ private final AtomicBoolean isRebuilding = new AtomicBoolean();
/* when intialized as a client, we shouldn't write to the system keyspace. */
private boolean isClientMode;
@@ -1023,19 +1026,27 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void rebuild(String sourceDc)
{
- logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc);
-
- RangeStreamer streamer = new RangeStreamer(tokenMetadata, FBUtilities.getBroadcastAddress(), "Rebuild");
- streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
- if (sourceDc != null)
- streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));
+ // check for an ongoing rebuild
+ if (!isRebuilding.compareAndSet(false, true))
+ {
+ throw new IllegalStateException("Node is still rebuilding. Check nodetool netstats.");
+ }
- for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
- streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName));
+ logger.info("rebuild from dc: {}", sourceDc == null ? "(any dc)" : sourceDc);
try
{
- streamer.fetchAsync().get();
+ RangeStreamer streamer = new RangeStreamer(tokenMetadata, FBUtilities.getBroadcastAddress(), "Rebuild");
+ streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
+ if (sourceDc != null)
+ streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc));
+
+ for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
+ streamer.addRanges(keyspaceName, getLocalRanges(keyspaceName));
+
+ StreamResultFuture resultFuture = streamer.fetchAsync();
+ // wait for result
+ resultFuture.get();
}
catch (InterruptedException e)
{
@@ -1047,6 +1058,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.error("Error while rebuilding node", e.getCause());
throw new RuntimeException("Error while rebuilding node: " + e.getCause().getMessage());
}
+ finally
+ {
+ // rebuild is done (successfully or not)
+ isRebuilding.set(false);
+ }
}
public void setStreamThroughputMbPerSec(int value)