You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/11/18 17:39:09 UTC
[04/10] cassandra git commit: Avoid blocking gossip during pending range calculation
Avoid blocking gossip during pending range calculation
patch by Stefan Podkowinski; reviewed by Joel Knighton for CASSANDRA-12281
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9cd7d540
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9cd7d540
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9cd7d540
Branch: refs/heads/trunk
Commit: 9cd7d540de2ea525982d139a4c8a11233c4e98c9
Parents: 7d2fdfe
Author: Stefan Podkowinski <s....@gmail.com>
Authored: Fri Oct 21 11:34:53 2016 +0200
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Nov 18 17:29:14 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/locator/TokenMetadata.java | 211 +++++++++++--------
.../service/PendingRangeCalculatorService.java | 10 +-
.../gms/PendingRangeCalculatorServiceTest.java | 133 ++++++++++++
4 files changed, 267 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 54dc4b5..5a2e0ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Avoid blocking gossip during pending range calculation (CASSANDRA-12281)
* Fix purgeability of tombstones with max timestamp (CASSANDRA-12792)
* Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
* cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index b06c9c8..aafd7f9 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -649,12 +649,7 @@ public class TokenMetadata
lock.readLock().lock();
try
{
- TokenMetadata allLeftMetadata = cloneOnlyTokenMap();
-
- for (InetAddress endpoint : leavingEndpoints)
- allLeftMetadata.removeEndpoint(endpoint);
-
- return allLeftMetadata;
+ return removeEndpoints(cloneOnlyTokenMap(), leavingEndpoints);
}
finally
{
@@ -662,6 +657,14 @@ public class TokenMetadata
}
}
+ private static TokenMetadata removeEndpoints(TokenMetadata allLeftMetadata, Set<InetAddress> leavingEndpoints)
+ {
+ for (InetAddress endpoint : leavingEndpoints)
+ allLeftMetadata.removeEndpoint(endpoint);
+
+ return allLeftMetadata;
+ }
+
/**
* Create a copy of TokenMetadata with tokenToEndpointMap reflecting situation after all
* current leave, and move operations have finished.
@@ -787,118 +790,154 @@ public class TokenMetadata
*/
public void calculatePendingRanges(AbstractReplicationStrategy strategy, String keyspaceName)
{
- lock.readLock().lock();
- try
+ // avoid race between both branches - do not use a lock here as this will block any other unrelated operations!
+ synchronized (pendingRanges)
{
- PendingRangeMaps newPendingRanges = new PendingRangeMaps();
-
if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && movingEndpoints.isEmpty())
{
if (logger.isTraceEnabled())
logger.trace("No bootstrapping, leaving or moving nodes -> empty pending ranges for {}", keyspaceName);
- pendingRanges.put(keyspaceName, newPendingRanges);
- return;
+ pendingRanges.put(keyspaceName, new PendingRangeMaps());
}
+ else
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Starting pending range calculation for {}", keyspaceName);
- Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges();
-
- // Copy of metadata reflecting the situation after all leave operations are finished.
- TokenMetadata allLeftMetadata = cloneAfterAllLeft();
+ long startedAt = System.currentTimeMillis();
- // get all ranges that will be affected by leaving nodes
- Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
- for (InetAddress endpoint : leavingEndpoints)
- affectedRanges.addAll(addressRanges.get(endpoint));
+ // create clone of current state
+ BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
+ Set<InetAddress> leavingEndpoints = new HashSet<>();
+ Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
+ TokenMetadata metadata;
- // for each of those ranges, find what new nodes will be responsible for the range when
- // all leaving nodes are gone.
- TokenMetadata metadata = cloneOnlyTokenMap(); // don't do this in the loop! #7758
- for (Range<Token> range : affectedRanges)
- {
- Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
- Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
- for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
+ lock.readLock().lock();
+ try
+ {
+ bootstrapTokens.putAll(this.bootstrapTokens);
+ leavingEndpoints.addAll(this.leavingEndpoints);
+ movingEndpoints.addAll(this.movingEndpoints);
+ metadata = this.cloneOnlyTokenMap();
+ }
+ finally
{
- newPendingRanges.addPendingRange(range, address);
+ lock.readLock().unlock();
}
+
+ pendingRanges.put(keyspaceName, calculatePendingRanges(strategy, metadata, bootstrapTokens,
+ leavingEndpoints, movingEndpoints));
+ long took = System.currentTimeMillis() - startedAt;
+
+ if (logger.isDebugEnabled())
+ logger.debug("Pending range calculation for {} completed (took: {}ms)", keyspaceName, took);
+ if (logger.isTraceEnabled())
+ logger.trace("Calculated pending ranges for {}:\n{}", keyspaceName, (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
}
+ }
+ }
+
+ /**
+ * @see TokenMetadata#calculatePendingRanges(AbstractReplicationStrategy, String)
+ */
+ private static PendingRangeMaps calculatePendingRanges(AbstractReplicationStrategy strategy,
+ TokenMetadata metadata,
+ BiMultiValMap<Token, InetAddress> bootstrapTokens,
+ Set<InetAddress> leavingEndpoints,
+ Set<Pair<Token, InetAddress>> movingEndpoints)
+ {
+ PendingRangeMaps newPendingRanges = new PendingRangeMaps();
+
+ Multimap<InetAddress, Range<Token>> addressRanges = strategy.getAddressRanges(metadata);
- // At this stage newPendingRanges has been updated according to leave operations. We can
- // now continue the calculation by checking bootstrapping nodes.
+ // Copy of metadata reflecting the situation after all leave operations are finished.
+ TokenMetadata allLeftMetadata = removeEndpoints(metadata.cloneOnlyTokenMap(), leavingEndpoints);
- // For each of the bootstrapping nodes, simply add and remove them one by one to
- // allLeftMetadata and check in between what their ranges would be.
- Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
- for (InetAddress endpoint : bootstrapAddresses.keySet())
+ // get all ranges that will be affected by leaving nodes
+ Set<Range<Token>> affectedRanges = new HashSet<Range<Token>>();
+ for (InetAddress endpoint : leavingEndpoints)
+ affectedRanges.addAll(addressRanges.get(endpoint));
+
+ // for each of those ranges, find what new nodes will be responsible for the range when
+ // all leaving nodes are gone.
+ for (Range<Token> range : affectedRanges)
+ {
+ Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+ Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+ for (InetAddress address : Sets.difference(newEndpoints, currentEndpoints))
{
- Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+ newPendingRanges.addPendingRange(range, address);
+ }
+ }
- allLeftMetadata.updateNormalTokens(tokens, endpoint);
- for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
- {
- newPendingRanges.addPendingRange(range, endpoint);
- }
- allLeftMetadata.removeEndpoint(endpoint);
+ // At this stage newPendingRanges has been updated according to leave operations. We can
+ // now continue the calculation by checking bootstrapping nodes.
+
+ // For each of the bootstrapping nodes, simply add and remove them one by one to
+ // allLeftMetadata and check in between what their ranges would be.
+ Multimap<InetAddress, Token> bootstrapAddresses = bootstrapTokens.inverse();
+ for (InetAddress endpoint : bootstrapAddresses.keySet())
+ {
+ Collection<Token> tokens = bootstrapAddresses.get(endpoint);
+
+ allLeftMetadata.updateNormalTokens(tokens, endpoint);
+ for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+ {
+ newPendingRanges.addPendingRange(range, endpoint);
}
+ allLeftMetadata.removeEndpoint(endpoint);
+ }
- // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
- // We can now finish the calculation by checking moving nodes.
+ // At this stage newPendingRanges has been updated according to leaving and bootstrapping nodes.
+ // We can now finish the calculation by checking moving nodes.
- // For each of the moving nodes, we do the same thing we did for bootstrapping:
- // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
- for (Pair<Token, InetAddress> moving : movingEndpoints)
+ // For each of the moving nodes, we do the same thing we did for bootstrapping:
+ // simply add and remove them one by one to allLeftMetadata and check in between what their ranges would be.
+ for (Pair<Token, InetAddress> moving : movingEndpoints)
+ {
+ //Calculate all the ranges which could be affected. This will include the ranges before and after the move.
+ Set<Range<Token>> moveAffectedRanges = new HashSet<>();
+ InetAddress endpoint = moving.right; // address of the moving node
+ //Add ranges before the move
+ for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
{
- //Calculate all the ranges which will could be affected. This will include the ranges before and after the move.
- Set<Range<Token>> moveAffectedRanges = new HashSet<>();
- InetAddress endpoint = moving.right; // address of the moving node
- //Add ranges before the move
- for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
- {
- moveAffectedRanges.add(range);
- }
+ moveAffectedRanges.add(range);
+ }
- allLeftMetadata.updateNormalToken(moving.left, endpoint);
- //Add ranges after the move
- for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
- {
- moveAffectedRanges.add(range);
- }
+ allLeftMetadata.updateNormalToken(moving.left, endpoint);
+ //Add ranges after the move
+ for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
+ {
+ moveAffectedRanges.add(range);
+ }
- for(Range<Token> range : moveAffectedRanges)
+ for(Range<Token> range : moveAffectedRanges)
+ {
+ Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
+ Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
+ Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
+ for(final InetAddress address : difference)
{
- Set<InetAddress> currentEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, metadata));
- Set<InetAddress> newEndpoints = ImmutableSet.copyOf(strategy.calculateNaturalEndpoints(range.right, allLeftMetadata));
- Set<InetAddress> difference = Sets.difference(newEndpoints, currentEndpoints);
- for(final InetAddress address : difference)
- {
- Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
- Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
- //We want to get rid of any ranges which the node is currently getting.
- newRanges.removeAll(oldRanges);
+ Collection<Range<Token>> newRanges = strategy.getAddressRanges(allLeftMetadata).get(address);
+ Collection<Range<Token>> oldRanges = strategy.getAddressRanges(metadata).get(address);
+ //We want to get rid of any ranges which the node is currently getting.
+ newRanges.removeAll(oldRanges);
- for(Range<Token> newRange : newRanges)
+ for(Range<Token> newRange : newRanges)
+ {
+ for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
{
- for(Range<Token> pendingRange : newRange.subtractAll(oldRanges))
- {
- newPendingRanges.addPendingRange(pendingRange, address);
- }
+ newPendingRanges.addPendingRange(pendingRange, address);
}
}
}
-
- allLeftMetadata.removeEndpoint(endpoint);
}
- pendingRanges.put(keyspaceName, newPendingRanges);
-
- if (logger.isTraceEnabled())
- logger.trace("Pending ranges:\n{}", (pendingRanges.isEmpty() ? "<empty>" : printPendingRanges()));
- }
- finally
- {
- lock.readLock().unlock();
+ allLeftMetadata.removeEndpoint(endpoint);
}
+
+ return newPendingRanges;
}
public Token getPredecessor(Token token)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 806f6a5..116cede 100644
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -34,6 +35,9 @@ public class PendingRangeCalculatorService
public static final PendingRangeCalculatorService instance = new PendingRangeCalculatorService();
private static Logger logger = LoggerFactory.getLogger(PendingRangeCalculatorService.class);
+
+ // the executor will only run a single range calculation at a time while keeping at most one task queued in order
+ // to trigger an update only after the most recent state change and not for each update individually
private final JMXEnabledThreadPoolExecutor executor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1), new NamedThreadFactory("PendingRangeCalculator"), "internal");
@@ -58,9 +62,11 @@ public class PendingRangeCalculatorService
try
{
long start = System.currentTimeMillis();
- for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
+ List<String> keyspaces = Schema.instance.getNonSystemKeyspaces();
+ for (String keyspaceName : keyspaces)
calculatePendingRanges(Keyspace.open(keyspaceName).getReplicationStrategy(), keyspaceName);
- logger.debug("finished calculation for {} keyspaces in {}ms", Schema.instance.getNonSystemKeyspaces().size(), System.currentTimeMillis() - start);
+ if (logger.isTraceEnabled())
+ logger.trace("Finished PendingRangeTask for {} keyspaces in {}ms", keyspaces.size(), System.currentTimeMillis() - start);
}
finally
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9cd7d540/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
new file mode 100644
index 0000000..507948c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.service.StorageService;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+/**
+ * Test for "Gossip blocks on startup when another node is bootstrapping" (CASSANDRA-12281).
+ */
+@RunWith(BMUnitRunner.class)
+public class PendingRangeCalculatorServiceTest
+{
+ static ReentrantLock calculationLock = new ReentrantLock();
+
+ @BeforeClass
+ public static void setUp() throws ConfigurationException
+ {
+ SchemaLoader.prepareServer();
+ StorageService.instance.initServer();
+ }
+
+ @Test
+ @BMRule(name = "Block pending range calculation",
+ targetClass = "TokenMetadata",
+ targetMethod = "calculatePendingRanges",
+ targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges",
+ action = "org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()")
+ public void testDelayedResponse() throws UnknownHostException, InterruptedException
+ {
+ final InetAddress otherNodeAddr = InetAddress.getByName("127.0.0.2");
+ final UUID otherHostId = UUID.randomUUID();
+
+ // introduce node for first major state change
+ Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 1, false));
+
+ // acquire lock to block pending range calculation via byteman
+ calculationLock.lock();
+ try
+ {
+ // spawn thread that will trigger handling of a bootstrap state change which in turn will trigger
+ // the pending range calculation that will be blocked by our lock
+ Thread t1 = new Thread()
+ {
+ public void run()
+ {
+ Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 2, true));
+ }
+ };
+ t1.start();
+
+ // busy-spin until t1 is blocked by lock
+ while (!calculationLock.hasQueuedThreads()) ;
+
+ // trigger further state changes on a second thread; these must not be held up by the
+ // thread that is blocked in the expensive range calculation
+ Thread t2 = new Thread()
+ {
+ public void run()
+ {
+ Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 3, false));
+ Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 4, false));
+ Gossiper.instance.applyStateLocally(getStates(otherNodeAddr, otherHostId, 5, false));
+ }
+ };
+ t2.start();
+ t2.join(2000);
+ assertFalse("Thread still blocked by pending range calculation", t2.isAlive());
+ assertEquals(5, Gossiper.instance.getEndpointStateForEndpoint(otherNodeAddr).getHeartBeatState().getHeartBeatVersion());
+ }
+ finally
+ {
+ calculationLock.unlock();
+ }
+ }
+
+ private Map<InetAddress, EndpointState> getStates(InetAddress otherNodeAddr, UUID hostId, int ver, boolean bootstrapping)
+ {
+ HeartBeatState hb = new HeartBeatState(1, ver);
+ EndpointState state = new EndpointState(hb);
+ Collection<Token> tokens = new ArrayList<>();
+
+ tokens.add(new ByteOrderedPartitioner.BytesToken(new byte[]{1,2,3}));
+ state.addApplicationState(ApplicationState.TOKENS, StorageService.instance.valueFactory.tokens(tokens));
+ state.addApplicationState(ApplicationState.STATUS, bootstrapping ?
+ StorageService.instance.valueFactory.bootstrapping(tokens) : StorageService.instance.valueFactory.normal(tokens));
+ state.addApplicationState(ApplicationState.HOST_ID, StorageService.instance.valueFactory.hostId(hostId));
+ state.addApplicationState(ApplicationState.NET_VERSION, StorageService.instance.valueFactory.networkVersion());
+
+ Map<InetAddress, EndpointState> states = new HashMap<>();
+ states.put(otherNodeAddr, state);
+ return states;
+ }
+}