You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2013/11/05 06:39:58 UTC
[2/4] git commit: Make calculatePendingRanges asynchronous Patch by
Ryan Fowler, reviewed by brandonwilliams for CASSANDRA-6244
Make calculatePendingRanges asynchronous
Patch by Ryan Fowler, reviewed by brandonwilliams for CASSANDRA-6244
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c28fe781
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c28fe781
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c28fe781
Branch: refs/heads/trunk
Commit: c28fe781606f92e0e3005a3901ff97f0c8633e9a
Parents: e0eb517
Author: Brandon Williams <br...@apache.org>
Authored: Mon Nov 4 23:24:13 2013 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Mon Nov 4 23:24:13 2013 -0600
----------------------------------------------------------------------
.../service/PendingRangeCalculatorService.java | 207 +++++++++++++++++++
.../PendingRangeCalculatorServiceMBean.java | 23 +++
.../cassandra/service/StorageService.java | 149 ++-----------
.../cassandra/locator/SimpleStrategyTest.java | 3 +-
4 files changed, 251 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c28fe781/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
new file mode 100644
index 0000000..ed970fb
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import org.apache.cassandra.utils.BiMultiValMap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.Collection;
+import java.util.concurrent.*;
+
+
+public class PendingRangeCalculatorService extends PendingRangeCalculatorServiceMBean
+{
+ public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
+
+ private static Logger logger = LoggerFactory.getLogger(PendingRangeCalculatorService.class);
+ private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
+
+ public PendingRangeCalculatorService()
+ {
+ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+ }
+
+ private class PendingRangeTask implements Runnable
+ {
+ public void run()
+ {
+ long start = System.currentTimeMillis();
+ for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
+ {
+ calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName);
+ }
+ logger.debug("finished calculation for {} keyspaces in {}ms", Schema.instance.getNonSystemKeyspaces().size(), System.currentTimeMillis() - start);
+ }
+ }
+
+ public Future<?> update()
+ {
+ return executor.submit(new PendingRangeTask());
+ }
+
+ public void blockUntilFinished()
+ {
+ while (true)
+ {
+ if (executor.getActiveCount() + executor.getPendingTasks() == 0)
+ break;
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is:
+ *
+ * (1) When in doubt, it is better to write too much to a node than too little. That is, if
+ * there are multiple nodes moving, calculate the biggest ranges a node could have. Cleaning
+ * up unneeded data afterwards is better than missing writes during movement.
+ * (2) When a node leaves, ranges for other nodes can only grow (a node might get additional
+ * ranges, but it will not lose any of its current ranges as a result of a leave). Therefore
+ * we will first remove _all_ leaving tokens for the sake of calculation and then check what
+ * ranges would go where if all nodes are to leave. This way we get the biggest possible
+ * ranges with regard current leave operations, covering all subsets of possible final range
+ * values.
+ * (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing
+ * complex calculations to see if multiple bootstraps overlap, we simply base calculations
+ * on the same token ring used before (reflecting situation after all leave operations have
+ * completed). Bootstrapping nodes will be added and removed one by one to that metadata and
+ * checked what their ranges would be. This will give us the biggest possible ranges the
+ * node could have. It might be that other bootstraps make our actual final ranges smaller,
+ * but it does not matter as we can clean up the data afterwards.
+ *
+ * NOTE: This is heavy and ineffective operation. This will be done only once when a node
+ * changes state in the cluster, so it should be manageable.
+ */
+ // public & static for testing purposes
+ public static void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
+ {
+ TokenMetadata tm = StorageService.instance.getTokenMetadata();
+ Multimap<Range<Token>, InetAddress> pendingRanges = HashMultimap.create();
+ BiMultiValMap<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
+ Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints();
+
+ if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty() && tm.getRelocatingRanges().isEmpty())
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("No bootstrapping, leaving or moving nodes, and no relocating tokens -> empty pending ranges for {}", keyspaceName);
+ tm.setPendingRanges(keyspaceName, pendingRanges);
+ return;
+ }
+
+ Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
+
+ // Copy of metadata reflecting the situation after all leave operations are finished.
+ TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();
+
+ // get all ranges that will be affected by leaving nodes
+ Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
+ for (InetAddress endpoint : leavingEndpoints)
+ affectedRanges.addAll(addressRanges.get(endpoint));
+
+ // for each of those ranges, find what new nodes will be responsible for the range when
+ // all leaving nodes are gone.
+ for (Range<Token> range : affectedRanges)
+ {
+ Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm.cloneOnlyTokenMap()));
+ Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+ pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints));
+ }
+
+ // At this stage pendingRanges has been updated according to leave operations. We can
+ // now continue the calculation by checking bootstrapping nodes.
+
+ // For each of the bootstrapping nodes, simply add and remove them one by one to
+ // allLeftMetadata and check in between what their ranges would be.
+ for (InetAddress endpoint : bootstrapTokens.inverse().keySet())
+ {
+ Collection<Token> tokens = bootstrapTokens.inverse().get(endpoint);
+
+ allLeftMetadata.updateNormalTokens(tokens, endpoint);
+ for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+ pendingRanges.put(range, endpoint);
+ allLeftMetadata.removeEndpoint(endpoint);
+ }
+
+ // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes.
+ // We can now finish the calculation by checking moving and relocating nodes.
+
+ // For each of the moving nodes, we do the same thing we did for bootstrapping:
+ // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
+ for (Pair<Token, InetAddress> moving : tm.getMovingEndpoints())
+ {
+ InetAddress endpoint = moving.right; // address of the moving node
+
+ // moving.left is a new token of the endpoint
+ allLeftMetadata.updateNormalToken(moving.left, endpoint);
+
+ for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+ {
+ pendingRanges.put(range, endpoint);
+ }
+
+ allLeftMetadata.removeEndpoint(endpoint);
+ }
+
+ // Ranges being relocated.
+ for (Map.Entry<Token, InetAddress> relocating : tm.getRelocatingRanges().entrySet())
+ {
+ InetAddress endpoint = relocating.getValue(); // address of the moving node
+ Token token = relocating.getKey();
+
+ allLeftMetadata.updateNormalToken(token, endpoint);
+
+ for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+ pendingRanges.put(range, endpoint);
+
+ allLeftMetadata.removeEndpoint(endpoint);
+ }
+
+ tm.setPendingRanges(keyspaceName, pendingRanges);
+
+ if (logger.isDebugEnabled())
+ logger.debug("Pending ranges:\n" + (pendingRanges.isEmpty() ? "<empty>" : tm.printPendingRanges()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c28fe781/src/java/org/apache/cassandra/service/PendingRangeCalculatorServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorServiceMBean.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorServiceMBean.java
new file mode 100644
index 0000000..c9b04f0
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorServiceMBean.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+public class PendingRangeCalculatorServiceMBean
+{
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c28fe781/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index fec8290..50419c4 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -674,6 +674,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
setMode(Mode.JOINING, "schema complete, ready to bootstrap", true);
+ setMode(Mode.JOINING, "waiting for pending range calculation", true);
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+ setMode(Mode.JOINING, "calculation complete, ready to bootstrap", true);
if (logger.isDebugEnabled())
@@ -1336,7 +1339,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
tokenMetadata.addBootstrapTokens(tokens, endpoint);
- calculatePendingRanges();
+ PendingRangeCalculatorService.instance.update();
if (Gossiper.instance.usesHostId(endpoint))
tokenMetadata.updateHostId(Gossiper.instance.getHostId(endpoint), endpoint);
@@ -1478,7 +1481,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
- calculatePendingRanges();
+ PendingRangeCalculatorService.instance.update();
}
/**
@@ -1513,7 +1516,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// at this point the endpoint is certainly a member with this token, so let's proceed
// normally
tokenMetadata.addLeavingEndpoint(endpoint);
- calculatePendingRanges();
+ PendingRangeCalculatorService.instance.update();
}
/**
@@ -1550,7 +1553,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
tokenMetadata.addMovingEndpoint(token, endpoint);
- calculatePendingRanges();
+ PendingRangeCalculatorService.instance.update();
}
/**
@@ -1570,7 +1573,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logger.debug("Tokens {} are relocating to {}", tokens, endpoint);
tokenMetadata.addRelocatingTokens(tokens, endpoint);
- calculatePendingRanges();
+ PendingRangeCalculatorService.instance.update();
}
/**
@@ -1612,7 +1615,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// Note that the endpoint is being removed
tokenMetadata.addLeavingEndpoint(endpoint);
- calculatePendingRanges();
+ PendingRangeCalculatorService.instance.update();
// find the endpoint coordinating this removal that we need to notify when we're done
String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1);
@@ -1635,12 +1638,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
removeEndpoint(endpoint);
tokenMetadata.removeEndpoint(endpoint);
tokenMetadata.removeBootstrapTokens(tokens);
+
if (!isClientMode)
{
for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
subscriber.onLeaveCluster(endpoint);
}
- calculatePendingRanges();
+ PendingRangeCalculatorService.instance.update();
}
private void excise(Collection<Token> tokens, InetAddress endpoint, long expireTime)
@@ -1671,125 +1675,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
/**
- * Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is:
- *
- * (1) When in doubt, it is better to write too much to a node than too little. That is, if
- * there are multiple nodes moving, calculate the biggest ranges a node could have. Cleaning
- * up unneeded data afterwards is better than missing writes during movement.
- * (2) When a node leaves, ranges for other nodes can only grow (a node might get additional
- * ranges, but it will not lose any of its current ranges as a result of a leave). Therefore
- * we will first remove _all_ leaving tokens for the sake of calculation and then check what
- * ranges would go where if all nodes are to leave. This way we get the biggest possible
- * ranges with regard current leave operations, covering all subsets of possible final range
- * values.
- * (3) When a node bootstraps, ranges of other nodes can only get smaller. Without doing
- * complex calculations to see if multiple bootstraps overlap, we simply base calculations
- * on the same token ring used before (reflecting situation after all leave operations have
- * completed). Bootstrapping nodes will be added and removed one by one to that metadata and
- * checked what their ranges would be. This will give us the biggest possible ranges the
- * node could have. It might be that other bootstraps make our actual final ranges smaller,
- * but it does not matter as we can clean up the data afterwards.
- *
- * NOTE: This is heavy and ineffective operation. This will be done only once when a node
- * changes state in the cluster, so it should be manageable.
- */
- private void calculatePendingRanges()
- {
- for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
- calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName);
- }
-
- // public & static for testing purposes
- public static void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
- {
- TokenMetadata tm = StorageService.instance.getTokenMetadata();
- Multimap<Range<Token>, InetAddress> pendingRanges = HashMultimap.create();
- BiMultiValMap<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
- Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints();
-
- if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty() && tm.getRelocatingRanges().isEmpty())
- {
- if (logger.isDebugEnabled())
- logger.debug("No bootstrapping, leaving or moving nodes, and no relocating tokens -> empty pending ranges for {}", keyspaceName);
- tm.setPendingRanges(keyspaceName, pendingRanges);
- return;
- }
-
- Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
-
- // Copy of metadata reflecting the situation after all leave operations are finished.
- TokenMetadata allLeftMetadata = tm.cloneAfterAllLeft();
-
- // get all ranges that will be affected by leaving nodes
- Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
- for (InetAddress endpoint : leavingEndpoints)
- affectedRanges.addAll(addressRanges.get(endpoint));
-
- // for each of those ranges, find what new nodes will be responsible for the range when
- // all leaving nodes are gone.
- for (Range<Token> range : affectedRanges)
- {
- Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, tm.cloneOnlyTokenMap()));
- Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
- pendingRanges.putAll(range, Sets.difference(newEndpoints, currentEndpoints));
- }
-
- // At this stage pendingRanges has been updated according to leave operations. We can
- // now continue the calculation by checking bootstrapping nodes.
-
- // For each of the bootstrapping nodes, simply add and remove them one by one to
- // allLeftMetadata and check in between what their ranges would be.
- for (InetAddress endpoint : bootstrapTokens.inverse().keySet())
- {
- Collection<Token> tokens = bootstrapTokens.inverse().get(endpoint);
-
- allLeftMetadata.updateNormalTokens(tokens, endpoint);
- for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
- pendingRanges.put(range, endpoint);
- allLeftMetadata.removeEndpoint(endpoint);
- }
-
- // At this stage pendingRanges has been updated according to leaving and bootstrapping nodes.
- // We can now finish the calculation by checking moving and relocating nodes.
-
- // For each of the moving nodes, we do the same thing we did for bootstrapping:
- // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
- for (Pair<Token, InetAddress> moving : tm.getMovingEndpoints())
- {
- InetAddress endpoint = moving.right; // address of the moving node
-
- // moving.left is a new token of the endpoint
- allLeftMetadata.updateNormalToken(moving.left, endpoint);
-
- for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
- {
- pendingRanges.put(range, endpoint);
- }
-
- allLeftMetadata.removeEndpoint(endpoint);
- }
-
- // Ranges being relocated.
- for (Map.Entry<Token, InetAddress> relocating : tm.getRelocatingRanges().entrySet())
- {
- InetAddress endpoint = relocating.getValue(); // address of the moving node
- Token token = relocating.getKey();
-
- allLeftMetadata.updateNormalToken(token, endpoint);
-
- for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
- pendingRanges.put(range, endpoint);
-
- allLeftMetadata.removeEndpoint(endpoint);
- }
-
- tm.setPendingRanges(keyspaceName, pendingRanges);
-
- if (logger.isDebugEnabled())
- logger.debug("Pending ranges:\n" + (pendingRanges.isEmpty() ? "<empty>" : tm.printPendingRanges()));
- }
-
- /**
* Finds living endpoints responsible for the given ranges
*
* @param keyspaceName the keyspace ranges belong to
@@ -1985,7 +1870,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
public void onRemove(InetAddress endpoint)
{
tokenMetadata.removeEndpoint(endpoint);
- calculatePendingRanges();
+ PendingRangeCalculatorService.instance.update();
}
public void onDead(InetAddress endpoint, EndpointState state)
@@ -2727,7 +2612,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.leaving(getLocalTokens()));
tokenMetadata.addLeavingEndpoint(FBUtilities.getBroadcastAddress());
- calculatePendingRanges();
+ PendingRangeCalculatorService.instance.update();
}
public void decommission() throws InterruptedException
@@ -2736,6 +2621,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
throw new UnsupportedOperationException("local node is not a member of the token ring yet");
if (tokenMetadata.cloneAfterAllLeft().sortedTokens().size() < 2)
throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless");
+
+ PendingRangeCalculatorService.instance.blockUntilFinished();
for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
{
if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddress()).size() > 0)
@@ -2767,7 +2654,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.NEEDS_BOOTSTRAP);
tokenMetadata.removeEndpoint(FBUtilities.getBroadcastAddress());
- calculatePendingRanges();
+ PendingRangeCalculatorService.instance.update();
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalTokens(),Gossiper.computeExpireTime()));
int delay = Math.max(RING_DELAY, Gossiper.intervalInMillis * 2);
@@ -2888,6 +2775,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
List<String> keyspacesToProcess = Schema.instance.getNonSystemKeyspaces();
+ PendingRangeCalculatorService.instance.blockUntilFinished();
// checking if data is moving to this node
for (String keyspaceName : keyspacesToProcess)
{
@@ -3185,7 +3073,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
removingNode = endpoint;
tokenMetadata.addLeavingEndpoint(endpoint);
- calculatePendingRanges();
+ PendingRangeCalculatorService.instance.update();
+
// the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us
// we add our own token so other nodes to let us know when they're done
Gossiper.instance.advertiseRemoving(endpoint, hostId, localHostId);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c28fe781/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
index 2656642..67b80f2 100644
--- a/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/SimpleStrategyTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
@@ -139,7 +140,7 @@ public class SimpleStrategyTest extends SchemaLoader
{
strategy = getStrategy(keyspaceName, tmd);
- StorageService.calculatePendingRanges(strategy, keyspaceName);
+ PendingRangeCalculatorService.calculatePendingRanges(strategy, keyspaceName);
int replicationFactor = strategy.getReplicationFactor();