You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/09/14 12:11:19 UTC
[cassandra] branch trunk updated: Correctly handle pending ranges
with adjacent range movements
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8ba163f Correctly handle pending ranges with adjacent range movements
8ba163f is described below
commit 8ba163f25a56cb507e621b89b6928c2aef0ecc57
Author: Aleksandr Sorokoumov <al...@gmail.com>
AuthorDate: Thu Mar 26 17:01:27 2020 +0100
Correctly handle pending ranges with adjacent range movements
Patch by Aleksandr Sorokoumov; reviewed by Sam Tunnicliffe and Marcus Eriksson for CASSANDRA-14801
---
CHANGES.txt | 1 +
.../apache/cassandra/locator/TokenMetadata.java | 12 +-
.../cassandra/locator/PendingRangesTest.java | 293 ++++++++++++++++++++-
3 files changed, 298 insertions(+), 8 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 3af602c..90238ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta3
+ * Correctly handle pending ranges with adjacent range movements (CASSANDRA-14801)
* Avoid adding locahost when streaming trivial ranges (CASSANDRA-16099)
* Add nodetool getfullquerylog (CASSANDRA-15988)
* Fix yaml format and alignment in tpstats (CASSANDRA-11402)
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 3bb265d..ca8fd99 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -787,7 +787,7 @@ public class TokenMetadata
Replica replica = entry.getValue();
if (replica.endpoint().equals(endpoint))
{
- builder.add(replica);
+ builder.add(replica, Conflict.DUPLICATE);
}
}
return builder.build();
@@ -899,11 +899,15 @@ public class TokenMetadata
{
EndpointsForRange currentReplicas = strategy.calculateNaturalReplicas(range.right, metadata);
EndpointsForRange newReplicas = strategy.calculateNaturalReplicas(range.right, allLeftMetadata);
- for (Replica replica : newReplicas)
+ for (Replica newReplica : newReplicas)
{
- if (currentReplicas.endpoints().contains(replica.endpoint()))
+ if (currentReplicas.endpoints().contains(newReplica.endpoint()))
continue;
- newPendingRanges.addPendingRange(range, replica);
+
+ // we calculate pending replicas for leave- and move- affected ranges in the same way to avoid
+ // a possible conflict when 2 pending replicas have the same endpoint and different ranges.
+ for (Replica pendingReplica : newReplica.subtractSameReplication(addressRanges.get(newReplica.endpoint())))
+ newPendingRanges.addPendingRange(range, pendingReplica);
}
}
diff --git a/test/unit/org/apache/cassandra/locator/PendingRangesTest.java b/test/unit/org/apache/cassandra/locator/PendingRangesTest.java
index 48bf546..7959366 100644
--- a/test/unit/org/apache/cassandra/locator/PendingRangesTest.java
+++ b/test/unit/org/apache/cassandra/locator/PendingRangesTest.java
@@ -19,19 +19,24 @@
package org.apache.cassandra.locator;
import java.net.UnknownHostException;
-import java.util.Collections;
+import java.util.*;
+import java.util.stream.Collectors;
import com.google.common.collect.*;
import org.junit.BeforeClass;
import org.junit.Test;
-
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.Generate;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.integers;
public class PendingRangesTest
{
@@ -170,6 +175,282 @@ public class PendingRangesTest
assertPendingRanges(tm.getPendingRangesMM(KEYSPACE), expected);
}
+ /**
+ * Checks the pending ranges computed when a leave and a move are in flight at the same time.
+ * Five nodes own tokens -9, -5, -1, 1 and 5; node5 is marked as leaving and node3 is marked as
+ * moving to token 0. The pending ranges of every node are then compared with hand-written
+ * expectations.
+ */
+ @Test
+ public void testConcurrentAdjacentLeaveAndMove()
+ {
+ TokenMetadata tm = new TokenMetadata();
+ // presumably a replication factor of 3 (the second argument is passed as "rf" elsewhere in this file)
+ AbstractReplicationStrategy strategy = simpleStrategy(tm, 3);
+
+ // newToken is the move target for node3; token1..token5 are the initial tokens of node1..node5
+ Token newToken = token(0);
+ Token token1 = token(-9);
+ Token token2 = token(-5);
+ Token token3 = token(-1);
+ Token token4 = token(1);
+ Token token5 = token(5);
+
+ InetAddressAndPort node1 = peer(1);
+ InetAddressAndPort node2 = peer(2);
+ InetAddressAndPort node3 = peer(3);
+ InetAddressAndPort node4 = peer(4);
+ InetAddressAndPort node5 = peer(5);
+
+ // setup initial ring
+ addNode(tm, node1, token1);
+ addNode(tm, node2, token2);
+ addNode(tm, node3, token3);
+ addNode(tm, node4, token4);
+ addNode(tm, node5, token5);
+
+ // the two concurrent topology changes under test: node5 leaves, node3 moves from token3 to newToken
+ tm.addLeavingEndpoint(node5);
+ tm.addMovingEndpoint(newToken, node3);
+
+ // pending ranges must be recalculated before they are queried per endpoint below
+ tm.calculatePendingRanges(strategy, KEYSPACE);
+ assertRangesAtEndpoint(RangesAtEndpoint.of(new Replica(node1, new Range<>(token2, token3), true)),
+ tm.getPendingRanges(KEYSPACE, node1));
+ assertRangesAtEndpoint(RangesAtEndpoint.of(new Replica(node2, new Range<>(token3, token4), true)),
+ tm.getPendingRanges(KEYSPACE, node2));
+ // node3 is expected to hold two pending replicas with different ranges for the same endpoint
+ assertRangesAtEndpoint(RangesAtEndpoint.of(new Replica(node3, new Range<>(token3, newToken), true),
+ new Replica(node3, new Range<>(token4, token5), true)),
+ tm.getPendingRanges(KEYSPACE, node3));
+ // node4 and the leaving node5 are expected to have no pending ranges
+ assertRangesAtEndpoint(RangesAtEndpoint.empty(node4), tm.getPendingRanges(KEYSPACE, node4));
+ assertRangesAtEndpoint(RangesAtEndpoint.empty(node5), tm.getPendingRanges(KEYSPACE, node5));
+ }
+
+ /**
+ * Checks the pending ranges computed when two nodes that are neighbours on the ring leave at the
+ * same time. Four nodes own tokens -9, -4, 0 and 4; node2 and node3 are both marked as leaving,
+ * and the pending ranges of every node are compared with hand-written expectations.
+ */
+ @Test
+ public void testConcurrentAdjacentLeavingNodes()
+ {
+ TokenMetadata tm = new TokenMetadata();
+ // presumably a replication factor of 2 (the second argument is passed as "rf" elsewhere in this file)
+ AbstractReplicationStrategy strategy = simpleStrategy(tm, 2);
+
+ // token1..token4 are the tokens of node1..node4
+ Token token1 = token(-9);
+ Token token2 = token(-4);
+ Token token3 = token(0);
+ Token token4 = token(4);
+
+ InetAddressAndPort node1 = peer(1);
+ InetAddressAndPort node2 = peer(2);
+ InetAddressAndPort node3 = peer(3);
+ InetAddressAndPort node4 = peer(4);
+
+ // setup initial ring
+ addNode(tm, node1, token1);
+ addNode(tm, node2, token2);
+ addNode(tm, node3, token3);
+ addNode(tm, node4, token4);
+
+ // the two owners of adjacent tokens leave concurrently
+ tm.addLeavingEndpoint(node2);
+ tm.addLeavingEndpoint(node3);
+
+ // pending ranges must be recalculated before they are queried per endpoint below
+ tm.calculatePendingRanges(strategy, KEYSPACE);
+ assertRangesAtEndpoint(RangesAtEndpoint.of(new Replica(node1, new Range<>(token1, token3), true)),
+ tm.getPendingRanges(KEYSPACE, node1));
+ // the leaving nodes themselves are expected to have no pending ranges
+ assertRangesAtEndpoint(RangesAtEndpoint.empty(node2), tm.getPendingRanges(KEYSPACE, node2));
+ assertRangesAtEndpoint(RangesAtEndpoint.empty(node3), tm.getPendingRanges(KEYSPACE, node3));
+ // node4 is expected to hold two pending replicas with different ranges for the same endpoint
+ assertRangesAtEndpoint(RangesAtEndpoint.of(new Replica(node4, new Range<>(token4, token1), true),
+ new Replica(node4, new Range<>(token1, token2), true)),
+ tm.getPendingRanges(KEYSPACE, node4));
+ }
+
+    /**
+     * Property test for single-token clusters: generates clusters with random combinations of
+     * leaving, bootstrapping and moving nodes and checks that pending ranges can be calculated
+     * and read back for each of them.
+     */
+    @Test
+    public void testBootstrapLeaveAndMovePermutationsWithoutVnodes()
+    {
+        final int maxRf = 5;
+        final int nodes = 50;
+
+        // In a non-vnode cluster (i.e. where tokensPerNode == 1), we can
+        // add, remove and move nodes
+        Gen<Input> singleTokenInputs = rf(maxRf).flatMap(rf -> input(rf, nodes));
+        Gen<Cluster> changedClusters = singleTokenInputs.flatMap(this::clustersWithChangedTopology);
+
+        qt().forAll(changedClusters)
+            .checkAssert(cluster -> cluster.calculateAndGetPendingRanges());
+    }
+
+    /**
+     * Property test for clusters with up to 16 tokens per node: generates clusters with random
+     * combinations of leaving and bootstrapping nodes and checks that pending ranges can be
+     * calculated and read back for each of them.
+     */
+    @Test
+    public void testBootstrapAndLeavePermutationsWithVnodes()
+    {
+        final int maxRf = 5;
+        final int nodes = 50;
+        final int maxTokensPerNode = 16;
+
+        // In a vnode cluster (i.e. where tokensPerNode > 1), move is not
+        // supported, so only leave and bootstrap operations will occur
+        Gen<Input> vnodeInputs = rf(maxRf).flatMap(rf -> input(rf, nodes, maxTokensPerNode));
+        Gen<Cluster> changedClusters = vnodeInputs.flatMap(this::clustersWithChangedTopology);
+
+        qt().forAll(changedClusters)
+            .checkAssert(cluster -> cluster.calculateAndGetPendingRanges());
+    }
+
+    /**
+     * @param maxRf upper bound handed to the integer generator
+     * @return a generator of replication factors between 1 and {@code maxRf}
+     */
+    private Gen<Integer> rf(int maxRf)
+    {
+        final int minRf = 1;
+        return integers().between(minRf, maxRf);
+    }
+
+    /**
+     * Generates inputs for single-token clusters.
+     *
+     * @param rf replication factor; also used as the lower bound for the cluster size
+     * @param maxNodes upper bound for the cluster size
+     * @return a generator of {@link Input}s whose tokensPerNode is always 1
+     */
+    private Gen<Input> input(int rf, int maxNodes)
+    {
+        Gen<Integer> clusterSizes = integers().between(rf, maxNodes);
+        return clusterSizes.map(size -> new Input(rf, size, 1));
+    }
+
+    /**
+     * Generates inputs with a varying number of tokens per node.
+     *
+     * @param rf replication factor; also used as the lower bound for the cluster size
+     * @param maxNodes upper bound for the cluster size
+     * @param maxTokensPerNode upper bound for the number of tokens per node (lower bound is 1)
+     * @return a generator of {@link Input}s combining a cluster size with a token count
+     */
+    private Gen<Input> input(int rf, int maxNodes, int maxTokensPerNode)
+    {
+        Gen<Integer> clusterSizes = integers().between(rf, maxNodes);
+        Gen<Integer> tokenCounts = integers().between(1, maxTokensPerNode);
+        return clusterSizes.zip(tokenCounts, (size, tokenCount) -> new Input(rf, size, tokenCount));
+    }
+
+    /**
+     * @return a generator for the number of nodes to bootstrap, between 0 and {@code input.nodes}
+     */
+    private Gen<Integer> bootstrappedNodes(Input input)
+    {
+        // at most double in size
+        int maxBootstrapped = input.nodes;
+        return integers().between(0, maxBootstrapped);
+    }
+
+    /**
+     * @return a generator for the number of leave operations, between 0 and
+     *         {@code input.nodes - input.rf}
+     */
+    private Gen<Integer> leftNodes(Input input)
+    {
+        int maxLeaving = input.nodes - input.rf;
+        return integers().between(0, maxLeaving);
+    }
+
+    /**
+     * @return a generator for the number of move operations: always 0 when
+     *         {@code input.tokensPerNode > 1}, otherwise between 0 and {@code input.nodes}
+     */
+    private Gen<Integer> movedNodes(Input input)
+    {
+        // Move is not supported in vnode clusters
+        int maxMoves = input.tokensPerNode > 1 ? 0 : input.nodes;
+        return integers().between(0, maxMoves);
+    }
+
+    /**
+     * @return a generator that builds a new {@link Cluster} from the given input each time its
+     *         supplier is invoked
+     */
+    private Gen<Cluster> clusters(Input input)
+    {
+        final int rf = input.rf;
+        final int initialNodes = input.nodes;
+        final int tokensPerNode = input.tokensPerNode;
+        return Generate.constant(() -> new Cluster(rf, initialNodes, tokensPerNode));
+    }
+
+    /**
+     * Combines a cluster generator with generated counts of leaving, bootstrapping and moving
+     * nodes, and applies those operations to the cluster in that order.
+     *
+     * @return a generator of clusters with the topology changes already applied
+     */
+    private Gen<Cluster> clustersWithChangedTopology(Input input)
+    {
+        return clusters(input).zip(leftNodes(input),
+                                   bootstrappedNodes(input),
+                                   movedNodes(input),
+                                   (cluster, left, bootstrapped, moved) -> {
+                                       // each mutator returns the cluster it was called on
+                                       cluster.decommissionNodes(left);
+                                       cluster.bootstrapNodes(bootstrapped);
+                                       return cluster.moveNodes(moved);
+                                   });
+    }
+
+    /**
+     * Immutable parameters of a generated cluster: replication factor, initial number of nodes
+     * and number of tokens per node.
+     */
+    static class Input
+    {
+        final int rf;
+        final int nodes;
+        final int tokensPerNode;
+
+        Input(int rf, int nodes, int tokensPerNode)
+        {
+            this.rf = rf;
+            this.nodes = nodes;
+            this.tokensPerNode = tokensPerNode;
+        }
+
+        /**
+         * @return a description of the form {@code Input(rf=.., nodes=.., tokensPerNode=..)}
+         */
+        @Override
+        public String toString()
+        {
+            return String.format("Input(rf=%s, nodes=%s, tokensPerNode=%s)", rf, nodes, tokensPerNode);
+        }
+    }
+
+    /**
+     * Test model of a ring undergoing topology changes. It owns a {@link TokenMetadata}, the
+     * replication strategy built over it, and the list of every node registered so far (initial
+     * and bootstrapping). Tokens and the nodes picked for leave/move operations come from an
+     * unseeded {@link Random}, so two instances built with the same arguments generally differ.
+     */
+    private static class Cluster
+    {
+        private final TokenMetadata tm;
+        private final int tokensPerNode;
+        private final AbstractReplicationStrategy strategy;
+
+        // every node added to tm so far, in insertion order
+        private final List<InetAddressAndPort> nodes;
+        private final Random random = new Random();
+
+        /**
+         * @param rf replication factor handed to simpleStrategy
+         * @param initialNodes number of nodes registered with normal tokens up front
+         * @param tokensPerNode number of random tokens assigned to each node
+         */
+        Cluster(int rf, int initialNodes, int tokensPerNode)
+        {
+            this.tm = new TokenMetadata();
+            this.tokensPerNode = tokensPerNode;
+            this.strategy = simpleStrategy(tm, rf);
+
+            this.nodes = new ArrayList<>(initialNodes);
+            for (int i = 0; i < initialNodes; i++)
+                addInitialNode();
+        }
+
+        /** Registers a new node with a random host id and random normal tokens. */
+        private void addInitialNode()
+        {
+            InetAddressAndPort node = peer(nodes.size() + 1);
+            tm.updateHostId(UUID.randomUUID(), node);
+            tm.updateNormalTokens(tokens(), node);
+            nodes.add(node);
+        }
+
+        /** Registers a new node with a random host id and random bootstrap tokens. */
+        private void bootstrapNode()
+        {
+            InetAddressAndPort node = peer(nodes.size() + 1);
+            tm.updateHostId(UUID.randomUUID(), node);
+            tm.addBootstrapTokens(tokens(), node);
+            nodes.add(node);
+        }
+
+        /**
+         * Calculates pending ranges and reads them back for every known node. The results are
+         * discarded: the only thing verified is that neither call throws.
+         */
+        void calculateAndGetPendingRanges()
+        {
+            // test that it does not crash
+            tm.calculatePendingRanges(strategy, KEYSPACE);
+            for (InetAddressAndPort node : nodes)
+                tm.getPendingRanges(KEYSPACE, node);
+        }
+
+        /**
+         * Marks {@code cnt} randomly chosen nodes as leaving.
+         *
+         * @return this cluster, for chaining
+         */
+        Cluster decommissionNodes(int cnt)
+        {
+            // nodes are drawn with replacement, so the same node may be picked more than once
+            for (int i = 0; i < cnt; i++)
+                tm.addLeavingEndpoint(nodes.get(random.nextInt(nodes.size())));
+            return this;
+        }
+
+        /**
+         * Adds {@code cnt} bootstrapping nodes.
+         *
+         * @return this cluster, for chaining
+         */
+        Cluster bootstrapNodes(int cnt)
+        {
+            for (int i = 0; i < cnt; i++)
+                bootstrapNode();
+            return this;
+        }
+
+        /**
+         * Marks {@code cnt} distinct nodes as moving to random tokens. Only valid for
+         * single-token clusters unless {@code cnt} is 0.
+         *
+         * @return this cluster, for chaining
+         */
+        Cluster moveNodes(int cnt)
+        {
+            assert cnt == 0 || tokensPerNode == 1 : "Moving tokens is not supported when tokensPerNode > 1";
+
+            for (int i = 0; i < cnt; i++)
+                moveNode();
+            return this;
+        }
+
+        /**
+         * Marks one randomly chosen, not yet moving node as moving to a random token.
+         *
+         * @throws IllegalStateException if every known node is already moving
+         */
+        private void moveNode()
+        {
+            if (tm.getSizeOfMovingEndpoints() >= nodes.size())
+                throw new IllegalStateException("Number of movements should not exceed total nodes in the cluster");
+
+            // we want to ensure that any given node is only marked as moving once.
+            List<InetAddressAndPort> moveCandidates = nodes.stream()
+                                                           .filter(node -> !tm.isMoving(node))
+                                                           .collect(Collectors.toList());
+            InetAddressAndPort node = moveCandidates.get(random.nextInt(moveCandidates.size()));
+            tm.addMovingEndpoint(token(random.nextLong()), node);
+        }
+
+        /** @return {@code tokensPerNode} tokens built from random long values */
+        private Collection<Token> tokens()
+        {
+            Collection<Token> tokens = new ArrayList<>(tokensPerNode);
+            for (int i = 0; i < tokensPerNode; i++)
+                tokens.add(token(random.nextLong()));
+            return tokens;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("Nodes: %s\n" +
+                                 "Metadata: %s",
+                                 nodes.size(),
+                                 tm.toString());
+        }
+    }
private void assertPendingRanges(PendingRangeMaps pending, RangesByEndpoint expected)
{
@@ -188,6 +469,11 @@ public class PendingRangesTest
assertRangesByEndpoint(expected, actual.build());
}
+    /**
+     * Asserts that both collections have the same size and that every expected replica is
+     * contained in the actual collection.
+     */
+    private void assertRangesAtEndpoint(RangesAtEndpoint expected, RangesAtEndpoint actual)
+    {
+        assertEquals(expected.size(), actual.size());
+        for (Replica expectedReplica : expected)
+            assertTrue(actual.contains(expectedReplica));
+    }
private void assertRangesByEndpoint(RangesByEndpoint expected, RangesByEndpoint actual)
{
@@ -196,8 +482,7 @@ public class PendingRangesTest
{
RangesAtEndpoint expectedReplicas = expected.get(endpoint);
RangesAtEndpoint actualReplicas = actual.get(endpoint);
- assertEquals(expectedReplicas.size(), actualReplicas.size());
- assertTrue(Iterables.all(expectedReplicas, actualReplicas::contains));
+ assertRangesAtEndpoint(expectedReplicas, actualReplicas);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org