You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2021/07/20 18:39:44 UTC

[cassandra] 01/01: Merge branch 'cassaNDRA-3.11' into cassandra-4.0

This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 64ec400ab6504ebf9447eb7250064a7795e29b81
Merge: 1853006 b604bd2
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Tue Jul 20 19:31:56 2021 +0100

    Merge branch 'cassaNDRA-3.11' into cassandra-4.0

 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |   6 +-
 .../cassandra/distributed/shared/ClusterUtils.java |  31 ++++++
 .../cassandra/distributed/test/GossipTest.java     | 112 ++++++++++++++++++++-
 4 files changed, 144 insertions(+), 6 deletions(-)

diff --cc CHANGES.txt
index b51f94f,89c7813..414dc3b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -9,7 -3,16 +9,8 @@@ Merged from 3.11
   * Optimize bytes skipping when reading SSTable files (CASSANDRA-14415)
   * Enable tombstone compactions when unchecked_tombstone_compaction is set in TWCS (CASSANDRA-14496)
   * Read only the required SSTables for single partition queries (CASSANDRA-16737)
 - * Fix LeveledCompactionStrategy compacts last level throw an ArrayIndexOutOfBoundsException (CASSANDRA-15669)
 - * Maps $CASSANDRA_LOG_DIR to cassandra.logdir java property when executing nodetool (CASSANDRA-16199)
 - * Nodetool garbagecollect should retain SSTableLevel for LCS (CASSANDRA-16634)
 - * Ignore stale acks received in the shadow round (CASSANDRA-16588)
 - * Add autocomplete and error messages for provide_overlapping_tombstones (CASSANDRA-16350)
 - * Add StorageServiceMBean.getKeyspaceReplicationInfo(keyspaceName) (CASSANDRA-16447)
 - * Make sure sstables with moved starts are removed correctly in LeveledGenerations (CASSANDRA-16552)
 - * Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462)
  Merged from 3.0:
+  * Receipt of gossip shutdown notification updates TokenMetadata (CASSANDRA-16796)
   * Count bloom filter misses correctly (CASSANDRA-12922)
   * Reject token() in MV WHERE clause (CASSANDRA-13464)
   * Ensure java executable is on the path (CASSANDRA-14325)
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index e194807,1603693..2c38cfb
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -539,13 -435,15 +539,17 @@@ public class Gossiper implements IFailu
          EndpointState epState = endpointStateMap.get(endpoint);
          if (epState == null)
              return;
-         epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, StorageService.instance.valueFactory.shutdown(true));
+         VersionedValue shutdown = StorageService.instance.valueFactory.shutdown(true);
 -        epState.addApplicationState(ApplicationState.STATUS, shutdown);
++        epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, shutdown);
 +        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true));
          epState.addApplicationState(ApplicationState.RPC_READY, StorageService.instance.valueFactory.rpcReady(false));
          epState.getHeartBeatState().forceHighestPossibleVersionUnsafe();
          markDead(endpoint, epState);
          FailureDetector.instance.forceConviction(endpoint);
 +        GossiperDiagnostics.markedAsShutdown(this, endpoint);
+         for (IEndpointStateChangeSubscriber subscriber : subscribers)
 -            subscriber.onChange(endpoint, ApplicationState.STATUS, shutdown);
++            subscriber.onChange(endpoint, ApplicationState.STATUS_WITH_PORT, shutdown);
+         logger.debug("Marked {} as shutdown", endpoint);
      }
  
      /**
diff --cc test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index a68e819,0000000..1755857
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@@ -1,774 -1,0 +1,805 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.shared;
 +
 +import java.io.File;
 +import java.net.InetSocketAddress;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Objects;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.concurrent.TimeUnit;
++import java.util.concurrent.TimeoutException;
 +import java.util.function.BiConsumer;
 +import java.util.function.Consumer;
 +import java.util.function.Predicate;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.util.concurrent.Futures;
 +import org.junit.Assert;
 +
++import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.api.ICluster;
 +import org.apache.cassandra.distributed.api.IInstance;
 +import org.apache.cassandra.distributed.api.IInstanceConfig;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.IMessageFilters;
 +import org.apache.cassandra.distributed.api.NodeToolResult;
 +import org.apache.cassandra.distributed.impl.AbstractCluster;
 +import org.apache.cassandra.distributed.impl.InstanceConfig;
 +import org.apache.cassandra.service.StorageService;
++import org.apache.cassandra.utils.FBUtilities;
 +
 +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SCHEMA_DELAY_MS;
 +import static org.assertj.core.api.Assertions.assertThat;
 +
 +/**
 + * Utilities for working with jvm-dtest clusters.
 + *
 + * This class is marked as Isolated as it relies on lambdas, which are in a package that is marked as shared, so need to
 + * tell jvm-dtest to not share this class.
 + *
 + * This class should never be called from within the cluster, always in the App ClassLoader.
 + */
 +@Isolated
 +public class ClusterUtils
 +{
 +    /**
 +     * Start the instance with the given System Properties, after the instance has started, the properties will be cleared.
 +     */
 +    public static <I extends IInstance> I start(I inst, Consumer<WithProperties> fn)
 +    {
 +        return start(inst, (ignore, prop) -> fn.accept(prop));
 +    }
 +
 +    /**
 +     * Start the instance with the given System Properties, after the instance has started, the properties will be cleared.
 +     */
 +    public static <I extends IInstance> I start(I inst, BiConsumer<I, WithProperties> fn)
 +    {
 +        try (WithProperties properties = new WithProperties())
 +        {
 +            fn.accept(inst, properties);
 +            inst.startup();
 +            return inst;
 +        }
 +    }
 +
 +    /**
 +     * Stop an instance in a blocking manner.
 +     *
 +     * The main difference between this and {@link IInstance#shutdown()} is that the wait on the future will catch
 +     * the exceptions and throw as runtime.
 +     */
 +    public static void stopUnchecked(IInstance i)
 +    {
 +        Futures.getUnchecked(i.shutdown());
 +    }
 +
 +    /**
 +     * Stops an instance abruptly.  This is done by blocking all messages to/from so all other instances are unable
 +     * to communicate, then stopping the instance gracefully.
 +     *
 +     * The assumption is that hard stopping inbound and outbound messages will apear to the cluster as if the instance
 +     * was stopped via kill -9; this does not hold true if the instance is restarted as it knows it was properly shutdown.
 +     *
 +     * @param cluster to filter messages to
 +     * @param inst to shut down
 +     */
 +    public static <I extends IInstance> void stopAbrupt(ICluster<I> cluster, I inst)
 +    {
 +        // block all messages to/from the node going down to make sure a clean shutdown doesn't happen
 +        IMessageFilters.Filter to = cluster.filters().allVerbs().to(inst.config().num()).drop();
 +        IMessageFilters.Filter from = cluster.filters().allVerbs().from(inst.config().num()).drop();
 +        try
 +        {
 +            stopUnchecked(inst);
 +        }
 +        finally
 +        {
 +            from.off();
 +            to.off();
 +        }
 +    }
 +
 +    /**
 +     * Stop all the instances in the cluster.  This function is differe than {@link ICluster#close()} as it doesn't
 +     * clean up the cluster state, it only stops all the instances.
 +     */
 +    public static <I extends IInstance> void stopAll(ICluster<I> cluster)
 +    {
 +        cluster.stream().forEach(ClusterUtils::stopUnchecked);
 +    }
 +
 +    /**
 +     * Create a new instance and add it to the cluster, without starting it.
 +     *
 +     * @param cluster to add to
 +     * @param dc the instance should be in
 +     * @param rack the instance should be in
 +     * @param <I> instance type
 +     * @return the instance added
 +     */
 +    public static <I extends IInstance> I addInstance(AbstractCluster<I> cluster,
 +                                                      String dc, String rack)
 +    {
 +        return addInstance(cluster, dc, rack, ignore -> {});
 +    }
 +
 +    /**
 +     * Create a new instance and add it to the cluster, without starting it.
 +     *
 +     * @param cluster to add to
 +     * @param dc the instance should be in
 +     * @param rack the instance should be in
 +     * @param fn function to add to the config before starting
 +     * @param <I> instance type
 +     * @return the instance added
 +     */
 +    public static <I extends IInstance> I addInstance(AbstractCluster<I> cluster,
 +                                                      String dc, String rack,
 +                                                      Consumer<IInstanceConfig> fn)
 +    {
 +        Objects.requireNonNull(dc, "dc");
 +        Objects.requireNonNull(rack, "rack");
 +
 +        InstanceConfig config = cluster.newInstanceConfig();
 +        //TODO adding new instances should be cleaner, currently requires you create the cluster with all
 +        // instances known about (at least to NetworkTopology and TokenStategy)
 +        // this is very hidden, so should be more explicit
 +        config.networkTopology().put(config.broadcastAddress(), NetworkTopology.dcAndRack(dc, rack));
 +
 +        fn.accept(config);
 +
 +        return cluster.bootstrap(config);
 +    }
 +
 +    /**
 +     * Create and start a new instance that replaces an existing instance.
 +     *
 +     * The instance will be in the same datacenter and rack as the existing instance.
 +     *
 +     * @param cluster to add to
 +     * @param toReplace instance to replace
 +     * @param <I> instance type
 +     * @return the instance added
 +     */
 +    public static <I extends IInstance> I replaceHostAndStart(AbstractCluster<I> cluster, IInstance toReplace)
 +    {
 +        return replaceHostAndStart(cluster, toReplace, ignore -> {});
 +    }
 +
 +    /**
 +     * Create and start a new instance that replaces an existing instance.
 +     *
 +     * The instance will be in the same datacenter and rack as the existing instance.
 +     *
 +     * @param cluster to add to
 +     * @param toReplace instance to replace
 +     * @param fn lambda to add additional properties
 +     * @param <I> instance type
 +     * @return the instance added
 +     */
 +    public static <I extends IInstance> I replaceHostAndStart(AbstractCluster<I> cluster,
 +                                                              IInstance toReplace,
 +                                                              Consumer<WithProperties> fn)
 +    {
 +        IInstanceConfig toReplaceConf = toReplace.config();
 +        I inst = addInstance(cluster, toReplaceConf.localDatacenter(), toReplaceConf.localRack(), c -> c.set("auto_bootstrap", true));
 +
 +        return start(inst, properties -> {
 +            // lower this so the replacement waits less time
 +            properties.setProperty("cassandra.broadcast_interval_ms", Long.toString(TimeUnit.SECONDS.toMillis(30)));
 +            // default is 30s, lowering as it should be faster
 +            properties.setProperty("cassandra.ring_delay_ms", Long.toString(TimeUnit.SECONDS.toMillis(10)));
 +            properties.set(BOOTSTRAP_SCHEMA_DELAY_MS, TimeUnit.SECONDS.toMillis(10));
 +
 +            // state which node to replace
 +            properties.setProperty("cassandra.replace_address_first_boot", toReplace.config().broadcastAddress().getAddress().getHostAddress());
 +
 +            fn.accept(properties);
 +        });
 +    }
 +
 +    /**
 +     * Calls {@link org.apache.cassandra.locator.TokenMetadata#sortedTokens()}, returning as a list of strings.
 +     */
 +    public static List<String> getTokenMetadataTokens(IInvokableInstance inst)
 +    {
 +        return inst.callOnInstance(() ->
 +                                   StorageService.instance.getTokenMetadata()
 +                                                          .sortedTokens().stream()
 +                                                          .map(Object::toString)
 +                                                          .collect(Collectors.toList()));
 +    }
 +
++    public static String getLocalToken(IInvokableInstance inst)
++    {
++        return inst.callOnInstance(() -> {
++            List<String> tokens = new ArrayList<>();
++            for (Token t : StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddressAndPort()))
++                tokens.add(t.getTokenValue().toString());
++
++            assert tokens.size() == 1 : "getLocalToken assumes a single token, but multiple tokens found";
++            return tokens.get(0);
++        });
++    }
++
++    public static <I extends IInstance> void runAndWaitForLogs(Runnable r, String waitString, AbstractCluster<I> cluster) throws TimeoutException
++    {
++        runAndWaitForLogs(r, waitString, cluster.stream().toArray(IInstance[]::new));
++    }
++
++    public static void runAndWaitForLogs(Runnable r, String waitString, IInstance...instances) throws TimeoutException
++    {
++        long [] marks = new long[instances.length];
++        for (int i = 0; i < instances.length; i++)
++            marks[i] = instances[i].logs().mark();
++        r.run();
++        for (int i = 0; i < instances.length; i++)
++            instances[i].logs().watchFor(marks[i], waitString);
++    }
++
++
 +    /**
 +     * Get the ring from the perspective of the instance.
 +     */
 +    public static List<RingInstanceDetails> ring(IInstance inst)
 +    {
 +        NodeToolResult results = inst.nodetoolResult("ring");
 +        results.asserts().success();
 +        return parseRing(results.getStdout());
 +    }
 +
 +    /**
 +     * Make sure the target instance is in the ring.
 +     *
 +     * @param instance instance to check on
 +     * @param expectedInRing instance expected in the ring
 +     * @return the ring (if target is present)
 +     */
 +    public static List<RingInstanceDetails> assertInRing(IInstance instance, IInstance expectedInRing)
 +    {
 +        String targetAddress = getBroadcastAddressHostString(expectedInRing);
 +        List<RingInstanceDetails> ring = ring(instance);
 +        Optional<RingInstanceDetails> match = ring.stream().filter(d -> d.address.equals(targetAddress)).findFirst();
 +        assertThat(match).as("Not expected to find %s but was found", targetAddress).isPresent();
 +        return ring;
 +    }
 +
 +    /**
 +     * Make sure the target instance's gossip state matches on the source instance
 +     *
 +     * @param instance instance to check on
 +     * @param expectedInRing instance expected in the ring
 +     * @param state expected gossip state
 +     * @return the ring (if target is present and has expected state)
 +     */
 +    public static List<RingInstanceDetails> assertRingState(IInstance instance, IInstance expectedInRing, String state)
 +    {
 +        String targetAddress = getBroadcastAddressHostString(expectedInRing);
 +        List<RingInstanceDetails> ring = ring(instance);
 +        List<RingInstanceDetails> match = ring.stream()
 +                                              .filter(d -> d.address.equals(targetAddress))
 +                                              .collect(Collectors.toList());
 +        assertThat(match)
 +        .isNotEmpty()
 +        .as("State was expected to be %s but was not", state)
 +        .anyMatch(r -> r.state.equals(state));
 +        return ring;
 +    }
 +
 +    /**
 +     * Make sure the target instance is NOT in the ring.
 +     *
 +     * @param instance instance to check on
 +     * @param expectedInRing instance not expected in the ring
 +     * @return the ring (if target is not present)
 +     */
 +    public static List<RingInstanceDetails> assertNotInRing(IInstance instance, IInstance expectedInRing)
 +    {
 +        String targetAddress = getBroadcastAddressHostString(expectedInRing);
 +        List<RingInstanceDetails> ring = ring(instance);
 +        Optional<RingInstanceDetails> match = ring.stream().filter(d -> d.address.equals(targetAddress)).findFirst();
 +        Assert.assertEquals("Not expected to find " + targetAddress + " but was found", Optional.empty(), match);
 +        return ring;
 +    }
 +
 +    private static List<RingInstanceDetails> awaitRing(IInstance src, String errorMessage, Predicate<List<RingInstanceDetails>> fn)
 +    {
 +        List<RingInstanceDetails> ring = null;
 +        for (int i = 0; i < 100; i++)
 +        {
 +            ring = ring(src);
 +            if (fn.test(ring))
 +            {
 +                return ring;
 +            }
 +            sleepUninterruptibly(1, TimeUnit.SECONDS);
 +        }
 +        throw new AssertionError(errorMessage + "\n" + ring);
 +    }
 +
 +    /**
 +     * Wait for the target to be in the ring as seen by the source instance.
 +     *
 +     * @param instance instance to check on
 +     * @param expectedInRing instance to wait for
 +     * @return the ring
 +     */
 +    public static List<RingInstanceDetails> awaitRingJoin(IInstance instance, IInstance expectedInRing)
 +    {
 +        return awaitRingJoin(instance, expectedInRing.broadcastAddress().getAddress().getHostAddress());
 +    }
 +
 +    /**
 +     * Wait for the target to be in the ring as seen by the source instance.
 +     *
 +     * @param instance instance to check on
 +     * @param expectedInRing instance address to wait for
 +     * @return the ring
 +     */
 +    public static List<RingInstanceDetails> awaitRingJoin(IInstance instance, String expectedInRing)
 +    {
 +        return awaitRing(instance, "Node " + expectedInRing + " did not join the ring...", ring -> {
 +            Optional<RingInstanceDetails> match = ring.stream().filter(d -> d.address.equals(expectedInRing)).findFirst();
 +            if (match.isPresent())
 +            {
 +                RingInstanceDetails details = match.get();
 +                return details.status.equals("Up") && details.state.equals("Normal");
 +            }
 +            return false;
 +        });
 +    }
 +
 +    /**
 +     * Wait for the ring to only have instances that are Up and Normal.
 +     *
 +     * @param src instance to check on
 +     * @return the ring
 +     */
 +    public static List<RingInstanceDetails> awaitRingHealthy(IInstance src)
 +    {
 +        return awaitRing(src, "Timeout waiting for ring to become healthy",
 +                         ring ->
 +                         ring.stream().allMatch(ClusterUtils::isRingInstanceDetailsHealthy));
 +    }
 +
 +    /**
 +     * Wait for the ring to have the target instance with the provided state.
 +     *
 +     * @param instance instance to check on
 +     * @param expectedInRing to look for
 +     * @param state expected
 +     * @return the ring
 +     */
 +    public static List<RingInstanceDetails> awaitRingState(IInstance instance, IInstance expectedInRing, String state)
 +    {
 +        return awaitRing(instance, "Timeout waiting for " + expectedInRing + " to have state " + state,
 +                         ring ->
 +                         ring.stream()
 +                             .filter(d -> d.address.equals(getBroadcastAddressHostString(expectedInRing)))
 +                             .filter(d -> d.state.equals(state))
 +                             .findAny().isPresent());
 +    }
 +
 +    /**
 +     * Make sure the ring is only the expected instances.  The source instance may not be in the ring, so this function
 +     * only relies on the expectedInsts param.
 +     *
 +     * @param instance instance to check on
 +     * @param expectedInRing expected instances in the ring
 +     * @return the ring (if condition is true)
 +     */
 +    public static List<RingInstanceDetails> assertRingIs(IInstance instance, IInstance... expectedInRing)
 +    {
 +        return assertRingIs(instance, Arrays.asList(expectedInRing));
 +    }
 +
 +    /**
 +     * Make sure the ring is only the expected instances.  The source instance may not be in the ring, so this function
 +     * only relies on the expectedInsts param.
 +     *
 +     * @param instance instance to check on
 +     * @param expectedInRing expected instances in the ring
 +     * @return the ring (if condition is true)
 +     */
 +    public static List<RingInstanceDetails> assertRingIs(IInstance instance, Collection<? extends IInstance> expectedInRing)
 +    {
 +        Set<String> expectedRingAddresses = expectedInRing.stream()
 +                                                         .map(i -> i.config().broadcastAddress().getAddress().getHostAddress())
 +                                                         .collect(Collectors.toSet());
 +        return assertRingIs(instance, expectedRingAddresses);
 +    }
 +
 +    /**
 +     * Make sure the ring is only the expected instances.  The source instance may not be in the ring, so this function
 +     * only relies on the expectedInsts param.
 +     *
 +     * @param instance instance to check on
 +     * @param expectedRingAddresses expected instances addresses in the ring
 +     * @return the ring (if condition is true)
 +     */
 +    public static List<RingInstanceDetails> assertRingIs(IInstance instance, Set<String> expectedRingAddresses)
 +    {
 +        List<RingInstanceDetails> ring = ring(instance);
 +        Set<String> ringAddresses = ring.stream().map(d -> d.address).collect(Collectors.toSet());
 +        assertThat(ringAddresses)
 +        .as("Ring addreses did not match for instance %s", instance)
 +        .isEqualTo(expectedRingAddresses);
 +        return ring;
 +    }
 +
 +    private static boolean isRingInstanceDetailsHealthy(RingInstanceDetails details)
 +    {
 +        return details.status.equals("Up") && details.state.equals("Normal");
 +    }
 +
 +    private static List<RingInstanceDetails> parseRing(String str)
 +    {
 +        // 127.0.0.3  rack0       Up     Normal  46.21 KB        100.00%             -1
 +        // /127.0.0.1:7012  Unknown     ?      Normal  ?               100.00%             -3074457345618258603
 +        Pattern pattern = Pattern.compile("^(/?[0-9.:]+)\\s+(\\w+|\\?)\\s+(\\w+|\\?)\\s+(\\w+|\\?).*?(-?\\d+)\\s*$");
 +        List<RingInstanceDetails> details = new ArrayList<>();
 +        String[] lines = str.split("\n");
 +        for (String line : lines)
 +        {
 +            Matcher matcher = pattern.matcher(line);
 +            if (!matcher.find())
 +            {
 +                continue;
 +            }
 +            details.add(new RingInstanceDetails(matcher.group(1), matcher.group(2), matcher.group(3), matcher.group(4), matcher.group(5)));
 +        }
 +
 +        return details;
 +    }
 +
 +    private static Map<String, Map<String, String>> awaitGossip(IInstance src, String errorMessage, Predicate<Map<String, Map<String, String>>> fn)
 +    {
 +        Map<String, Map<String, String>> gossip = null;
 +        for (int i = 0; i < 100; i++)
 +        {
 +            gossip = gossipInfo(src);
 +            if (fn.test(gossip))
 +            {
 +                return gossip;
 +            }
 +            sleepUninterruptibly(1, TimeUnit.SECONDS);
 +        }
 +        throw new AssertionError(errorMessage + "\n" + gossip);
 +    }
 +
 +    /**
 +     * Wait for the target instance to have the desired status. Target status is checked via string contains so works
 +     * with 'NORMAL' but also can check tokens or full state.
 +     *
 +     * @param instance instance to check on
 +     * @param expectedInGossip instance to wait for
 +     * @param targetStatus for the instance
 +     * @return gossip info
 +     */
 +    public static Map<String, Map<String, String>> awaitGossipStatus(IInstance instance, IInstance expectedInGossip, String targetStatus)
 +    {
 +        return awaitGossip(instance, "Node " + expectedInGossip + " did not match state " + targetStatus, gossip -> {
 +            Map<String, String> state = gossip.get(getBroadcastAddressString(expectedInGossip));
 +            if (state == null)
 +                return false;
 +            String status = state.get("STATUS_WITH_PORT");
 +            if (status == null)
 +                status = state.get("STATUS");
 +            if (status == null)
 +                return targetStatus == null;
 +            return status.contains(targetStatus);
 +        });
 +    }
 +
 +    /**
 +     * Get the gossip information from the node.  Currently only address, generation, and heartbeat are returned
 +     *
 +     * @param inst to check on
 +     * @return gossip info
 +     */
 +    public static Map<String, Map<String, String>> gossipInfo(IInstance inst)
 +    {
 +        NodeToolResult results = inst.nodetoolResult("gossipinfo");
 +        results.asserts().success();
 +        return parseGossipInfo(results.getStdout());
 +    }
 +
 +    /**
 +     * Make sure the gossip info for the specific target has the expected generation and heartbeat
 +     *
 +     * @param instance to check on
 +     * @param expectedInGossip instance to check for
 +     * @param expectedGeneration expected generation
 +     * @param expectedHeartbeat expected heartbeat
 +     */
 +    public static void assertGossipInfo(IInstance instance,
 +                                        InetSocketAddress expectedInGossip, int expectedGeneration, int expectedHeartbeat)
 +    {
 +        String targetAddress = expectedInGossip.getAddress().toString();
 +        Map<String, Map<String, String>> gossipInfo = gossipInfo(instance);
 +        Map<String, String> gossipState = gossipInfo.get(targetAddress);
 +        if (gossipState == null)
 +            throw new NullPointerException("Unable to find gossip info for " + targetAddress + "; gossip info = " + gossipInfo);
 +        Assert.assertEquals(Long.toString(expectedGeneration), gossipState.get("generation"));
 +        Assert.assertEquals(Long.toString(expectedHeartbeat), gossipState.get("heartbeat")); //TODO do we really mix these two?
 +    }
 +
 +    private static Map<String, Map<String, String>> parseGossipInfo(String str)
 +    {
 +        Map<String, Map<String, String>> map = new HashMap<>();
 +        String[] lines = str.split("\n");
 +        String currentInstance = null;
 +        for (String line : lines)
 +        {
 +            if (line.startsWith("/"))
 +            {
 +                // start of new instance
 +                currentInstance = line;
 +                continue;
 +            }
 +            Objects.requireNonNull(currentInstance);
 +            String[] kv = line.trim().split(":", 2);
 +            assert kv.length == 2 : "When splitting line '" + line + "' expected 2 parts but not true";
 +            Map<String, String> state = map.computeIfAbsent(currentInstance, ignore -> new HashMap<>());
 +            state.put(kv[0], kv[1]);
 +        }
 +
 +        return map;
 +    }
 +
 +    /**
 +     * Get the tokens assigned to the instance via config.  This method does not work if the instance has learned
 +     * or generated its tokens.
 +     *
 +     * @param instance to get tokens from
 +     * @return non-empty list of tokens
 +     */
 +    public static List<String> getTokens(IInstance instance)
 +    {
 +        IInstanceConfig conf = instance.config();
 +        int numTokens = conf.getInt("num_tokens");
 +        Assert.assertEquals("Only single token is supported", 1, numTokens);
 +        String token = conf.getString("initial_token");
 +        Assert.assertNotNull("initial_token was not found", token);
 +        return Arrays.asList(token);
 +    }
 +
 +    /**
 +     * Get all data directories for the given instance.
 +     *
 +     * @param instance to get data directories for
 +     * @return data directories
 +     */
 +    public static List<File> getDataDirectories(IInstance instance)
 +    {
 +        IInstanceConfig conf = instance.config();
 +        // this isn't safe as it assumes the implementation of InstanceConfig
 +        // might need to get smarter... some day...
 +        String[] ds = (String[]) conf.get("data_file_directories");
 +        List<File> files = new ArrayList<>(ds.length);
 +        for (int i = 0; i < ds.length; i++)
 +            files.add(new File(ds[i]));
 +        return files;
 +    }
 +
 +    /**
 +     * Get the commit log directory for the given instance.
 +     *
 +     * @param instance to get the commit log directory for
 +     * @return commit log directory
 +     */
 +    public static File getCommitLogDirectory(IInstance instance)
 +    {
 +        IInstanceConfig conf = instance.config();
 +        // this isn't safe as it assumes the implementation of InstanceConfig
 +        // might need to get smarter... some day...
 +        String d = (String) conf.get("commitlog_directory");
 +        return new File(d);
 +    }
 +
 +    /**
 +     * Get the hints directory for the given instance.
 +     *
 +     * @param instance to get the hints directory for
 +     * @return hints directory
 +     */
 +    public static File getHintsDirectory(IInstance instance)
 +    {
 +        IInstanceConfig conf = instance.config();
 +        // this isn't safe as it assumes the implementation of InstanceConfig
 +        // might need to get smarter... some day...
 +        String d = (String) conf.get("hints_directory");
 +        return new File(d);
 +    }
 +
 +    /**
 +     * Get the saved caches directory for the given instance.
 +     *
 +     * @param instance to get the saved caches directory for
 +     * @return saved caches directory
 +     */
 +    public static File getSavedCachesDirectory(IInstance instance)
 +    {
 +        IInstanceConfig conf = instance.config();
 +        // this isn't safe as it assumes the implementation of InstanceConfig
 +        // might need to get smarter... some day...
 +        String d = (String) conf.get("saved_caches_directory");
 +        return new File(d);
 +    }
 +
 +    /**
 +     * Get all writable directories for the given instance.
 +     *
 +     * @param instance to get directories for
 +     * @return all writable directories
 +     */
 +    public static List<File> getDirectories(IInstance instance)
 +    {
 +        List<File> out = new ArrayList<>();
 +        out.addAll(getDataDirectories(instance));
 +        out.add(getCommitLogDirectory(instance));
 +        out.add(getHintsDirectory(instance));
 +        out.add(getSavedCachesDirectory(instance));
 +        return out;
 +    }
 +
 +    /**
 +     * Gets the name of the Partitioner for the given instance.
 +     *
 +     * @param instance to get partitioner from
 +     * @return partitioner name
 +     */
 +    public static String getPartitionerName(IInstance instance)
 +    {
 +        return (String) instance.config().get("partitioner");
 +    }
 +
 +    /**
 +     * Changes the instance's address to the new address.  This method should only be called while the instance is
 +     * down, else has undefined behavior.
 +     *
 +     * @param instance to update address for
 +     * @param address to set
 +     */
 +    public static void updateAddress(IInstance instance, String address)
 +    {
 +        updateAddress(instance.config(), address);
 +    }
 +
 +    /**
 +     * Changes the instance's address to the new address.  This method should only be called while the instance is
 +     * down, else has undefined behavior.
 +     *
 +     * @param conf to update address for
 +     * @param address to set
 +     */
 +    private static void updateAddress(IInstanceConfig conf, String address)
 +    {
 +        for (String key : Arrays.asList("broadcast_address", "listen_address", "broadcast_rpc_address", "rpc_address"))
 +            conf.set(key, address);
 +
 +        // InstanceConfig caches InetSocketAddress -> InetAddressAndPort
 +        // this causes issues as startup now ignores config, so force reset it to pull from conf.
 +        ((InstanceConfig) conf).unsetBroadcastAddressAndPort(); //TODO remove the need to null out the cache...
 +        conf.networkTopology().put(conf.broadcastAddress(), NetworkTopology.dcAndRack(conf.localDatacenter(), conf.localRack()));
 +    }
 +
 +    /**
 +     * Get the broadcast address host address only (ex. 127.0.0.1)
 +     */
 +    private static String getBroadcastAddressHostString(IInstance target)
 +    {
 +        return target.config().broadcastAddress().getAddress().getHostAddress();
 +    }
 +
 +    /**
 +     * Get the broadcast address in host:port format (ex. 127.0.0.1:7190)
 +     */
 +    public static String getBroadcastAddressHostWithPortString(IInstance target)
 +    {
 +        InetSocketAddress address = target.config().broadcastAddress();
 +        return address.getAddress().getHostAddress() + ":" + address.getPort();
 +    }
 +
 +    /**
 +     * Get the broadcast address InetAddess string (ex. localhost/127.0.0.1 or /127.0.0.1)
 +     */
 +    private static String getBroadcastAddressString(IInstance target)
 +    {
 +        return target.config().broadcastAddress().getAddress().toString();
 +    }
 +
 +    public static final class RingInstanceDetails
 +    {
 +        private final String address;
 +        private final String rack;
 +        private final String status;
 +        private final String state;
 +        private final String token;
 +
 +        private RingInstanceDetails(String address, String rack, String status, String state, String token)
 +        {
 +            this.address = address;
 +            this.rack = rack;
 +            this.status = status;
 +            this.state = state;
 +            this.token = token;
 +        }
 +
 +        public String getAddress()
 +        {
 +            return address;
 +        }
 +
 +        public String getRack()
 +        {
 +            return rack;
 +        }
 +
 +        public String getStatus()
 +        {
 +            return status;
 +        }
 +
 +        public String getState()
 +        {
 +            return state;
 +        }
 +
 +        public String getToken()
 +        {
 +            return token;
 +        }
 +
 +        @Override
 +        public boolean equals(Object o)
 +        {
 +            if (this == o) return true;
 +            if (o == null || getClass() != o.getClass()) return false;
 +            RingInstanceDetails that = (RingInstanceDetails) o;
 +            return Objects.equals(address, that.address) &&
 +                   Objects.equals(rack, that.rack) &&
 +                   Objects.equals(status, that.status) &&
 +                   Objects.equals(state, that.state) &&
 +                   Objects.equals(token, that.token);
 +        }
 +
 +        @Override
 +        public int hashCode()
 +        {
 +            return Objects.hash(address, rack, status, state, token);
 +        }
 +
 +        public String toString()
 +        {
 +            return Arrays.asList(address, rack, status, state, token).toString();
 +        }
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
index 7ebfd0c,ba6027b..1eb7f04
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
@@@ -19,13 -19,11 +19,9 @@@
  package org.apache.cassandra.distributed.test;
  
  import java.io.Closeable;
 -import java.net.InetAddress;
 -import java.util.ArrayList;
 +import java.net.InetSocketAddress;
  import java.util.Collection;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
 -import java.util.List;
+ import java.util.concurrent.*;
  import java.util.concurrent.locks.LockSupport;
  import java.util.stream.Collectors;
  
@@@ -42,15 -42,17 +40,21 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.gms.ApplicationState;
  import org.apache.cassandra.gms.EndpointState;
  import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.locator.InetAddressAndPort;
+ import org.apache.cassandra.service.PendingRangeCalculatorService;
  import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.streaming.StreamPlan;
+ import org.apache.cassandra.streaming.StreamResultFuture;
  import org.apache.cassandra.utils.FBUtilities;
  
  import static net.bytebuddy.matcher.ElementMatchers.named;
  import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
  import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
  import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
++import static org.apache.cassandra.distributed.shared.ClusterUtils.getLocalToken;
++import static org.apache.cassandra.distributed.shared.ClusterUtils.runAndWaitForLogs;
+ import static org.junit.Assert.assertEquals;
  
  public class GossipTest extends TestBaseImpl
  {
@@@ -232,4 -228,128 +236,102 @@@
          }
      }
  
+     @Test
+     public void gossipShutdownUpdatesTokenMetadata() throws Exception
+     {
+         try (Cluster cluster = Cluster.build(3)
+                                       .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+                                       .withInstanceInitializer(FailureHelper::installMoveFailure)
+                                       .start())
+         {
+             init(cluster, 2);
+             populate(cluster);
+             IInvokableInstance node1 = cluster.get(1);
+             IInvokableInstance node2 = cluster.get(2);
+             IInvokableInstance node3 = cluster.get(3);
+ 
+             // initiate a move for node2, which will not complete due to the
+             // ByteBuddy interceptor we injected. Wait for the other two nodes
+             // to mark node2 as moving before proceeding.
+             long t2 = Long.parseLong(getLocalToken(node2));
+             long t3 = Long.parseLong(getLocalToken(node3));
+             long moveTo = t2 + ((t3 - t2)/2);
 -            String logMsg = "Node " + node2.broadcastAddress().getAddress() + " state moving, new token " + moveTo;
++            String logMsg = "Node " + node2.broadcastAddress() + " state moving, new token " + moveTo;
+             runAndWaitForLogs(() -> node2.nodetoolResult("move", "--", Long.toString(moveTo)).asserts().failure(),
+                               logMsg,
+                               cluster);
+ 
 -            InetAddress movingAddress = node2.broadcastAddress().getAddress();
++            InetSocketAddress movingAddress = node2.broadcastAddress();
+             // node1 & node3 should now consider some ranges pending for node2
+             assertPendingRangesForPeer(true, movingAddress, cluster);
+ 
+             // A controlled shutdown causes peers to replace the MOVING status to be with SHUTDOWN, but prior to
+             // CASSANDRA-16796 this doesn't update TokenMetadata, so they maintain pending ranges for the down node
+             // indefinitely, even after it has been removed from the ring.
 -            logMsg = "Marked " + node2.broadcastAddress().getAddress() + " as shutdown";
++            logMsg = "Marked " + node2.broadcastAddress() + " as shutdown";
+             runAndWaitForLogs(() -> Futures.getUnchecked(node2.shutdown()),
+                               logMsg,
+                               node1, node3);
+             // node1 & node3 should not consider any ranges as still pending for node2
+             assertPendingRangesForPeer(false, movingAddress, cluster);
+         }
+     }
+ 
 -    void assertPendingRangesForPeer(final boolean expectPending, final InetAddress movingAddress, final Cluster cluster)
++    void assertPendingRangesForPeer(final boolean expectPending, final InetSocketAddress movingAddress, final Cluster cluster)
+     {
+         for (IInvokableInstance inst : new IInvokableInstance[]{ cluster.get(1), cluster.get(3)})
+         {
 -            boolean hasPending = inst.appliesOnInstance((InetAddress peer) -> {
++            boolean hasPending = inst.appliesOnInstance((InetSocketAddress address) -> {
++                InetAddressAndPort peer = toCassandraInetAddressAndPort(address);
+ 
+                 PendingRangeCalculatorService.instance.blockUntilFinished();
+ 
+                 boolean isMoving = StorageService.instance.getTokenMetadata()
+                                                           .getMovingEndpoints()
+                                                           .stream()
+                                                           .map(pair -> pair.right)
+                                                           .anyMatch(peer::equals);
+ 
+                 return isMoving && !StorageService.instance.getTokenMetadata()
+                                                            .getPendingRanges(KEYSPACE, peer)
+                                                            .isEmpty();
+             }).apply(movingAddress);
+             assertEquals(String.format("%s should %shave PENDING RANGES for %s",
+                                        inst.broadcastAddress().getHostString(),
+                                        expectPending ? "" : "not ",
+                                        movingAddress),
+                          hasPending, expectPending);
+         }
+     }
+ 
 -    private String getLocalToken(IInvokableInstance inst)
 -    {
 -        return inst.callOnInstance(() -> {
 -            List<String> tokens = new ArrayList<>();
 -            for (Token t : StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress()))
 -                tokens.add(t.getTokenValue().toString());
 -
 -            assert tokens.size() == 1 : "getLocalToken assumes a single token, but multiple tokens found";
 -            return tokens.get(0);
 -        });
 -    }
 -
 -    public static void runAndWaitForLogs(Runnable r, String waitString, Cluster cluster) throws TimeoutException
 -    {
 -        runAndWaitForLogs(r, waitString, cluster.stream().toArray(IInstance[]::new));
 -    }
 -
 -    public static void runAndWaitForLogs(Runnable r, String waitString, IInstance...instances) throws TimeoutException
 -    {
 -        long [] marks = new long[instances.length];
 -        for (int i = 0; i < instances.length; i++)
 -            marks[i] = instances[i].logs().mark();
 -        r.run();
 -        for (int i = 0; i < instances.length; i++)
 -            instances[i].logs().watchFor(marks[i], waitString);
 -    }
 -
+     static void populate(Cluster cluster)
+     {
+         cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int PRIMARY KEY)");
+         for (int i = 0; i < 10; i++)
+         {
+             cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk) VALUES (?)",
+                                            ConsistencyLevel.ALL,
+                                            i);
+         }
+     }
+ 
+     public static class FailureHelper
+     {
+         static void installMoveFailure(ClassLoader cl, int nodeNumber)
+         {
+             if (nodeNumber == 2)
+             {
+                 new ByteBuddy().redefine(StreamPlan.class)
+                                .method(named("execute"))
+                                .intercept(MethodDelegation.to(FailureHelper.class))
+                                .make()
+                                .load(cl, ClassLoadingStrategy.Default.INJECTION);
+             }
+         }
+ 
+         public static StreamResultFuture execute()
+         {
+             throw new RuntimeException("failing to execute move");
+         }
+     }
  }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org