You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2021/07/20 18:39:40 UTC
[cassandra] branch cassandra-3.0 updated: Receipt of gossip shutdown updates TokenMetadata
This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new fbb20b9 Receipt of gossip shutdown updates TokenMetadata
fbb20b9 is described below
commit fbb20b9162b73c4de8a82cf4ffdde3304e904603
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Mon Jul 12 17:23:18 2021 +0100
Receipt of gossip shutdown updates TokenMetadata
Patch by Sam Tunnicliffe; reviewed by Caleb Rackliffe for
CASSANDRA-16796
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/gms/Gossiper.java | 6 +-
.../cassandra/distributed/test/GossipTest.java | 138 ++++++++++++++++++++-
3 files changed, 139 insertions(+), 6 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index d4e9322..58ac902 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.25:
+ * Receipt of gossip shutdown notification updates TokenMetadata (CASSANDRA-16796)
* Count bloom filter misses correctly (CASSANDRA-12922)
* Reject token() in MV WHERE clause (CASSANDRA-13464)
* Ensure java executable is on the path (CASSANDRA-14325)
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 0f37dc9..818df50 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -426,11 +426,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
EndpointState epState = endpointStateMap.get(endpoint);
if (epState == null)
return;
- epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true));
+ VersionedValue shutdown = StorageService.instance.valueFactory.shutdown(true);
+ epState.addApplicationState(ApplicationState.STATUS, shutdown);
epState.addApplicationState(ApplicationState.RPC_READY, StorageService.instance.valueFactory.rpcReady(false));
epState.getHeartBeatState().forceHighestPossibleVersionUnsafe();
markDead(endpoint, epState);
FailureDetector.instance.forceConviction(endpoint);
+ for (IEndpointStateChangeSubscriber subscriber : subscribers)
+ subscriber.onChange(endpoint, ApplicationState.STATUS, shutdown);
+ logger.debug("Marked {} as shutdown", endpoint);
}
/**
diff --git a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
index 32ecb95..ba6027b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
@@ -20,16 +20,15 @@ package org.apache.cassandra.distributed.test;
import java.io.Closeable;
import java.net.InetAddress;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.List;
+import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
import org.junit.Test;
@@ -39,16 +38,21 @@ import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.*;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.utils.FBUtilities;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertEquals;
public class GossipTest extends TestBaseImpl
{
@@ -224,4 +228,128 @@ public class GossipTest extends TestBaseImpl
}
}
+ @Test
+ public void gossipShutdownUpdatesTokenMetadata() throws Exception
+ {
+ try (Cluster cluster = Cluster.build(3)
+ .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+ .withInstanceInitializer(FailureHelper::installMoveFailure)
+ .start())
+ {
+ init(cluster, 2);
+ populate(cluster);
+ IInvokableInstance node1 = cluster.get(1);
+ IInvokableInstance node2 = cluster.get(2);
+ IInvokableInstance node3 = cluster.get(3);
+
+ // initiate a move for node2, which will not complete due to the
+ // ByteBuddy interceptor we injected. Wait for the other two nodes
+ // to mark node2 as moving before proceeding.
+ long t2 = Long.parseLong(getLocalToken(node2));
+ long t3 = Long.parseLong(getLocalToken(node3));
+ long moveTo = t2 + ((t3 - t2)/2);
+ String logMsg = "Node " + node2.broadcastAddress().getAddress() + " state moving, new token " + moveTo;
+ runAndWaitForLogs(() -> node2.nodetoolResult("move", "--", Long.toString(moveTo)).asserts().failure(),
+ logMsg,
+ cluster);
+
+ InetAddress movingAddress = node2.broadcastAddress().getAddress();
+ // node1 & node3 should now consider some ranges pending for node2
+ assertPendingRangesForPeer(true, movingAddress, cluster);
+
+            // A controlled shutdown causes peers to replace the MOVING status with SHUTDOWN, but prior to
+            // CASSANDRA-16796 this didn't update TokenMetadata, so they maintained pending ranges for the down node
+ // indefinitely, even after it has been removed from the ring.
+ logMsg = "Marked " + node2.broadcastAddress().getAddress() + " as shutdown";
+ runAndWaitForLogs(() -> Futures.getUnchecked(node2.shutdown()),
+ logMsg,
+ node1, node3);
+ // node1 & node3 should not consider any ranges as still pending for node2
+ assertPendingRangesForPeer(false, movingAddress, cluster);
+ }
+ }
+
+ void assertPendingRangesForPeer(final boolean expectPending, final InetAddress movingAddress, final Cluster cluster)
+ {
+ for (IInvokableInstance inst : new IInvokableInstance[]{ cluster.get(1), cluster.get(3)})
+ {
+ boolean hasPending = inst.appliesOnInstance((InetAddress peer) -> {
+
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+
+ boolean isMoving = StorageService.instance.getTokenMetadata()
+ .getMovingEndpoints()
+ .stream()
+ .map(pair -> pair.right)
+ .anyMatch(peer::equals);
+
+ return isMoving && !StorageService.instance.getTokenMetadata()
+ .getPendingRanges(KEYSPACE, peer)
+ .isEmpty();
+ }).apply(movingAddress);
+ assertEquals(String.format("%s should %shave PENDING RANGES for %s",
+ inst.broadcastAddress().getHostString(),
+ expectPending ? "" : "not ",
+ movingAddress),
+ hasPending, expectPending);
+ }
+ }
+
+ private String getLocalToken(IInvokableInstance inst)
+ {
+ return inst.callOnInstance(() -> {
+ List<String> tokens = new ArrayList<>();
+ for (Token t : StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress()))
+ tokens.add(t.getTokenValue().toString());
+
+ assert tokens.size() == 1 : "getLocalToken assumes a single token, but multiple tokens found";
+ return tokens.get(0);
+ });
+ }
+
+ public static void runAndWaitForLogs(Runnable r, String waitString, Cluster cluster) throws TimeoutException
+ {
+ runAndWaitForLogs(r, waitString, cluster.stream().toArray(IInstance[]::new));
+ }
+
+ public static void runAndWaitForLogs(Runnable r, String waitString, IInstance...instances) throws TimeoutException
+ {
+ long [] marks = new long[instances.length];
+ for (int i = 0; i < instances.length; i++)
+ marks[i] = instances[i].logs().mark();
+ r.run();
+ for (int i = 0; i < instances.length; i++)
+ instances[i].logs().watchFor(marks[i], waitString);
+ }
+
+ static void populate(Cluster cluster)
+ {
+ cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int PRIMARY KEY)");
+ for (int i = 0; i < 10; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk) VALUES (?)",
+ ConsistencyLevel.ALL,
+ i);
+ }
+ }
+
+ public static class FailureHelper
+ {
+ static void installMoveFailure(ClassLoader cl, int nodeNumber)
+ {
+ if (nodeNumber == 2)
+ {
+ new ByteBuddy().redefine(StreamPlan.class)
+ .method(named("execute"))
+ .intercept(MethodDelegation.to(FailureHelper.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+ }
+
+ public static StreamResultFuture execute()
+ {
+ throw new RuntimeException("failing to execute move");
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org