You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2021/07/20 18:39:44 UTC
[cassandra] 01/01: Merge branch 'cassandra-3.11' into cassandra-4.0
This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 64ec400ab6504ebf9447eb7250064a7795e29b81
Merge: 1853006 b604bd2
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Tue Jul 20 19:31:56 2021 +0100
Merge branch 'cassandra-3.11' into cassandra-4.0
CHANGES.txt | 1 +
src/java/org/apache/cassandra/gms/Gossiper.java | 6 +-
.../cassandra/distributed/shared/ClusterUtils.java | 31 ++++++
.../cassandra/distributed/test/GossipTest.java | 112 ++++++++++++++++++++-
4 files changed, 144 insertions(+), 6 deletions(-)
diff --cc CHANGES.txt
index b51f94f,89c7813..414dc3b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -9,7 -3,16 +9,8 @@@ Merged from 3.11
* Optimize bytes skipping when reading SSTable files (CASSANDRA-14415)
* Enable tombstone compactions when unchecked_tombstone_compaction is set in TWCS (CASSANDRA-14496)
* Read only the required SSTables for single partition queries (CASSANDRA-16737)
- * Fix LeveledCompactionStrategy compacts last level throw an ArrayIndexOutOfBoundsException (CASSANDRA-15669)
- * Maps $CASSANDRA_LOG_DIR to cassandra.logdir java property when executing nodetool (CASSANDRA-16199)
- * Nodetool garbagecollect should retain SSTableLevel for LCS (CASSANDRA-16634)
- * Ignore stale acks received in the shadow round (CASSANDRA-16588)
- * Add autocomplete and error messages for provide_overlapping_tombstones (CASSANDRA-16350)
- * Add StorageServiceMBean.getKeyspaceReplicationInfo(keyspaceName) (CASSANDRA-16447)
- * Make sure sstables with moved starts are removed correctly in LeveledGenerations (CASSANDRA-16552)
- * Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462)
Merged from 3.0:
+ * Receipt of gossip shutdown notification updates TokenMetadata (CASSANDRA-16796)
* Count bloom filter misses correctly (CASSANDRA-12922)
* Reject token() in MV WHERE clause (CASSANDRA-13464)
* Ensure java executable is on the path (CASSANDRA-14325)
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index e194807,1603693..2c38cfb
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -539,13 -435,15 +539,17 @@@ public class Gossiper implements IFailu
EndpointState epState = endpointStateMap.get(endpoint);
if (epState == null)
return;
- epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.shutdown(true));
+ VersionedValue shutdown = StorageService.instance.valueFactory.shutdown(true);
- epState.addApplicationState(ApplicationState.STATUS, shutdown);
++ epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, shutdown);
+ epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true));
epState.addApplicationState(ApplicationState.RPC_READY, StorageService.instance.valueFactory.rpcReady(false));
epState.getHeartBeatState().forceHighestPossibleVersionUnsafe();
markDead(endpoint, epState);
FailureDetector.instance.forceConviction(endpoint);
+ GossiperDiagnostics.markedAsShutdown(this, endpoint);
+ for (IEndpointStateChangeSubscriber subscriber : subscribers)
- subscriber.onChange(endpoint, ApplicationState.STATUS, shutdown);
++ subscriber.onChange(endpoint, ApplicationState.STATUS_WITH_PORT, shutdown);
+ logger.debug("Marked {} as shutdown", endpoint);
}
/**
diff --cc test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index a68e819,0000000..1755857
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@@ -1,774 -1,0 +1,805 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.shared;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
++import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.Futures;
+import org.junit.Assert;
+
++import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.impl.AbstractCluster;
+import org.apache.cassandra.distributed.impl.InstanceConfig;
+import org.apache.cassandra.service.StorageService;
++import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Utilities for working with jvm-dtest clusters.
+ *
+ * This class is marked as Isolated as it relies on lambdas, which are in a package that is marked as shared, so we need
+ * to tell jvm-dtest not to share this class.
+ *
+ * This class should never be called from within the cluster, always in the App ClassLoader.
+ */
+@Isolated
+public class ClusterUtils
+{
+    /**
+     * Matches one endpoint line of {@code nodetool ring} output; the capture groups are address, rack, status,
+     * state and token. Compiled once because it is applied to every line of every ring poll.
+     */
+    private static final Pattern RING_LINE_PATTERN = Pattern.compile("^(/?[0-9.:]+)\\s+(\\w+|\\?)\\s+(\\w+|\\?)\\s+(\\w+|\\?).*?(-?\\d+)\\s*$");
+
+    /**
+     * Start the instance with the given System Properties; once the instance has started, the properties will be cleared.
+     */
+    public static <I extends IInstance> I start(I inst, Consumer<WithProperties> fn)
+    {
+        return start(inst, (ignore, prop) -> fn.accept(prop));
+    }
+
+    /**
+     * Start the instance with the given System Properties; once the instance has started, the properties will be cleared.
+     */
+    public static <I extends IInstance> I start(I inst, BiConsumer<I, WithProperties> fn)
+    {
+        // try-with-resources scopes the properties to the startup call only
+        try (WithProperties properties = new WithProperties())
+        {
+            fn.accept(inst, properties);
+            inst.startup();
+            return inst;
+        }
+    }
+
+    /**
+     * Stop an instance in a blocking manner.
+     *
+     * The main difference between this and {@link IInstance#shutdown()} is that the wait on the future will catch
+     * the exceptions and throw as runtime.
+     */
+    public static void stopUnchecked(IInstance i)
+    {
+        Futures.getUnchecked(i.shutdown());
+    }
+
+    /**
+     * Stops an instance abruptly. This is done by blocking all messages to/from the instance so all other instances
+     * are unable to communicate with it, then stopping the instance gracefully.
+     *
+     * The assumption is that hard stopping inbound and outbound messages will appear to the cluster as if the instance
+     * was stopped via kill -9; this does not hold true if the instance is restarted as it knows it was properly shut down.
+     *
+     * @param cluster to filter messages to
+     * @param inst to shut down
+     */
+    public static <I extends IInstance> void stopAbrupt(ICluster<I> cluster, I inst)
+    {
+        // block all messages to/from the node going down to make sure a clean shutdown doesn't happen
+        IMessageFilters.Filter to = cluster.filters().allVerbs().to(inst.config().num()).drop();
+        IMessageFilters.Filter from = cluster.filters().allVerbs().from(inst.config().num()).drop();
+        try
+        {
+            stopUnchecked(inst);
+        }
+        finally
+        {
+            // always remove the filters so they do not leak into the rest of the test
+            from.off();
+            to.off();
+        }
+    }
+
+    /**
+     * Stop all the instances in the cluster. This function is different from {@link ICluster#close()} as it doesn't
+     * clean up the cluster state, it only stops all the instances.
+     */
+    public static <I extends IInstance> void stopAll(ICluster<I> cluster)
+    {
+        cluster.stream().forEach(ClusterUtils::stopUnchecked);
+    }
+
+    /**
+     * Create a new instance and add it to the cluster, without starting it.
+     *
+     * @param cluster to add to
+     * @param dc the instance should be in
+     * @param rack the instance should be in
+     * @param <I> instance type
+     * @return the instance added
+     */
+    public static <I extends IInstance> I addInstance(AbstractCluster<I> cluster,
+                                                      String dc, String rack)
+    {
+        return addInstance(cluster, dc, rack, ignore -> {});
+    }
+
+    /**
+     * Create a new instance and add it to the cluster, without starting it.
+     *
+     * @param cluster to add to
+     * @param dc the instance should be in
+     * @param rack the instance should be in
+     * @param fn function to add to the config before starting
+     * @param <I> instance type
+     * @return the instance added
+     * @throws NullPointerException if {@code dc} or {@code rack} is null
+     */
+    public static <I extends IInstance> I addInstance(AbstractCluster<I> cluster,
+                                                      String dc, String rack,
+                                                      Consumer<IInstanceConfig> fn)
+    {
+        Objects.requireNonNull(dc, "dc");
+        Objects.requireNonNull(rack, "rack");
+
+        InstanceConfig config = cluster.newInstanceConfig();
+        //TODO adding new instances should be cleaner, currently requires you create the cluster with all
+        // instances known about (at least to NetworkTopology and TokenStrategy)
+        // this is very hidden, so should be more explicit
+        config.networkTopology().put(config.broadcastAddress(), NetworkTopology.dcAndRack(dc, rack));
+
+        fn.accept(config);
+
+        return cluster.bootstrap(config);
+    }
+
+    /**
+     * Create and start a new instance that replaces an existing instance.
+     *
+     * The instance will be in the same datacenter and rack as the existing instance.
+     *
+     * @param cluster to add to
+     * @param toReplace instance to replace
+     * @param <I> instance type
+     * @return the instance added
+     */
+    public static <I extends IInstance> I replaceHostAndStart(AbstractCluster<I> cluster, IInstance toReplace)
+    {
+        return replaceHostAndStart(cluster, toReplace, ignore -> {});
+    }
+
+    /**
+     * Create and start a new instance that replaces an existing instance.
+     *
+     * The instance will be in the same datacenter and rack as the existing instance.
+     *
+     * @param cluster to add to
+     * @param toReplace instance to replace
+     * @param fn lambda to add additional properties
+     * @param <I> instance type
+     * @return the instance added
+     */
+    public static <I extends IInstance> I replaceHostAndStart(AbstractCluster<I> cluster,
+                                                              IInstance toReplace,
+                                                              Consumer<WithProperties> fn)
+    {
+        IInstanceConfig toReplaceConf = toReplace.config();
+        I inst = addInstance(cluster, toReplaceConf.localDatacenter(), toReplaceConf.localRack(), c -> c.set("auto_bootstrap", true));
+
+        return start(inst, properties -> {
+            // lower this so the replacement waits less time
+            properties.setProperty("cassandra.broadcast_interval_ms", Long.toString(TimeUnit.SECONDS.toMillis(30)));
+            // default is 30s, lowering as it should be faster
+            properties.setProperty("cassandra.ring_delay_ms", Long.toString(TimeUnit.SECONDS.toMillis(10)));
+            properties.set(BOOTSTRAP_SCHEMA_DELAY_MS, TimeUnit.SECONDS.toMillis(10));
+
+            // state which node to replace
+            properties.setProperty("cassandra.replace_address_first_boot", toReplace.config().broadcastAddress().getAddress().getHostAddress());
+
+            // caller-supplied properties are applied last so they can override the defaults above
+            fn.accept(properties);
+        });
+    }
+
+    /**
+     * Calls {@link org.apache.cassandra.locator.TokenMetadata#sortedTokens()}, returning as a list of strings.
+     */
+    public static List<String> getTokenMetadataTokens(IInvokableInstance inst)
+    {
+        return inst.callOnInstance(() ->
+                                   StorageService.instance.getTokenMetadata()
+                                                          .sortedTokens().stream()
+                                                          .map(Object::toString)
+                                                          .collect(Collectors.toList()));
+    }
+
++    /**
++     * Get the single token the instance owns according to its own TokenMetadata, as a string.
++     *
++     * Only instances owning exactly one token are supported; this is checked with a Java {@code assert}, so it is
++     * only enforced when assertions are enabled.
++     *
++     * @param inst instance to query
++     * @return the instance's token value as a string
++     */
++    public static String getLocalToken(IInvokableInstance inst)
++    {
++        return inst.callOnInstance(() -> {
++            List<String> tokens = new ArrayList<>();
++            for (Token t : StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddressAndPort()))
++                tokens.add(t.getTokenValue().toString());
++
++            // report the actual count: the check fails for zero tokens as well as for several
++            assert tokens.size() == 1 : "getLocalToken assumes a single token, but found " + tokens.size() + ": " + tokens;
++            return tokens.get(0);
++        });
++    }
++
++    /**
++     * Run the given action, then wait until every instance in the cluster has logged a line matching
++     * {@code waitString} (as interpreted by the instance's log watcher) at or after the point the action started.
++     *
++     * @param r action to run
++     * @param waitString log text to wait for on every instance
++     * @param cluster whose instances are all watched
++     * @throws TimeoutException if waiting for the log line times out
++     */
++    public static <I extends IInstance> void runAndWaitForLogs(Runnable r, String waitString, AbstractCluster<I> cluster) throws TimeoutException
++    {
++        runAndWaitForLogs(r, waitString, cluster.stream().toArray(IInstance[]::new));
++    }
++
++    /**
++     * Run the given action, then wait until each of the given instances has logged a line matching
++     * {@code waitString} (as interpreted by the instance's log watcher) at or after the point the action started.
++     *
++     * @param r action to run
++     * @param waitString log text to wait for on every instance
++     * @param instances instances to watch
++     * @throws TimeoutException if waiting for the log line times out
++     */
++    public static void runAndWaitForLogs(Runnable r, String waitString, IInstance... instances) throws TimeoutException
++    {
++        // take the log marks BEFORE running the action so lines logged by the action itself are not missed
++        long[] marks = new long[instances.length];
++        for (int i = 0; i < instances.length; i++)
++            marks[i] = instances[i].logs().mark();
++        r.run();
++        for (int i = 0; i < instances.length; i++)
++            instances[i].logs().watchFor(marks[i], waitString);
++    }
++
+    /**
+     * Get the ring from the perspective of the instance.
+     */
+    public static List<RingInstanceDetails> ring(IInstance inst)
+    {
+        NodeToolResult results = inst.nodetoolResult("ring");
+        results.asserts().success();
+        return parseRing(results.getStdout());
+    }
+
+    /**
+     * Make sure the target instance is in the ring.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing instance expected in the ring
+     * @return the ring (if target is present)
+     */
+    public static List<RingInstanceDetails> assertInRing(IInstance instance, IInstance expectedInRing)
+    {
+        String targetAddress = getBroadcastAddressHostString(expectedInRing);
+        List<RingInstanceDetails> ring = ring(instance);
+        Optional<RingInstanceDetails> match = ring.stream().filter(d -> d.address.equals(targetAddress)).findFirst();
+        assertThat(match).as("Expected to find %s but was not found", targetAddress).isPresent();
+        return ring;
+    }
+
+    /**
+     * Make sure the target instance's gossip state matches on the source instance
+     *
+     * @param instance instance to check on
+     * @param expectedInRing instance expected in the ring
+     * @param state expected gossip state
+     * @return the ring (if target is present and has expected state)
+     */
+    public static List<RingInstanceDetails> assertRingState(IInstance instance, IInstance expectedInRing, String state)
+    {
+        String targetAddress = getBroadcastAddressHostString(expectedInRing);
+        List<RingInstanceDetails> ring = ring(instance);
+        List<RingInstanceDetails> match = ring.stream()
+                                              .filter(d -> d.address.equals(targetAddress))
+                                              .collect(Collectors.toList());
+        assertThat(match)
+        .isNotEmpty()
+        .as("State was expected to be %s but was not", state)
+        .anyMatch(r -> r.state.equals(state));
+        return ring;
+    }
+
+    /**
+     * Make sure the target instance is NOT in the ring.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing instance not expected in the ring
+     * @return the ring (if target is not present)
+     */
+    public static List<RingInstanceDetails> assertNotInRing(IInstance instance, IInstance expectedInRing)
+    {
+        String targetAddress = getBroadcastAddressHostString(expectedInRing);
+        List<RingInstanceDetails> ring = ring(instance);
+        Optional<RingInstanceDetails> match = ring.stream().filter(d -> d.address.equals(targetAddress)).findFirst();
+        Assert.assertEquals("Not expected to find " + targetAddress + " but was found", Optional.empty(), match);
+        return ring;
+    }
+
+    /**
+     * Poll the ring as seen by {@code src} (up to 100 attempts, sleeping 1 second between them) until {@code fn}
+     * accepts it.
+     *
+     * @return the first ring accepted by {@code fn}
+     * @throws AssertionError with {@code errorMessage} and the last ring seen if {@code fn} never accepts
+     */
+    private static List<RingInstanceDetails> awaitRing(IInstance src, String errorMessage, Predicate<List<RingInstanceDetails>> fn)
+    {
+        List<RingInstanceDetails> ring = null;
+        for (int i = 0; i < 100; i++)
+        {
+            ring = ring(src);
+            if (fn.test(ring))
+                return ring;
+            sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+        throw new AssertionError(errorMessage + "\n" + ring);
+    }
+
+    /**
+     * Wait for the target to be in the ring as seen by the source instance.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing instance to wait for
+     * @return the ring
+     */
+    public static List<RingInstanceDetails> awaitRingJoin(IInstance instance, IInstance expectedInRing)
+    {
+        return awaitRingJoin(instance, expectedInRing.broadcastAddress().getAddress().getHostAddress());
+    }
+
+    /**
+     * Wait for the target to be in the ring, Up and Normal, as seen by the source instance.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing instance address to wait for
+     * @return the ring
+     */
+    public static List<RingInstanceDetails> awaitRingJoin(IInstance instance, String expectedInRing)
+    {
+        return awaitRing(instance, "Node " + expectedInRing + " did not join the ring...",
+                         ring -> ring.stream()
+                                     .filter(d -> d.address.equals(expectedInRing))
+                                     .findFirst()
+                                     .map(ClusterUtils::isRingInstanceDetailsHealthy)
+                                     .orElse(false));
+    }
+
+    /**
+     * Wait for the ring to only have instances that are Up and Normal.
+     *
+     * @param src instance to check on
+     * @return the ring
+     */
+    public static List<RingInstanceDetails> awaitRingHealthy(IInstance src)
+    {
+        return awaitRing(src, "Timeout waiting for ring to become healthy",
+                         ring ->
+                         ring.stream().allMatch(ClusterUtils::isRingInstanceDetailsHealthy));
+    }
+
+    /**
+     * Wait for the ring to have the target instance with the provided state.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing to look for
+     * @param state expected
+     * @return the ring
+     */
+    public static List<RingInstanceDetails> awaitRingState(IInstance instance, IInstance expectedInRing, String state)
+    {
+        // resolve the address once instead of once per ring entry per poll
+        String targetAddress = getBroadcastAddressHostString(expectedInRing);
+        return awaitRing(instance, "Timeout waiting for " + expectedInRing + " to have state " + state,
+                         ring -> ring.stream()
+                                     .anyMatch(d -> d.address.equals(targetAddress) && d.state.equals(state)));
+    }
+
+    /**
+     * Make sure the ring is only the expected instances. The source instance may not be in the ring, so this function
+     * only relies on the expectedInRing param.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing expected instances in the ring
+     * @return the ring (if condition is true)
+     */
+    public static List<RingInstanceDetails> assertRingIs(IInstance instance, IInstance... expectedInRing)
+    {
+        return assertRingIs(instance, Arrays.asList(expectedInRing));
+    }
+
+    /**
+     * Make sure the ring is only the expected instances. The source instance may not be in the ring, so this function
+     * only relies on the expectedInRing param.
+     *
+     * @param instance instance to check on
+     * @param expectedInRing expected instances in the ring
+     * @return the ring (if condition is true)
+     */
+    public static List<RingInstanceDetails> assertRingIs(IInstance instance, Collection<? extends IInstance> expectedInRing)
+    {
+        Set<String> expectedRingAddresses = expectedInRing.stream()
+                                                          .map(i -> i.config().broadcastAddress().getAddress().getHostAddress())
+                                                          .collect(Collectors.toSet());
+        return assertRingIs(instance, expectedRingAddresses);
+    }
+
+    /**
+     * Make sure the ring is only the expected instances. The source instance may not be in the ring, so this function
+     * only relies on the expectedRingAddresses param.
+     *
+     * @param instance instance to check on
+     * @param expectedRingAddresses expected instances addresses in the ring
+     * @return the ring (if condition is true)
+     */
+    public static List<RingInstanceDetails> assertRingIs(IInstance instance, Set<String> expectedRingAddresses)
+    {
+        List<RingInstanceDetails> ring = ring(instance);
+        Set<String> ringAddresses = ring.stream().map(d -> d.address).collect(Collectors.toSet());
+        assertThat(ringAddresses)
+        .as("Ring addresses did not match for instance %s", instance)
+        .isEqualTo(expectedRingAddresses);
+        return ring;
+    }
+
+    /**
+     * @return true if the ring entry reports status {@code Up} and state {@code Normal}
+     */
+    private static boolean isRingInstanceDetailsHealthy(RingInstanceDetails details)
+    {
+        return details.status.equals("Up") && details.state.equals("Normal");
+    }
+
+    /**
+     * Parse {@code nodetool ring} output into one {@link RingInstanceDetails} per matching endpoint line.
+     */
+    private static List<RingInstanceDetails> parseRing(String str)
+    {
+        // 127.0.0.3 rack0 Up Normal 46.21 KB 100.00% -1
+        // /127.0.0.1:7012 Unknown ? Normal ? 100.00% -3074457345618258603
+        List<RingInstanceDetails> details = new ArrayList<>();
+        for (String line : str.split("\n"))
+        {
+            Matcher matcher = RING_LINE_PATTERN.matcher(line);
+            // lines that do not look like an endpoint entry (e.g. headers) are skipped
+            if (!matcher.find())
+                continue;
+            details.add(new RingInstanceDetails(matcher.group(1), matcher.group(2), matcher.group(3), matcher.group(4), matcher.group(5)));
+        }
+
+        return details;
+    }
+
+    /**
+     * Poll the gossip info as seen by {@code src} (up to 100 attempts, sleeping 1 second between them) until
+     * {@code fn} accepts it.
+     *
+     * @return the first gossip info accepted by {@code fn}
+     * @throws AssertionError with {@code errorMessage} and the last gossip info seen if {@code fn} never accepts
+     */
+    private static Map<String, Map<String, String>> awaitGossip(IInstance src, String errorMessage, Predicate<Map<String, Map<String, String>>> fn)
+    {
+        Map<String, Map<String, String>> gossip = null;
+        for (int i = 0; i < 100; i++)
+        {
+            gossip = gossipInfo(src);
+            if (fn.test(gossip))
+                return gossip;
+            sleepUninterruptibly(1, TimeUnit.SECONDS);
+        }
+        throw new AssertionError(errorMessage + "\n" + gossip);
+    }
+
+    /**
+     * Wait for the target instance to have the desired status. Target status is checked via string contains so works
+     * with 'NORMAL' but also can check tokens or full state.
+     *
+     * STATUS_WITH_PORT is preferred over STATUS when both are present; a null {@code targetStatus} waits for the
+     * instance to be known to gossip but have neither field.
+     *
+     * @param instance instance to check on
+     * @param expectedInGossip instance to wait for
+     * @param targetStatus for the instance
+     * @return gossip info
+     */
+    public static Map<String, Map<String, String>> awaitGossipStatus(IInstance instance, IInstance expectedInGossip, String targetStatus)
+    {
+        return awaitGossip(instance, "Node " + expectedInGossip + " did not match state " + targetStatus, gossip -> {
+            Map<String, String> state = gossip.get(getBroadcastAddressString(expectedInGossip));
+            if (state == null)
+                return false;
+            String status = state.get("STATUS_WITH_PORT");
+            if (status == null)
+                status = state.get("STATUS");
+            if (status == null)
+                return targetStatus == null;
+            return status.contains(targetStatus);
+        });
+    }
+
+    /**
+     * Get the gossip information from the node via {@code nodetool gossipinfo}.
+     *
+     * The result is keyed by each endpoint header line (the lines starting with {@code /}); each value maps every
+     * reported field name (e.g. {@code generation}, {@code heartbeat}, {@code STATUS}) to the raw text after its
+     * first {@code :}.
+     *
+     * @param inst to check on
+     * @return gossip info
+     */
+    public static Map<String, Map<String, String>> gossipInfo(IInstance inst)
+    {
+        NodeToolResult results = inst.nodetoolResult("gossipinfo");
+        results.asserts().success();
+        return parseGossipInfo(results.getStdout());
+    }
+
+    /**
+     * Make sure the gossip info for the specific target has the expected generation and heartbeat
+     *
+     * @param instance to check on
+     * @param expectedInGossip instance to check for
+     * @param expectedGeneration expected generation
+     * @param expectedHeartbeat expected heartbeat
+     * @throws NullPointerException if the target has no gossip info on {@code instance}
+     */
+    public static void assertGossipInfo(IInstance instance,
+                                        InetSocketAddress expectedInGossip, int expectedGeneration, int expectedHeartbeat)
+    {
+        String targetAddress = expectedInGossip.getAddress().toString();
+        Map<String, Map<String, String>> gossipInfo = gossipInfo(instance);
+        Map<String, String> gossipState = gossipInfo.get(targetAddress);
+        if (gossipState == null)
+            throw new NullPointerException("Unable to find gossip info for " + targetAddress + "; gossip info = " + gossipInfo);
+        Assert.assertEquals(Long.toString(expectedGeneration), gossipState.get("generation"));
+        Assert.assertEquals(Long.toString(expectedHeartbeat), gossipState.get("heartbeat")); //TODO do we really mix these two?
+    }
+
+    /**
+     * Parse {@code nodetool gossipinfo} output; see {@link #gossipInfo(IInstance)} for the shape of the result.
+     * Blank lines are ignored, so empty output yields an empty map.
+     */
+    private static Map<String, Map<String, String>> parseGossipInfo(String str)
+    {
+        Map<String, Map<String, String>> map = new HashMap<>();
+        String currentInstance = null;
+        for (String line : str.split("\n"))
+        {
+            // tolerate blank lines (including empty output) instead of failing on them
+            if (line.trim().isEmpty())
+                continue;
+            if (line.startsWith("/"))
+            {
+                // start of new instance
+                currentInstance = line;
+                continue;
+            }
+            Objects.requireNonNull(currentInstance, "Found gossip state line '" + line + "' before any endpoint line");
+            // split on the first ':' only, values such as STATUS can themselves contain ':'
+            String[] kv = line.trim().split(":", 2);
+            assert kv.length == 2 : "When splitting line '" + line + "' expected 2 parts but not true";
+            Map<String, String> state = map.computeIfAbsent(currentInstance, ignore -> new HashMap<>());
+            state.put(kv[0], kv[1]);
+        }
+
+        return map;
+    }
+
+    /**
+     * Get the tokens assigned to the instance via config. This method does not work if the instance has learned
+     * or generated its tokens.
+     *
+     * @param instance to get tokens from
+     * @return non-empty list of tokens
+     */
+    public static List<String> getTokens(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        int numTokens = conf.getInt("num_tokens");
+        Assert.assertEquals("Only single token is supported", 1, numTokens);
+        String token = conf.getString("initial_token");
+        Assert.assertNotNull("initial_token was not found", token);
+        return Arrays.asList(token);
+    }
+
+    /**
+     * Get all data directories for the given instance.
+     *
+     * @param instance to get data directories for
+     * @return data directories
+     */
+    public static List<File> getDataDirectories(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        // this isn't safe as it assumes the implementation of InstanceConfig
+        // might need to get smarter... some day...
+        String[] ds = (String[]) conf.get("data_file_directories");
+        List<File> files = new ArrayList<>(ds.length);
+        for (String d : ds)
+            files.add(new File(d));
+        return files;
+    }
+
+    /**
+     * Get the commit log directory for the given instance.
+     *
+     * @param instance to get the commit log directory for
+     * @return commit log directory
+     */
+    public static File getCommitLogDirectory(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        // this isn't safe as it assumes the implementation of InstanceConfig
+        // might need to get smarter... some day...
+        String d = (String) conf.get("commitlog_directory");
+        return new File(d);
+    }
+
+    /**
+     * Get the hints directory for the given instance.
+     *
+     * @param instance to get the hints directory for
+     * @return hints directory
+     */
+    public static File getHintsDirectory(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        // this isn't safe as it assumes the implementation of InstanceConfig
+        // might need to get smarter... some day...
+        String d = (String) conf.get("hints_directory");
+        return new File(d);
+    }
+
+    /**
+     * Get the saved caches directory for the given instance.
+     *
+     * @param instance to get the saved caches directory for
+     * @return saved caches directory
+     */
+    public static File getSavedCachesDirectory(IInstance instance)
+    {
+        IInstanceConfig conf = instance.config();
+        // this isn't safe as it assumes the implementation of InstanceConfig
+        // might need to get smarter... some day...
+        String d = (String) conf.get("saved_caches_directory");
+        return new File(d);
+    }
+
+    /**
+     * Get all writable directories for the given instance: data, commit log, hints and saved caches.
+     *
+     * @param instance to get directories for
+     * @return all writable directories
+     */
+    public static List<File> getDirectories(IInstance instance)
+    {
+        List<File> out = new ArrayList<>();
+        out.addAll(getDataDirectories(instance));
+        out.add(getCommitLogDirectory(instance));
+        out.add(getHintsDirectory(instance));
+        out.add(getSavedCachesDirectory(instance));
+        return out;
+    }
+
+    /**
+     * Gets the name of the Partitioner for the given instance.
+     *
+     * @param instance to get partitioner from
+     * @return partitioner name
+     */
+    public static String getPartitionerName(IInstance instance)
+    {
+        return (String) instance.config().get("partitioner");
+    }
+
+    /**
+     * Changes the instance's address to the new address. This method should only be called while the instance is
+     * down, else has undefined behavior.
+     *
+     * @param instance to update address for
+     * @param address to set
+     */
+    public static void updateAddress(IInstance instance, String address)
+    {
+        updateAddress(instance.config(), address);
+    }
+
+    /**
+     * Changes the instance's address to the new address. This method should only be called while the instance is
+     * down, else has undefined behavior.
+     *
+     * @param conf to update address for
+     * @param address to set
+     */
+    private static void updateAddress(IInstanceConfig conf, String address)
+    {
+        for (String key : Arrays.asList("broadcast_address", "listen_address", "broadcast_rpc_address", "rpc_address"))
+            conf.set(key, address);
+
+        // InstanceConfig caches InetSocketAddress -> InetAddressAndPort
+        // this causes issues as startup now ignores config, so force reset it to pull from conf.
+        ((InstanceConfig) conf).unsetBroadcastAddressAndPort(); //TODO remove the need to null out the cache...
+        conf.networkTopology().put(conf.broadcastAddress(), NetworkTopology.dcAndRack(conf.localDatacenter(), conf.localRack()));
+    }
+
+    /**
+     * Get the broadcast address host address only (ex. 127.0.0.1)
+     */
+    private static String getBroadcastAddressHostString(IInstance target)
+    {
+        return target.config().broadcastAddress().getAddress().getHostAddress();
+    }
+
+    /**
+     * Get the broadcast address in host:port format (ex. 127.0.0.1:7190)
+     */
+    public static String getBroadcastAddressHostWithPortString(IInstance target)
+    {
+        InetSocketAddress address = target.config().broadcastAddress();
+        return address.getAddress().getHostAddress() + ":" + address.getPort();
+    }
+
+    /**
+     * Get the broadcast address InetAddress string (ex. localhost/127.0.0.1 or /127.0.0.1)
+     */
+    private static String getBroadcastAddressString(IInstance target)
+    {
+        return target.config().broadcastAddress().getAddress().toString();
+    }
+
+    /**
+     * One endpoint entry parsed from {@code nodetool ring} output; all fields are the raw strings from that output.
+     */
+    public static final class RingInstanceDetails
+    {
+        private final String address;
+        private final String rack;
+        private final String status;
+        private final String state;
+        private final String token;
+
+        private RingInstanceDetails(String address, String rack, String status, String state, String token)
+        {
+            this.address = address;
+            this.rack = rack;
+            this.status = status;
+            this.state = state;
+            this.token = token;
+        }
+
+        public String getAddress()
+        {
+            return address;
+        }
+
+        public String getRack()
+        {
+            return rack;
+        }
+
+        public String getStatus()
+        {
+            return status;
+        }
+
+        public String getState()
+        {
+            return state;
+        }
+
+        public String getToken()
+        {
+            return token;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            RingInstanceDetails that = (RingInstanceDetails) o;
+            return Objects.equals(address, that.address) &&
+                   Objects.equals(rack, that.rack) &&
+                   Objects.equals(status, that.status) &&
+                   Objects.equals(state, that.state) &&
+                   Objects.equals(token, that.token);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(address, rack, status, state, token);
+        }
+
+        @Override
+        public String toString()
+        {
+            return Arrays.asList(address, rack, status, state, token).toString();
+        }
+    }
+}
diff --cc test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
index 7ebfd0c,ba6027b..1eb7f04
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
@@@ -19,13 -19,11 +19,9 @@@
package org.apache.cassandra.distributed.test;
import java.io.Closeable;
-import java.net.InetAddress;
-import java.util.ArrayList;
+import java.net.InetSocketAddress;
import java.util.Collection;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
-import java.util.List;
+ import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
@@@ -42,15 -42,17 +40,21 @@@ import org.apache.cassandra.distributed
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+ import org.apache.cassandra.service.PendingRangeCalculatorService;
import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.streaming.StreamPlan;
+ import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.utils.FBUtilities;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
++import static org.apache.cassandra.distributed.shared.ClusterUtils.getLocalToken;
++import static org.apache.cassandra.distributed.shared.ClusterUtils.runAndWaitForLogs;
+ import static org.junit.Assert.assertEquals;
public class GossipTest extends TestBaseImpl
{
@@@ -232,4 -228,128 +236,102 @@@
}
}
+ /**
+ * Verifies that when a node which is mid-move is shut down cleanly, its peers stop considering
+ * any ranges as pending for it once they have logged the gossip shutdown (CASSANDRA-16796).
+ * node2's move is forced to stall by the FailureHelper interceptor installed on that node only.
+ */
+ @Test
+ public void gossipShutdownUpdatesTokenMetadata() throws Exception
+ {
+ try (Cluster cluster = Cluster.build(3)
+ .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+ .withInstanceInitializer(FailureHelper::installMoveFailure)
+ .start())
+ {
+ init(cluster, 2);
+ populate(cluster);
+ IInvokableInstance node1 = cluster.get(1);
+ IInvokableInstance node2 = cluster.get(2);
+ IInvokableInstance node3 = cluster.get(3);
+
+ // initiate a move for node2, which will not complete due to the
+ // ByteBuddy interceptor we injected. Wait for the other two nodes
+ // to mark node2 as moving before proceeding.
+ long t2 = Long.parseLong(getLocalToken(node2));
+ long t3 = Long.parseLong(getLocalToken(node3));
+ // Midpoint between node2's and node3's tokens, written as t2 + (t3 - t2)/2 rather than (t2 + t3)/2.
+ // NOTE(review): assumes node3's token is greater than node2's; not checked here.
+ long moveTo = t2 + ((t3 - t2)/2);
- String logMsg = "Node " + node2.broadcastAddress().getAddress() + " state moving, new token " + moveTo;
++ String logMsg = "Node " + node2.broadcastAddress() + " state moving, new token " + moveTo;
+ runAndWaitForLogs(() -> node2.nodetoolResult("move", "--", Long.toString(moveTo)).asserts().failure(),
+ logMsg,
+ cluster);
+
- InetAddress movingAddress = node2.broadcastAddress().getAddress();
++ InetSocketAddress movingAddress = node2.broadcastAddress();
+ // node1 & node3 should now consider some ranges pending for node2
+ assertPendingRangesForPeer(true, movingAddress, cluster);
+
+ // A controlled shutdown causes peers to replace the MOVING status to be with SHUTDOWN, but prior to
+ // CASSANDRA-16796 this doesn't update TokenMetadata, so they maintain pending ranges for the down node
+ // indefinitely, even after it has been removed from the ring.
- logMsg = "Marked " + node2.broadcastAddress().getAddress() + " as shutdown";
++ logMsg = "Marked " + node2.broadcastAddress() + " as shutdown";
+ // only node1 and node3 are watched: node2 is the one shutting down
+ runAndWaitForLogs(() -> Futures.getUnchecked(node2.shutdown()),
+ logMsg,
+ node1, node3);
+ // node1 & node3 should not consider any ranges as still pending for node2
+ assertPendingRangesForPeer(false, movingAddress, cluster);
+ }
+ }
+
+ /**
+ * Asserts, on node1 and node3, whether the given peer is both listed as a moving endpoint and has
+ * non-empty pending ranges for KEYSPACE in TokenMetadata.
+ *
+ * @param expectPending whether pending ranges are expected for the peer
+ * @param movingAddress broadcast address of the peer being checked
+ * @param cluster the cluster whose instances 1 and 3 are inspected
+ */
- void assertPendingRangesForPeer(final boolean expectPending, final InetAddress movingAddress, final Cluster cluster)
++ void assertPendingRangesForPeer(final boolean expectPending, final InetSocketAddress movingAddress, final Cluster cluster)
+ {
+ for (IInvokableInstance inst : new IInvokableInstance[]{ cluster.get(1), cluster.get(3)})
+ {
- boolean hasPending = inst.appliesOnInstance((InetAddress peer) -> {
++ boolean hasPending = inst.appliesOnInstance((InetSocketAddress address) -> {
++ InetAddressAndPort peer = toCassandraInetAddressAndPort(address);
+
+ // let any in-flight pending range calculation finish before reading TokenMetadata
+ PendingRangeCalculatorService.instance.blockUntilFinished();
+
+ boolean isMoving = StorageService.instance.getTokenMetadata()
+ .getMovingEndpoints()
+ .stream()
+ .map(pair -> pair.right)
+ .anyMatch(peer::equals);
+
+ return isMoving && !StorageService.instance.getTokenMetadata()
+ .getPendingRanges(KEYSPACE, peer)
+ .isEmpty();
+ }).apply(movingAddress);
+ // JUnit's contract is assertEquals(message, expected, actual); passing them in this order
+ // keeps the failure output ("expected:<..> but was:<..>") truthful.
+ assertEquals(String.format("%s should %shave PENDING RANGES for %s",
+ inst.broadcastAddress().getHostString(),
+ expectPending ? "" : "not ",
+ movingAddress),
+ expectPending, hasPending);
+ }
+ }
+
- private String getLocalToken(IInvokableInstance inst)
- {
- return inst.callOnInstance(() -> {
- List<String> tokens = new ArrayList<>();
- for (Token t : StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress()))
- tokens.add(t.getTokenValue().toString());
-
- assert tokens.size() == 1 : "getLocalToken assumes a single token, but multiple tokens found";
- return tokens.get(0);
- });
- }
-
- public static void runAndWaitForLogs(Runnable r, String waitString, Cluster cluster) throws TimeoutException
- {
- runAndWaitForLogs(r, waitString, cluster.stream().toArray(IInstance[]::new));
- }
-
- public static void runAndWaitForLogs(Runnable r, String waitString, IInstance...instances) throws TimeoutException
- {
- long [] marks = new long[instances.length];
- for (int i = 0; i < instances.length; i++)
- marks[i] = instances[i].logs().mark();
- r.run();
- for (int i = 0; i < instances.length; i++)
- instances[i].logs().watchFor(marks[i], waitString);
- }
-
+ /**
+ * Creates KEYSPACE.tbl if absent and writes partition keys 0..9 through node1's coordinator at ConsistencyLevel.ALL.
+ */
+ static void populate(Cluster cluster)
+ {
+ cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int PRIMARY KEY)");
+ String insertStatement = "INSERT INTO " + KEYSPACE + ".tbl (pk) VALUES (?)";
+ for (int pk = 0; pk < 10; pk++)
+ cluster.coordinator(1).execute(insertStatement, ConsistencyLevel.ALL, pk);
+ }
+
+ /**
+ * ByteBuddy hook that makes StreamPlan#execute throw on node2, so a move initiated there cannot stream.
+ */
+ public static class FailureHelper
+ {
+ /** Instance initializer: redefines StreamPlan#execute in node2's class loader; other nodes are left untouched. */
+ static void installMoveFailure(ClassLoader cl, int nodeNumber)
+ {
+ if (nodeNumber != 2)
+ return;
+
+ new ByteBuddy().redefine(StreamPlan.class)
+ .method(named("execute"))
+ .intercept(MethodDelegation.to(FailureHelper.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ /** Delegation target for the intercepted StreamPlan#execute; always fails. */
+ public static StreamResultFuture execute()
+ {
+ throw new RuntimeException("failing to execute move");
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org