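You are viewing a plain-text version of this content. The canonical link ("here") was a hyperlink in the original page and is not preserved in this copy; the change itself is commit 8ba163f25a56cb507e621b89b6928c2aef0ecc57 in https://gitbox.apache.org/repos/asf/cassandra.git.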
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/09/14 12:11:19 UTC

[cassandra] branch trunk updated: Correctly handle pending ranges with adjacent range movements

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8ba163f  Correctly handle pending ranges with adjacent range movements
8ba163f is described below

commit 8ba163f25a56cb507e621b89b6928c2aef0ecc57
Author: Aleksandr Sorokoumov <al...@gmail.com>
AuthorDate: Thu Mar 26 17:01:27 2020 +0100

    Correctly handle pending ranges with adjacent range movements
    
    Patch by Aleksandr Sorokoumov; reviewed by Sam Tunnicliffe and Marcus Eriksson for CASSANDRA-14801
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/locator/TokenMetadata.java    |  12 +-
 .../cassandra/locator/PendingRangesTest.java       | 293 ++++++++++++++++++++-
 3 files changed, 298 insertions(+), 8 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 3af602c..90238ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta3
+ * Correctly handle pending ranges with adjacent range movements (CASSANDRA-14801)
  * Avoid adding locahost when streaming trivial ranges (CASSANDRA-16099)
  * Add nodetool getfullquerylog (CASSANDRA-15988)
  * Fix yaml format and alignment in tpstats (CASSANDRA-11402)
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 3bb265d..ca8fd99 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -787,7 +787,7 @@ public class TokenMetadata
             Replica replica = entry.getValue();
             if (replica.endpoint().equals(endpoint))
             {
-                builder.add(replica);
+                builder.add(replica, Conflict.DUPLICATE);
             }
         }
         return builder.build();
@@ -899,11 +899,15 @@ public class TokenMetadata
         {
             EndpointsForRange currentReplicas = strategy.calculateNaturalReplicas(range.right, metadata);
             EndpointsForRange newReplicas = strategy.calculateNaturalReplicas(range.right, allLeftMetadata);
-            for (Replica replica : newReplicas)
+            for (Replica newReplica : newReplicas)
             {
-                if (currentReplicas.endpoints().contains(replica.endpoint()))
+                if (currentReplicas.endpoints().contains(newReplica.endpoint()))
                     continue;
-                newPendingRanges.addPendingRange(range, replica);
+
+                // we calculate pending replicas for leave- and move- affected ranges in the same way to avoid
+                // a possible conflict when 2 pending replicas have the same endpoint and different ranges.
+                for (Replica pendingReplica : newReplica.subtractSameReplication(addressRanges.get(newReplica.endpoint())))
+                    newPendingRanges.addPendingRange(range, pendingReplica);
             }
         }
 
diff --git a/test/unit/org/apache/cassandra/locator/PendingRangesTest.java b/test/unit/org/apache/cassandra/locator/PendingRangesTest.java
index 48bf546..7959366 100644
--- a/test/unit/org/apache/cassandra/locator/PendingRangesTest.java
+++ b/test/unit/org/apache/cassandra/locator/PendingRangesTest.java
@@ -19,19 +19,24 @@
 package org.apache.cassandra.locator;
 
 import java.net.UnknownHostException;
-import java.util.Collections;
+import java.util.*;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.*;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.Generate;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.quicktheories.QuickTheory.qt;
+import static org.quicktheories.generators.SourceDSL.integers;
 
 public class PendingRangesTest
 {
@@ -170,6 +175,282 @@ public class PendingRangesTest
         assertPendingRanges(tm.getPendingRangesMM(KEYSPACE), expected);
     }
 
+    @Test
+    public void testConcurrentAdjacentLeaveAndMove()
+    {
+        TokenMetadata tm = new TokenMetadata();
+        AbstractReplicationStrategy strategy = simpleStrategy(tm, 3);
+
+        Token newToken = token(0);
+        Token token1 = token(-9);
+        Token token2 = token(-5);
+        Token token3 = token(-1);
+        Token token4 = token(1);
+        Token token5 = token(5);
+
+        InetAddressAndPort node1 = peer(1);
+        InetAddressAndPort node2 = peer(2);
+        InetAddressAndPort node3 = peer(3);
+        InetAddressAndPort node4 = peer(4);
+        InetAddressAndPort node5 = peer(5);
+
+        // setup initial ring
+        addNode(tm, node1, token1);
+        addNode(tm, node2, token2);
+        addNode(tm, node3, token3);
+        addNode(tm, node4, token4);
+        addNode(tm, node5, token5);
+
+        tm.addLeavingEndpoint(node5);
+        tm.addMovingEndpoint(newToken, node3);
+
+        tm.calculatePendingRanges(strategy, KEYSPACE);
+        assertRangesAtEndpoint(RangesAtEndpoint.of(new Replica(node1, new Range<>(token2, token3), true)),
+                               tm.getPendingRanges(KEYSPACE, node1));
+        assertRangesAtEndpoint(RangesAtEndpoint.of(new Replica(node2, new Range<>(token3, token4), true)),
+                               tm.getPendingRanges(KEYSPACE, node2));
+        assertRangesAtEndpoint(RangesAtEndpoint.of(new Replica(node3, new Range<>(token3, newToken), true),
+                                                   new Replica(node3, new Range<>(token4, token5), true)),
+                               tm.getPendingRanges(KEYSPACE, node3));
+        assertRangesAtEndpoint(RangesAtEndpoint.empty(node4), tm.getPendingRanges(KEYSPACE, node4));
+        assertRangesAtEndpoint(RangesAtEndpoint.empty(node5), tm.getPendingRanges(KEYSPACE, node5));
+    }
+
+    @Test
+    public void testConcurrentAdjacentLeavingNodes()
+    {
+        TokenMetadata tm = new TokenMetadata();
+        AbstractReplicationStrategy strategy = simpleStrategy(tm, 2);
+
+        Token token1 = token(-9);
+        Token token2 = token(-4);
+        Token token3 = token(0);
+        Token token4 = token(4);
+
+        InetAddressAndPort node1 = peer(1);
+        InetAddressAndPort node2 = peer(2);
+        InetAddressAndPort node3 = peer(3);
+        InetAddressAndPort node4 = peer(4);
+
+        addNode(tm, node1, token1);
+        addNode(tm, node2, token2);
+        addNode(tm, node3, token3);
+        addNode(tm, node4, token4);
+
+        tm.addLeavingEndpoint(node2);
+        tm.addLeavingEndpoint(node3);
+
+        tm.calculatePendingRanges(strategy, KEYSPACE);
+        assertRangesAtEndpoint(RangesAtEndpoint.of(new Replica(node1, new Range<>(token1, token3), true)),
+                               tm.getPendingRanges(KEYSPACE, node1));
+        assertRangesAtEndpoint(RangesAtEndpoint.empty(node2), tm.getPendingRanges(KEYSPACE, node2));
+        assertRangesAtEndpoint(RangesAtEndpoint.empty(node3), tm.getPendingRanges(KEYSPACE, node3));
+        assertRangesAtEndpoint(RangesAtEndpoint.of(new Replica(node4, new Range<>(token4, token1), true),
+                                                   new Replica(node4, new Range<>(token1, token2), true)),
+                               tm.getPendingRanges(KEYSPACE, node4));
+    }
+
+    @Test
+    public void testBootstrapLeaveAndMovePermutationsWithoutVnodes()
+    {
+        // In a non-vnode cluster (i.e. where tokensPerNode == 1), we can
+        // add, remove and move nodes
+        int maxRf = 5;
+        int nodes = 50;
+        Gen<Integer> rfs = rf(maxRf);
+        Gen<Input> inputs = rfs.flatMap(rf -> input(rf, nodes));
+
+        qt().forAll(inputs.flatMap(this::clustersWithChangedTopology))
+            .checkAssert(Cluster::calculateAndGetPendingRanges);
+    }
+
+    @Test
+    public void testBootstrapAndLeavePermutationsWithVnodes()
+    {
+        // In a vnode cluster (i.e. where tokensPerNode > 1), move is not
+        // supported, so only leave and bootstrap operations will occur
+        int maxRf = 5;
+        int nodes = 50;
+        int maxTokensPerNode = 16;
+
+        Gen<Integer> rfs = rf(maxRf);
+        Gen<Input> inputs = rfs.flatMap(rf -> input(rf, nodes, maxTokensPerNode));
+
+        qt().forAll(inputs.flatMap(this::clustersWithChangedTopology))
+            .checkAssert(Cluster::calculateAndGetPendingRanges);
+    }
+
+    private Gen<Integer> rf(int maxRf)
+    {
+        return integers().between(1, maxRf);
+    }
+
+    private Gen<Input> input(int rf, int maxNodes)
+    {
+        return integers().between(rf, maxNodes).map(n -> new Input(rf, n, 1));
+    }
+
+    private Gen<Input> input(int rf, int maxNodes, int maxTokensPerNode)
+    {
+        Gen<Integer> tokensPerNode = integers().between(1, maxTokensPerNode);
+        return integers().between(rf, maxNodes)
+                         .zip(tokensPerNode, (n, tokens) -> new Input(rf, n, tokens));
+    }
+
+    private Gen<Integer> bootstrappedNodes(Input input)
+    {
+        // at most double in size
+        return integers().between(0, input.nodes);
+    }
+
+    private Gen<Integer> leftNodes(Input input)
+    {
+        return integers().between(0, input.nodes - input.rf);
+    }
+
+    private Gen<Integer> movedNodes(Input input)
+    {
+        // Move is not supported in vnode clusters
+        if (input.tokensPerNode > 1)
+            return integers().between(0, 0);
+
+        return integers().between(0, input.nodes);
+    }
+
+    private Gen<Cluster> clusters(Input input)
+    {
+        return Generate.constant(() -> new Cluster(input.rf, input.nodes, input.tokensPerNode));
+    }
+
+    private Gen<Cluster> clustersWithChangedTopology(Input input)
+    {
+        Gen<Cluster> clusters = clusters(input);
+        Gen<Integer> leftNodes = leftNodes(input);
+        Gen<Integer> bootstrappedNodes = bootstrappedNodes(input);
+        Gen<Integer> movedNodes = movedNodes(input);
+
+        return clusters.zip(leftNodes, bootstrappedNodes, movedNodes,
+                            (cluster, left, bootstrapped, moved) -> cluster.decommissionNodes(left)
+                                                                           .bootstrapNodes(bootstrapped)
+                                                                           .moveNodes(moved));
+    }
+
+    static class Input
+    {
+        final int rf;
+        final int nodes;
+        final int tokensPerNode;
+
+        Input(int rf, int nodes, int tokensPerNode)
+        {
+            this.rf = rf;
+            this.nodes = nodes;
+            this.tokensPerNode = tokensPerNode;
+        }
+
+        public String toString()
+        {
+            return String.format("Input(rf=%s, nodes=%s, tokensPerNode=%s)", rf, nodes, tokensPerNode);
+        }
+    }
+
+    private static class Cluster
+    {
+        private final TokenMetadata tm;
+        private final int tokensPerNode;
+        private final AbstractReplicationStrategy strategy;
+
+        private final List<InetAddressAndPort> nodes;
+        Random random = new Random();
+
+        Cluster(int rf, int initialNodes, int tokensPerNode)
+        {
+            this.tm = new TokenMetadata();
+            this.tokensPerNode = tokensPerNode;
+            this.strategy = simpleStrategy(tm, rf);
+
+            this.nodes = new ArrayList<>(initialNodes);
+            for (int i = 0; i < initialNodes; i++)
+                addInitialNode();
+        }
+
+        private void addInitialNode()
+        {
+            InetAddressAndPort node = peer(nodes.size() + 1);
+            tm.updateHostId(UUID.randomUUID(), node);
+            tm.updateNormalTokens(tokens(), node);
+            nodes.add(node);
+        }
+
+        private void bootstrapNode()
+        {
+            InetAddressAndPort node = peer(nodes.size() + 1);
+            tm.updateHostId(UUID.randomUUID(), node);
+            tm.addBootstrapTokens(tokens(), node);
+            nodes.add(node);
+        }
+
+        void calculateAndGetPendingRanges()
+        {
+            // test that it does not crash
+            tm.calculatePendingRanges(strategy, KEYSPACE);
+            for (InetAddressAndPort node : nodes)
+                tm.getPendingRanges(KEYSPACE, node);
+        }
+
+        Cluster decommissionNodes(int cnt)
+        {
+            for (int i = 0; i < cnt; i++)
+                tm.addLeavingEndpoint(nodes.get(random.nextInt(nodes.size())));
+            return this;
+        }
+
+        Cluster bootstrapNodes(int cnt)
+        {
+            for (int i = 0; i < cnt; i++)
+                bootstrapNode();
+            return this;
+        }
+
+        Cluster moveNodes(int cnt)
+        {
+            assert cnt == 0 || tokensPerNode == 1 : "Moving tokens is not supported when tokensPerNode";
+
+            for (int i = 0; i < cnt; i++)
+                moveNode();
+            return this;
+        }
+
+        private void moveNode()
+        {
+            if (tm.getSizeOfMovingEndpoints() >= nodes.size())
+                throw new IllegalStateException("Number of movements should not exceed total nodes in the cluster");
+
+            // we want to ensure that any given node is only marked as moving once.
+            List<InetAddressAndPort> moveCandidates = nodes.stream()
+                                                           .filter(node -> !tm.isMoving(node))
+                                                           .collect(Collectors.toList());
+            InetAddressAndPort node = moveCandidates.get(random.nextInt(moveCandidates.size()));
+            tm.addMovingEndpoint(token(random.nextLong()), node);
+        }
+
+        private Collection<Token> tokens()
+        {
+            Collection<Token> tokens = new ArrayList<>(tokensPerNode);
+            for (int i=0; i< tokensPerNode; i++)
+                tokens.add(token(random.nextLong()));
+            return tokens;
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("Nodes: %s\n" +
+                                 "Metadata: %s",
+                                 nodes.size(),
+                                 tm.toString());
+        }
+    }
 
     private void assertPendingRanges(PendingRangeMaps pending, RangesByEndpoint expected)
     {
@@ -188,6 +469,11 @@ public class PendingRangesTest
         assertRangesByEndpoint(expected, actual.build());
     }
 
+    private void assertRangesAtEndpoint(RangesAtEndpoint expected, RangesAtEndpoint actual)
+    {
+        assertEquals(expected.size(), actual.size());
+        assertTrue(Iterables.all(expected, actual::contains));
+    }
 
     private void assertRangesByEndpoint(RangesByEndpoint expected, RangesByEndpoint actual)
     {
@@ -196,8 +482,7 @@ public class PendingRangesTest
         {
             RangesAtEndpoint expectedReplicas = expected.get(endpoint);
             RangesAtEndpoint actualReplicas = actual.get(endpoint);
-            assertEquals(expectedReplicas.size(), actualReplicas.size());
-            assertTrue(Iterables.all(expectedReplicas, actualReplicas::contains));
+            assertRangesAtEndpoint(expectedReplicas, actualReplicas);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org