You are viewing a plain-text version of this content. The canonical version was linked from "here" on the original archive page; that link was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2021/07/20 18:39:40 UTC

[cassandra] branch cassandra-3.0 updated: Receipt of gossip shutdown updates TokenMetadata

This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new fbb20b9  Receipt of gossip shutdown updates TokenMetadata
fbb20b9 is described below

commit fbb20b9162b73c4de8a82cf4ffdde3304e904603
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Mon Jul 12 17:23:18 2021 +0100

    Receipt of gossip shutdown updates TokenMetadata
    
    Patch by Sam Tunnicliffe; reviewed by Caleb Rackliffe for
    CASSANDRA-16796
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |   6 +-
 .../cassandra/distributed/test/GossipTest.java     | 138 ++++++++++++++++++++-
 3 files changed, 139 insertions(+), 6 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index d4e9322..58ac902 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.25:
+ * Receipt of gossip shutdown notification updates TokenMetadata (CASSANDRA-16796)
  * Count bloom filter misses correctly (CASSANDRA-12922)
  * Reject token() in MV WHERE clause (CASSANDRA-13464)
  * Ensure java executable is on the path (CASSANDRA-14325)
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 0f37dc9..818df50 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -426,11 +426,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         EndpointState epState = endpointStateMap.get(endpoint);
         if (epState == null)
             return;
-        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true));
+        VersionedValue shutdown = StorageService.instance.valueFactory.shutdown(true);
+        epState.addApplicationState(ApplicationState.STATUS, shutdown);
         epState.addApplicationState(ApplicationState.RPC_READY, StorageService.instance.valueFactory.rpcReady(false));
         epState.getHeartBeatState().forceHighestPossibleVersionUnsafe();
         markDead(endpoint, epState);
         FailureDetector.instance.forceConviction(endpoint);
+        for (IEndpointStateChangeSubscriber subscriber : subscribers)
+            subscriber.onChange(endpoint, ApplicationState.STATUS, shutdown);
+        logger.debug("Marked {} as shutdown", endpoint);
     }
 
     /**
diff --git a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
index 32ecb95..ba6027b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
@@ -20,16 +20,15 @@ package org.apache.cassandra.distributed.test;
 
 import java.io.Closeable;
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.List;
+import java.util.concurrent.*;
 import java.util.concurrent.locks.LockSupport;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
 import org.junit.Test;
@@ -39,16 +38,21 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.*;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertEquals;
 
 public class GossipTest extends TestBaseImpl
 {
@@ -224,4 +228,128 @@ public class GossipTest extends TestBaseImpl
         }
     }
 
+    @Test
+    public void gossipShutdownUpdatesTokenMetadata() throws Exception
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(c -> c.with(GOSSIP, NETWORK))
+                                      .withInstanceInitializer(FailureHelper::installMoveFailure)
+                                      .start())
+        {
+            init(cluster, 2);
+            populate(cluster);
+            IInvokableInstance node1 = cluster.get(1);
+            IInvokableInstance node2 = cluster.get(2);
+            IInvokableInstance node3 = cluster.get(3);
+            InetAddress movingAddress = node2.broadcastAddress().getAddress();
+
+            // Initiate a move for node2, which cannot complete because FailureHelper's ByteBuddy
+            // interceptor makes streaming fail on node2. Wait until every node in the cluster has
+            // logged that node2 is moving before proceeding.
+            long t2 = Long.parseLong(getLocalToken(node2));
+            long t3 = Long.parseLong(getLocalToken(node3));
+            long moveTo = t2 + ((t3 - t2) / 2);
+            String logMsg = "Node " + movingAddress + " state moving, new token " + moveTo;
+            runAndWaitForLogs(() -> node2.nodetoolResult("move", "--", Long.toString(moveTo)).asserts().failure(),
+                              logMsg,
+                              cluster);
+
+            // node1 & node3 should now consider some ranges pending for node2
+            assertPendingRangesForPeer(true, movingAddress, cluster);
+
+            // A controlled shutdown makes peers replace node2's MOVING status with SHUTDOWN. Prior to
+            // CASSANDRA-16796 this did not update TokenMetadata, so peers kept pending ranges for the
+            // down node indefinitely, even after it had been removed from the ring.
+            logMsg = "Marked " + movingAddress + " as shutdown";
+            runAndWaitForLogs(() -> Futures.getUnchecked(node2.shutdown()),
+                              logMsg,
+                              node1, node3);
+            // node1 & node3 should no longer consider any ranges pending for node2
+            assertPendingRangesForPeer(false, movingAddress, cluster);
+        }
+    }
+
+    void assertPendingRangesForPeer(final boolean expectPending, final InetAddress movingAddress, final Cluster cluster)
+    {
+        // Only node1 and node3 are inspected; the peer under test (movingAddress) is not queried itself.
+        for (IInvokableInstance inst : new IInvokableInstance[]{ cluster.get(1), cluster.get(3)})
+        {
+            boolean hasPending = inst.appliesOnInstance((InetAddress peer) -> {
+                // let any in-flight pending range calculation finish before inspecting TokenMetadata
+                PendingRangeCalculatorService.instance.blockUntilFinished();
+                boolean isMoving = StorageService.instance.getTokenMetadata()
+                                                          .getMovingEndpoints()
+                                                          .stream()
+                                                          .map(pair -> pair.right)
+                                                          .anyMatch(peer::equals);
+                return isMoving && !StorageService.instance.getTokenMetadata()
+                                                           .getPendingRanges(KEYSPACE, peer)
+                                                           .isEmpty();
+            }).apply(movingAddress);
+            // JUnit's assertEquals signature is (message, expected, actual)
+            assertEquals(String.format("%s should %shave PENDING RANGES for %s",
+                                       inst.broadcastAddress().getHostString(),
+                                       expectPending ? "" : "not ",
+                                       movingAddress),
+                         expectPending, hasPending);
+        }
+    }
+
+    private String getLocalToken(IInvokableInstance inst)
+    {
+        return inst.callOnInstance(() -> {
+            List<String> tokens = new ArrayList<>();
+            for (Token t : StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress()))
+                tokens.add(t.getTokenValue().toString());
+            // Explicit check rather than `assert`: fails even with assertions disabled, and reports 0 tokens honestly.
+            if (tokens.size() != 1) throw new IllegalStateException("getLocalToken expects exactly one token, found " + tokens);
+            return tokens.get(0);
+        });
+    }
+
+    public static void runAndWaitForLogs(Runnable r, String waitString, Cluster cluster) throws TimeoutException
+    {
+        runAndWaitForLogs(r, waitString, cluster.stream().toArray(IInstance[]::new)); // every node in the cluster
+    }
+
+    /** Runs {@code r}, then waits for each instance to log {@code waitString} after a mark taken before {@code r} ran. */
+    public static void runAndWaitForLogs(Runnable r, String waitString, IInstance...instances) throws TimeoutException
+    {
+        long[] startMarks = new long[instances.length];
+        for (int idx = 0; idx < startMarks.length; idx++) startMarks[idx] = instances[idx].logs().mark();
+        r.run();
+        for (int idx = 0; idx < startMarks.length; idx++)
+            instances[idx].logs().watchFor(startMarks[idx], waitString);
+    }
+
+    static void populate(Cluster cluster)
+    {
+        // single-column table with ten rows (pk 0..9), each written via node1 at CL.ALL
+        cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int PRIMARY KEY)");
+        String insert = "INSERT INTO " + KEYSPACE + ".tbl (pk) VALUES (?)";
+        for (int pk = 0; pk < 10; pk++)
+        {
+            cluster.coordinator(1).execute(insert, ConsistencyLevel.ALL, pk);
+        }
+    }
+
+    public static class FailureHelper
+    {
+        /** On node 2 only, redefine every StreamPlan method named "execute" to delegate to this class, so streaming fails. */
+        static void installMoveFailure(ClassLoader cl, int nodeNumber)
+        {
+            if (nodeNumber != 2) return; // every other node keeps the real StreamPlan
+            new ByteBuddy().redefine(StreamPlan.class)
+                           .method(named("execute"))
+                           .intercept(MethodDelegation.to(FailureHelper.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        /** Delegation target for the intercepted StreamPlan#execute: always throws instead of streaming. */
+        public static StreamResultFuture execute()
+        {
+            throw new RuntimeException("failing to execute move");
+        }
+    }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org