You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2019/06/03 15:05:03 UTC

[cassandra] branch cassandra-3.0 updated: Update token metadata for non-normal state changes

This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new e4b5d98  Update token metadata for non-normal state changes
e4b5d98 is described below

commit e4b5d9818f003be2b9091c48f8435d29202ffe2d
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Thu May 2 17:24:43 2019 +0100

    Update token metadata for non-normal state changes
    
    Patch by Benedict Elliott Smith; reviewed by Sam Tunnicliffe for CASSANDRA-15120
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/concurrent/SEPExecutor.java   |  10 +-
 .../org/apache/cassandra/concurrent/SEPWorker.java |  33 ++---
 .../cassandra/concurrent/SharedExecutorPool.java   |   8 +-
 .../apache/cassandra/concurrent/StageManager.java  |   2 +-
 .../org/apache/cassandra/net/MessagingService.java |  10 ++
 .../apache/cassandra/service/StorageService.java   | 163 ++++++++++++---------
 .../org/apache/cassandra/utils/ExpiringMap.java    |   5 +
 .../org/apache/cassandra/distributed/Cluster.java  |  18 ++-
 .../cassandra/distributed/UpgradeableCluster.java  |  12 +-
 .../apache/cassandra/distributed/api/Feature.java  |  24 +++
 .../cassandra/distributed/api/IInstance.java       |   5 +-
 .../distributed/impl/AbstractCluster.java          |  58 ++++++--
 .../impl/DelegatingInvokableInstance.java          |   6 +-
 .../cassandra/distributed/impl/Instance.java       |  46 ++++--
 .../distributed/impl/InstanceClassLoader.java      |   2 +-
 .../distributed/test/DistributedTestBase.java      |   1 +
 .../cassandra/distributed/test/GossipTest.java     | 113 ++++++++++++++
 .../cassandra/concurrent/SEPExecutorTest.java      |   2 +-
 .../org/apache/cassandra/service/MoveTest.java     |  11 +-
 20 files changed, 393 insertions(+), 137 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6f6bd70..36eb9c2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.19
+ * Update token metadata when handling MOVING/REMOVING_TOKEN events (CASSANDRA-15120)
  * Add ability to customize cassandra log directory using $CASSANDRA_LOG_DIR (CASSANDRA-15090)
  * Skip cells with illegal column names when reading legacy sstables (CASSANDRA-15086)
  * Fix assorted gossip races and add related runtime checks (CASSANDRA-15059)
diff --git a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
index 8b12b82..d5c7b14 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPExecutor.java
@@ -174,7 +174,11 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService
             long current = permits.get();
             int workPermits = workPermits(current);
             if (permits.compareAndSet(current, updateWorkPermits(current, workPermits + 1)))
-                return;
+            {
+                if (shuttingDown && workPermits + 1 == maxWorkers)
+                    shutdown.signalAll();
+                break;
+            }
         }
     }
 
@@ -206,7 +210,7 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService
     {
         shuttingDown = true;
         pool.executors.remove(this);
-        if (getActiveCount() == 0)
+        if (getActiveCount() == 0 && getPendingTasks() == 0)
             shutdown.signalAll();
 
         // release metrics
@@ -219,6 +223,8 @@ public class SEPExecutor extends AbstractLocalAwareExecutorService
         List<Runnable> aborted = new ArrayList<>();
         while (takeTaskPermit())
             aborted.add(tasks.poll());
+        if (getActiveCount() == 0)
+            shutdown.signalAll();
         return aborted;
     }
 
diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
index edc31da..f7eb47a 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
@@ -98,7 +98,6 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
                 // if we do have tasks assigned, nobody will change our state so we can simply set it to WORKING
                 // (which is also a state that will never be interrupted externally)
                 set(Work.WORKING);
-                boolean shutdown;
                 while (true)
                 {
                     // before we process any task, we maybe schedule a new worker _to our executor only_; this
@@ -111,19 +110,13 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
                     task = null;
 
                     // if we're shutting down, or we fail to take a permit, we don't perform any more work
-                    if ((shutdown = assigned.shuttingDown) || !assigned.takeTaskPermit())
+                    if (!assigned.takeTaskPermit())
                         break;
                     task = assigned.tasks.poll();
                 }
 
                 // return our work permit, and maybe signal shutdown
                 assigned.returnWorkPermit();
-                if (shutdown)
-                {
-                    if (assigned.getActiveCount() == 0)
-                        assigned.shutdown.signalAll();
-                    return;
-                }
                 assigned = null;
 
                 // try to immediately reassign ourselves some work; if we fail, start spinning
@@ -134,22 +127,24 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
         catch (Throwable t)
         {
             JVMStabilityInspector.inspectThrowable(t);
-            while (true)
+            if (task != null)
+                logger.error("Failed to execute task, unexpected exception killed worker: {}", t);
+            else
+                logger.error("Unexpected exception killed worker: {}", t);
+        }
+        finally
+        {
+            if (assigned != null)
+                assigned.returnWorkPermit();
+
+            do
             {
                 if (get().assigned != null)
                 {
-                    assigned = get().assigned;
+                    get().assigned.returnWorkPermit();
                     set(Work.WORKING);
                 }
-                if (assign(Work.STOPPED, true))
-                    break;
-            }
-            if (assigned != null)
-                assigned.returnWorkPermit();
-            if (task != null)
-                logger.error("Failed to execute task, unexpected exception killed worker: {}", t);
-            else
-                logger.error("Unexpected exception killed worker: {}", t);
+            } while (!assign(Work.STOPPED, true));
         }
     }
 
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index d355d77..3997c1a 100644
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.concurrent;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -107,16 +108,17 @@ public class SharedExecutorPool
             schedule(Work.SPINNING);
     }
 
-    public LocalAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String jmxPath, String name)
+    public synchronized LocalAwareExecutorService newExecutor(int maxConcurrency, int maxQueuedTasks, String jmxPath, String name)
     {
         SEPExecutor executor = new SEPExecutor(this, maxConcurrency, maxQueuedTasks, jmxPath, name);
         executors.add(executor);
         return executor;
     }
 
-    public void shutdown() throws InterruptedException
+    public synchronized void shutdownAndWait() throws InterruptedException
     {
         shuttingDown = true;
+        List<SEPExecutor> executors = new ArrayList<>(this.executors);
         for (SEPExecutor executor : executors)
             executor.shutdownNow();
 
@@ -127,7 +129,7 @@ public class SharedExecutorPool
             executor.shutdown.await(until - System.nanoTime(), TimeUnit.NANOSECONDS);
     }
 
-    void terminateWorkers()
+    private void terminateWorkers()
     {
         assert shuttingDown;
 
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 8603778..2f90a29 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -125,7 +125,7 @@ public class StageManager
     public static void shutdownAndWait() throws InterruptedException
     {
         for (Stage stage : Stage.values())
-            StageManager.stages.get(stage).shutdown();
+            StageManager.stages.get(stage).shutdownNow();
         for (Stage stage : Stage.values())
             StageManager.stages.get(stage).awaitTermination(60, TimeUnit.SECONDS);
     }
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index a76df0d..82b26ea 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -816,11 +816,18 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public void shutdown()
     {
+        shutdown(true);
+    }
+    public void shutdown(boolean gracefully)
+    {
         logger.info("Waiting for messaging service to quiesce");
         // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
         assert !StageManager.getStage(Stage.MUTATION).isShutdown();
 
         // the important part
+        if (!gracefully)
+            callbacks.reset();
+
         if (!callbacks.shutdownBlocking())
             logger.warn("Failed to wait for messaging service callbacks shutdown");
 
@@ -829,6 +836,7 @@ public final class MessagingService implements MessagingServiceMBean
         {
             clearMessageSinks();
             for (SocketThread th : socketThreads)
+            {
                 try
                 {
                     th.close();
@@ -838,6 +846,8 @@ public final class MessagingService implements MessagingServiceMBean
                     // see https://issues.apache.org/jira/browse/CASSANDRA-10545
                     handleIOExceptionOnClose(e);
                 }
+            }
+            connectionManagers.values().forEach(OutboundTcpConnectionPool::close);
         }
         catch (IOException e)
         {
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a1f361d..4769b22 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -718,7 +718,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return DatabaseDescriptor.isAutoBootstrap() && !SystemKeyspace.bootstrapComplete() && !DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress());
     }
 
-    private void prepareToJoin() throws ConfigurationException
+    @VisibleForTesting
+    public void prepareToJoin() throws ConfigurationException
     {
         if (!joined)
         {
@@ -787,7 +788,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
     }
 
-    private void joinTokenRing(int delay) throws ConfigurationException
+    @VisibleForTesting
+    public void joinTokenRing(int delay) throws ConfigurationException
     {
         joined = true;
 
@@ -2049,6 +2051,85 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         tokenMetadata.updateHostId(Gossiper.instance.getHostId(newNode), newNode);
     }
 
+    private void ensureUpToDateTokenMetadata(String status, InetAddress endpoint)
+    {
+        Set<Token> tokens = new TreeSet<>(getTokensFor(endpoint));
+
+        if (logger.isDebugEnabled())
+            logger.debug("Node {} state {}, tokens {}", endpoint, status, tokens);
+
+        // If the node is previously unknown or tokens do not match, update tokenmetadata to
+        // have this node as 'normal' (it must have been using this token before the
+        // leave). This way we'll get pending ranges right.
+        if (!tokenMetadata.isMember(endpoint))
+        {
+            logger.info("Node {} state jump to {}", endpoint, status);
+            updateTokenMetadata(endpoint, tokens);
+        }
+        else if (!tokens.equals(new TreeSet<>(tokenMetadata.getTokens(endpoint))))
+        {
+            logger.warn("Node {} '{}' token mismatch. Long network partition?", endpoint, status);
+            updateTokenMetadata(endpoint, tokens);
+        }
+    }
+
+    private void updateTokenMetadata(InetAddress endpoint, Iterable<Token> tokens)
+    {
+        updateTokenMetadata(endpoint, tokens, new HashSet<>());
+    }
+
+    private void updateTokenMetadata(InetAddress endpoint, Iterable<Token> tokens, Set<InetAddress> endpointsToRemove)
+    {
+        Set<Token> tokensToUpdateInMetadata = new HashSet<>();
+        Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
+
+        for (final Token token : tokens)
+        {
+            // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
+            InetAddress currentOwner = tokenMetadata.getEndpoint(token);
+            if (currentOwner == null)
+            {
+                logger.debug("New node {} at token {}", endpoint, token);
+                tokensToUpdateInMetadata.add(token);
+                tokensToUpdateInSystemKeyspace.add(token);
+            }
+            else if (endpoint.equals(currentOwner))
+            {
+                // set state back to normal, since the node may have tried to leave, but failed and is now back up
+                tokensToUpdateInMetadata.add(token);
+                tokensToUpdateInSystemKeyspace.add(token);
+            }
+            else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
+            {
+                tokensToUpdateInMetadata.add(token);
+                tokensToUpdateInSystemKeyspace.add(token);
+
+                // currentOwner is no longer current, endpoint is.  Keep track of these moves, because when
+                // a host no longer has any tokens, we'll want to remove it.
+                Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
+                epToTokenCopy.get(currentOwner).remove(token);
+                if (epToTokenCopy.get(currentOwner).isEmpty())
+                    endpointsToRemove.add(currentOwner);
+
+                logger.info("Nodes {} and {} have the same token {}. {} is the new owner", endpoint, currentOwner, token, endpoint);
+            }
+            else
+            {
+                logger.info("Nodes {} and {} have the same token {}.  Ignoring {}", endpoint, currentOwner, token, endpoint);
+            }
+        }
+
+        tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint);
+        for (InetAddress ep : endpointsToRemove)
+        {
+            removeEndpoint(ep);
+            if (replacing && ep.equals(DatabaseDescriptor.getReplaceAddress()))
+                Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
+        }
+        if (!tokensToUpdateInSystemKeyspace.isEmpty())
+            SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace, StageManager.getStage(Stage.MUTATION));
+    }
+
     /**
      * Handle node move to normal state. That is, node is entering token ring and participating
      * in reads.
@@ -2058,8 +2139,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private void handleStateNormal(final InetAddress endpoint, final String status)
     {
         Collection<Token> tokens = getTokensFor(endpoint);
-        Set<Token> tokensToUpdateInMetadata = new HashSet<>();
-        Set<Token> tokensToUpdateInSystemKeyspace = new HashSet<>();
         Set<InetAddress> endpointsToRemove = new HashSet<>();
 
         if (logger.isDebugEnabled())
@@ -2127,62 +2206,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 tokenMetadata.updateHostId(hostId, endpoint);
         }
 
-        for (final Token token : tokens)
-        {
-            // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
-            InetAddress currentOwner = tokenMetadata.getEndpoint(token);
-            if (currentOwner == null)
-            {
-                logger.debug("New node {} at token {}", endpoint, token);
-                tokensToUpdateInMetadata.add(token);
-                tokensToUpdateInSystemKeyspace.add(token);
-            }
-            else if (endpoint.equals(currentOwner))
-            {
-                // set state back to normal, since the node may have tried to leave, but failed and is now back up
-                tokensToUpdateInMetadata.add(token);
-                tokensToUpdateInSystemKeyspace.add(token);
-            }
-            else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
-            {
-                tokensToUpdateInMetadata.add(token);
-                tokensToUpdateInSystemKeyspace.add(token);
-
-                // currentOwner is no longer current, endpoint is.  Keep track of these moves, because when
-                // a host no longer has any tokens, we'll want to remove it.
-                Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
-                epToTokenCopy.get(currentOwner).remove(token);
-                if (epToTokenCopy.get(currentOwner).size() < 1)
-                    endpointsToRemove.add(currentOwner);
-
-                logger.info(String.format("Nodes %s and %s have the same token %s.  %s is the new owner",
-                                          endpoint,
-                                          currentOwner,
-                                          token,
-                                          endpoint));
-            }
-            else
-            {
-                logger.info(String.format("Nodes %s and %s have the same token %s.  Ignoring %s",
-                                           endpoint,
-                                           currentOwner,
-                                           token,
-                                           endpoint));
-            }
-        }
-
         // capture because updateNormalTokens clears moving and member status
         boolean isMember = tokenMetadata.isMember(endpoint);
         boolean isMoving = tokenMetadata.isMoving(endpoint);
-        tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint);
-        for (InetAddress ep : endpointsToRemove)
-        {
-            removeEndpoint(ep);
-            if (replacing && DatabaseDescriptor.getReplaceAddress().equals(ep))
-                Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260
-        }
-        if (!tokensToUpdateInSystemKeyspace.isEmpty())
-            SystemKeyspace.updateTokens(endpoint, tokensToUpdateInSystemKeyspace, StageManager.getStage(Stage.MUTATION));
+
+        updateTokenMetadata(endpoint, tokens, endpointsToRemove);
 
         if (isMoving || operationMode == Mode.MOVING)
         {
@@ -2204,24 +2232,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     private void handleStateLeaving(InetAddress endpoint)
     {
-        Collection<Token> tokens = getTokensFor(endpoint);
-
-        if (logger.isDebugEnabled())
-            logger.debug("Node {} state leaving, tokens {}", endpoint, tokens);
-
         // If the node is previously unknown or tokens do not match, update tokenmetadata to
         // have this node as 'normal' (it must have been using this token before the
         // leave). This way we'll get pending ranges right.
-        if (!tokenMetadata.isMember(endpoint))
-        {
-            logger.info("Node {} state jump to leaving", endpoint);
-            tokenMetadata.updateNormalTokens(tokens, endpoint);
-        }
-        else if (!tokenMetadata.getTokens(endpoint).containsAll(tokens))
-        {
-            logger.warn("Node {} 'leaving' token mismatch. Long network partition?", endpoint);
-            tokenMetadata.updateNormalTokens(tokens, endpoint);
-        }
+
+        ensureUpToDateTokenMetadata(VersionedValue.STATUS_LEAVING, endpoint);
 
         // at this point the endpoint is certainly a member with this token, so let's proceed
         // normally
@@ -2254,6 +2269,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
      */
     private void handleStateMoving(InetAddress endpoint, String[] pieces)
     {
+        ensureUpToDateTokenMetadata(VersionedValue.STATUS_MOVING, endpoint);
+
         assert pieces.length >= 2;
         Token token = getTokenFactory().fromString(pieces[1]);
 
@@ -2299,6 +2316,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             }
             else if (VersionedValue.REMOVING_TOKEN.equals(state))
             {
+                ensureUpToDateTokenMetadata(state, endpoint);
+
                 if (logger.isDebugEnabled())
                     logger.debug("Tokens {} removed manually (endpoint was {})", removeTokens, endpoint);
 
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index ef013f5..a6895c5 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -121,6 +121,11 @@ public class ExpiringMap<K, V>
     public void reset()
     {
         shutdown = false;
+        clear();
+    }
+
+    public void clear()
+    {
         cache.clear();
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/Cluster.java b/test/distributed/org/apache/cassandra/distributed/Cluster.java
index c7f7675..4ae4e5d 100644
--- a/test/distributed/org/apache/cassandra/distributed/Cluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/Cluster.java
@@ -19,10 +19,10 @@
 package org.apache.cassandra.distributed;
 
 import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
 import org.apache.cassandra.distributed.impl.IInvokableInstance;
@@ -35,9 +35,9 @@ import org.apache.cassandra.distributed.impl.Versions;
  */
 public class Cluster extends AbstractCluster<IInvokableInstance> implements ICluster, AutoCloseable
 {
-    private Cluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader)
+    private Cluster(File root, Versions.Version version, List<InstanceConfig> configs, Set<Feature> features, ClassLoader sharedClassLoader)
     {
-        super(root, version, configs, sharedClassLoader);
+        super(root, version, configs, features, sharedClassLoader);
     }
 
     protected IInvokableInstance newInstanceWrapper(Versions.Version version, InstanceConfig config)
@@ -49,9 +49,17 @@ public class Cluster extends AbstractCluster<IInvokableInstance> implements IClu
     {
         return create(nodeCount, Cluster::new);
     }
+    public static Cluster create(int nodeCount, Set<Feature> with) throws Throwable
+    {
+        return create(nodeCount, with, Cluster::new);
+    }
     public static Cluster create(int nodeCount, File root)
     {
-        return create(nodeCount, Versions.CURRENT, root, Cluster::new);
+        return create(nodeCount, root, Cluster::new);
+    }
+    public static Cluster create(int nodeCount, File root, Set<Feature> with)
+    {
+        return create(nodeCount, Versions.CURRENT, root, with, Cluster::new);
     }
 }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index 0c8e63a..d0613b1 100644
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@ -22,7 +22,9 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.impl.AbstractCluster;
 import org.apache.cassandra.distributed.impl.IUpgradeableInstance;
@@ -38,9 +40,9 @@ import org.apache.cassandra.distributed.impl.Versions;
  */
 public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> implements ICluster, AutoCloseable
 {
-    private UpgradeableCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader)
+    private UpgradeableCluster(File root, Versions.Version version, List<InstanceConfig> configs, Set<Feature> features, ClassLoader sharedClassLoader)
     {
-        super(root, version, configs, sharedClassLoader);
+        super(root, version, configs, features, sharedClassLoader);
     }
 
     protected IUpgradeableInstance newInstanceWrapper(Versions.Version version, InstanceConfig config)
@@ -52,15 +54,17 @@ public class UpgradeableCluster extends AbstractCluster<IUpgradeableInstance> im
     {
         return create(nodeCount, UpgradeableCluster::new);
     }
+
     public static UpgradeableCluster create(int nodeCount, File root)
     {
-        return create(nodeCount, Versions.CURRENT, root, UpgradeableCluster::new);
+        return create(nodeCount, root, UpgradeableCluster::new);
     }
 
     public static UpgradeableCluster create(int nodeCount, Versions.Version version) throws IOException
     {
-        return create(nodeCount, version, Files.createTempDirectory("dtests").toFile(), UpgradeableCluster::new);
+        return create(nodeCount, version, UpgradeableCluster::new);
     }
+
     public static UpgradeableCluster create(int nodeCount, Versions.Version version, File root)
     {
         return create(nodeCount, version, root, UpgradeableCluster::new);
diff --git a/test/distributed/org/apache/cassandra/distributed/api/Feature.java b/test/distributed/org/apache/cassandra/distributed/api/Feature.java
new file mode 100644
index 0000000..a5c9316
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/api/Feature.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.api;
+
+public enum Feature
+{
+    NETWORK, GOSSIP
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
index 3834093..25e2c94 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.api;
 
 import org.apache.cassandra.locator.InetAddressAndPort;
 
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Future;
 
@@ -37,10 +38,12 @@ public interface IInstance extends IIsolatedExecutor
     UUID schemaVersion();
 
     void startup();
+    boolean isShutdown();
     Future<Void> shutdown();
+    Future<Void> shutdown(boolean graceful);
 
     // these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
-    void startup(ICluster cluster);
+    void startup(ICluster cluster, Set<Feature> with);
     void receiveMessage(IMessage message);
 
     int getMessagingVersion();
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index c27d9bf..67c844f 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -30,9 +30,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -47,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
@@ -94,6 +92,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
 
     private final File root;
     private final ClassLoader sharedClassLoader;
+    private final Set<Feature> features;
 
     // mutated by starting/stopping a node
     private final List<I> instances;
@@ -136,23 +135,28 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
             return config;
         }
 
+        public boolean isShutdown()
+        {
+            return isShutdown;
+        }
+
         @Override
         public synchronized void startup()
         {
             if (!isShutdown)
                 throw new IllegalStateException();
-            delegate().startup(AbstractCluster.this);
+            delegate().startup(AbstractCluster.this, features);
             isShutdown = false;
             updateMessagingVersions();
         }
 
         @Override
-        public synchronized Future<Void> shutdown()
+        public synchronized Future<Void> shutdown(boolean graceful)
         {
             if (isShutdown)
                 throw new IllegalStateException();
             isShutdown = true;
-            Future<Void> future = delegate.shutdown();
+            Future<Void> future = delegate.shutdown(graceful);
             delegate = null;
             return future;
         }
@@ -181,9 +185,10 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         }
     }
 
-    protected AbstractCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader)
+    protected AbstractCluster(File root, Versions.Version version, List<InstanceConfig> configs, Set<Feature> features, ClassLoader sharedClassLoader)
     {
         this.root = root;
+        this.features = features;
         this.sharedClassLoader = sharedClassLoader;
         this.instances = new ArrayList<>();
         this.instanceMap = new HashMap<>();
@@ -325,37 +330,60 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         get(instance).schemaChangeInternal(statement);
     }
 
-    void startup()
+    public void startup()
     {
-        parallelForEach(I::startup, 0, null);
+        forEach(I::startup);
     }
 
     protected interface Factory<I extends IInstance, C extends AbstractCluster<I>>
     {
-        C newCluster(File root, Versions.Version version, List<InstanceConfig> configs, ClassLoader sharedClassLoader);
+        C newCluster(File root, Versions.Version version, List<InstanceConfig> configs, Set<Feature> features, ClassLoader sharedClassLoader);
     }
 
     protected static <I extends IInstance, C extends AbstractCluster<I>> C
     create(int nodeCount, Factory<I, C> factory) throws Throwable
     {
-        return create(nodeCount, Files.createTempDirectory("dtests").toFile(), factory);
+        return create(nodeCount, Collections.emptySet(), factory);
+    }
+
+    protected static <I extends IInstance, C extends AbstractCluster<I>> C
+    create(int nodeCount, Set<Feature> features, Factory<I, C> factory) throws Throwable
+    {
+        return create(nodeCount, Files.createTempDirectory("dtests").toFile(), features, factory);
     }
 
     protected static <I extends IInstance, C extends AbstractCluster<I>> C
     create(int nodeCount, File root, Factory<I, C> factory)
     {
-        return create(nodeCount, Versions.CURRENT, root, factory);
+        return create(nodeCount, root, Collections.emptySet(), factory);
+    }
+
+    protected static <I extends IInstance, C extends AbstractCluster<I>> C
+    create(int nodeCount, File root, Set<Feature> features, Factory<I, C> factory)
+    {
+        return create(nodeCount, Versions.CURRENT, root, features, factory);
     }
 
     protected static <I extends IInstance, C extends AbstractCluster<I>> C
     create(int nodeCount, Versions.Version version, Factory<I, C> factory) throws IOException
     {
-        return create(nodeCount, version, Files.createTempDirectory("dtests").toFile(), factory);
+        return create(nodeCount, version, Collections.emptySet(), factory);
+    }
+
+    protected static <I extends IInstance, C extends AbstractCluster<I>> C
+    create(int nodeCount, Versions.Version version, Set<Feature> features, Factory<I, C> factory) throws IOException
+    {
+        return create(nodeCount, version, Files.createTempDirectory("dtests").toFile(), features, factory);
     }
 
     protected static <I extends IInstance, C extends AbstractCluster<I>> C
     create(int nodeCount, Versions.Version version, File root, Factory<I, C> factory)
     {
+        return create(nodeCount, version, root, Collections.emptySet(), factory);
+    }
+    protected static <I extends IInstance, C extends AbstractCluster<I>> C
+    create(int nodeCount, Versions.Version version, File root, Set<Feature> features, Factory<I, C> factory)
+    {
         root.mkdirs();
         setupLogging(root);
 
@@ -370,8 +398,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
             token += increment;
         }
 
-        C cluster = factory.newCluster(root, version, configs, sharedClassLoader);
-        cluster.startup();
+        C cluster = factory.newCluster(root, version, configs, features, sharedClassLoader);
         return cluster;
     }
 
@@ -398,6 +425,7 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
     public void close()
     {
         FBUtilities.waitOnFutures(instances.stream()
+                                           .filter(i -> !i.isShutdown())
                                            .map(IInstance::shutdown)
                                            .collect(Collectors.toList()),
                                   1L, TimeUnit.MINUTES);
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
index e9e6844..94df6cd 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.distributed.impl;
 
 import java.io.Serializable;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Future;
 import java.util.function.BiConsumer;
@@ -26,6 +27,7 @@ import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
+import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
@@ -110,9 +112,9 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance
     }
 
     @Override
-    public void startup(ICluster cluster)
+    public void startup(ICluster cluster, Set<Feature> with)
     {
-        delegate().startup(cluster);
+        delegate().startup(cluster, with);
     }
 
     @Override
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 94dbc96..1b385fb 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -36,7 +37,6 @@ import java.util.function.Function;
 import org.slf4j.LoggerFactory;
 
 import ch.qos.logback.classic.LoggerContext;
-import com.codahale.metrics.MetricFilter;
 import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.SharedExecutorPool;
@@ -56,6 +56,7 @@ import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
@@ -98,6 +99,10 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         this.config = config;
         InstanceIDDefiner.setInstanceId(config.num());
         FBUtilities.setBroadcastInetAddress(config.broadcastAddressAndPort().address);
+        acceptsOnInstance((IInstanceConfig override) -> {
+            Config.setOverrideLoadConfig(() -> loadConfig(override));
+            DatabaseDescriptor.setDaemonInitialized();
+        }).accept(config);
     }
 
     public IInstanceConfig config()
@@ -145,6 +150,11 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         throw new UnsupportedOperationException();
     }
 
+    public boolean isShutdown()
+    {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public void schemaChangeInternal(String query)
     {
@@ -249,18 +259,15 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
     }
 
     @Override
-    public void startup(ICluster cluster)
+    public void startup(ICluster cluster, Set<Feature> with)
     {
         sync(() -> {
             try
             {
                 mkdirs();
-
-                Config.setOverrideLoadConfig(() -> loadConfig(config));
-                DatabaseDescriptor.setDaemonInitialized();
                 DatabaseDescriptor.createAllDirectories();
 
-                // We need to persist this as soon as possible after startup checks.
+                // We need to persist this as soon as possible after startup checks.
                 // This should be the first write to SystemKeyspace (CASSANDRA-11742)
                 SystemKeyspace.persistLocalMetadata();
                 LegacySchemaMigrator.migrate();
@@ -287,8 +294,17 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                     throw new RuntimeException(e);
                 }
 
-                initializeRing(cluster);
-                registerMockMessaging(cluster);
+                // TODO: support GOSSIP and NETWORK separately; for now either feature takes the full ring-join path
+                if (with.contains(Feature.GOSSIP) || with.contains(Feature.NETWORK))
+                {
+                    StorageService.instance.prepareToJoin();
+                    StorageService.instance.joinTokenRing(1000);
+                }
+                else
+                {
+                    initializeRing(cluster);
+                    registerMockMessaging(cluster);
+                }
 
                 SystemKeyspace.finishStartup();
 
@@ -376,6 +392,14 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
 
     public Future<Void> shutdown()
     {
+        return shutdown(true);
+    }
+
+    public Future<Void> shutdown(boolean graceful)
+    {
+        if (!graceful)
+            MessagingService.instance().shutdown(false);
+
         Future<?> future = async((ExecutorService executor) -> {
             Throwable error = null;
             error = parallelRun(error, executor,
@@ -383,7 +407,6 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                     CompactionManager.instance::forceShutdown,
                     BatchlogManager.instance::shutdown,
                     HintsService.instance::shutdownBlocking,
-                    CommitLog.instance::shutdownBlocking,
                     SecondaryIndexManager::shutdownExecutors,
                     ColumnFamilyStore::shutdownFlushExecutor,
                     ColumnFamilyStore::shutdownPostFlushExecutor,
@@ -401,7 +424,10 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
             );
             error = parallelRun(error, executor,
                                 StageManager::shutdownAndWait,
-                                SharedExecutorPool.SHARED::shutdown
+                                SharedExecutorPool.SHARED::shutdownAndWait
+            );
+            error = parallelRun(error, executor,
+                                CommitLog.instance::shutdownBlocking
             );
 
             LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
index 56c8074..57530e0 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
@@ -47,7 +47,7 @@ public class InstanceClassLoader extends URLClassLoader
             || name.startsWith("sun.")
             || name.startsWith("oracle.")
             || name.startsWith("com.sun.")
-            || name.startsWith("com.oracle.")
+            || name.startsWith("com.sun.")
             || name.startsWith("java.")
             || name.startsWith("javax.")
             || name.startsWith("jdk.")
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index 18ca17f..3945ec5 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@ -49,6 +49,7 @@ public class DistributedTestBase
 
     protected static <C extends AbstractCluster<?>> C init(C cluster)
     {
+        cluster.startup();
         cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};");
         return cluster;
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
new file mode 100644
index 0000000..11e9985
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class GossipTest extends DistributedTestBase
+{
+
+    @Test
+    public void nodeDownDuringMove() throws Throwable
+    {
+        int liveCount = 1;
+        System.setProperty("cassandra.consistent.rangemovement", "false");
+        try (Cluster cluster = Cluster.create(2 + liveCount, EnumSet.of(Feature.GOSSIP)))
+        {
+            int fail = liveCount + 1;
+            int late = fail + 1;
+            for (int i = 1 ; i <= liveCount ; ++i)
+                cluster.get(i).startup();
+            cluster.get(fail).startup();
+            Collection<String> expectTokens = cluster.get(fail).callsOnInstance(() ->
+                StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress())
+                                       .stream().map(Object::toString).collect(Collectors.toList())
+            ).call();
+
+            InetAddress failAddress = cluster.get(fail).broadcastAddressAndPort().address;
+            // wait for NORMAL state
+            for (int i = 1 ; i <= liveCount ; ++i)
+            {
+                cluster.get(i).acceptsOnInstance((InetAddress endpoint) -> {
+                    EndpointState ep;
+                    while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
+                           || ep.getApplicationState(ApplicationState.STATUS) == null
+                           || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("NORMAL"))
+                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
+                }).accept(failAddress);
+            }
+
+            // set ourselves to MOVING, and wait for it to propagate
+            cluster.get(fail).runOnInstance(() -> {
+
+                Token token = Iterables.getFirst(StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress()), null);
+                Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.moving(token));
+            });
+
+            for (int i = 1 ; i <= liveCount ; ++i)
+            {
+                cluster.get(i).acceptsOnInstance((InetAddress endpoint) -> {
+                    EndpointState ep;
+                    while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
+                           || (ep.getApplicationState(ApplicationState.STATUS) == null
+                               || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("MOVING")))
+                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
+                }).accept(failAddress);
+            }
+
+            cluster.get(fail).shutdown(false).get();
+            cluster.get(late).startup();
+            cluster.get(late).acceptsOnInstance((InetAddress endpoint) -> {
+                EndpointState ep;
+                while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
+                       || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("MOVING"))
+                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
+            }).accept(failAddress);
+
+            Collection<String> tokens = cluster.get(late).appliesOnInstance((InetAddress endpoint) ->
+                StorageService.instance.getTokenMetadata().getTokens(failAddress)
+                                       .stream().map(Object::toString).collect(Collectors.toList())
+            ).apply(failAddress);
+
+            Assert.assertEquals(expectTokens, tokens);
+        }
+    }
+    
+}
diff --git a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
index 011a8ba..e4c695c 100644
--- a/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/SEPExecutorTest.java
@@ -56,7 +56,7 @@ public class SEPExecutorTest
         }
 
         // shutdown does not guarantee that threads are actually dead once it exits, only that they will stop promptly afterwards
-        sharedPool.shutdown();
+        sharedPool.shutdownAndWait();
         for (Thread thread : Thread.getAllStackTraces().keySet())
         {
             if (thread.getName().contains(MAGIC))
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java
index 53365aa..bc6c6d2 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -479,7 +479,16 @@ public class MoveTest
     {
         tmd.removeFromMoving(host);
         assertTrue(!tmd.isMoving(host));
-        tmd.updateNormalToken(new BigIntegerToken(String.valueOf(token)), host);
+        Token newToken = new BigIntegerToken(String.valueOf(token));
+        tmd.updateNormalToken(newToken, host);
+        // As well as updating TMD, update the host's tokens in gossip. Since CASSANDRA-15120, status changing to MOVING
+        // ensures that TMD is up to date with token assignments according to gossip. So we need to make sure gossip has
+        // the correct new token, as the moving node itself would do upon successful completion of the move operation.
+        // Without this, the next movement for that host will set the token in TMD back to the old value from gossip
+        // and incorrect range movements will follow.
+        Gossiper.instance.injectApplicationState(host,
+                                                 ApplicationState.TOKENS,
+                                                 new VersionedValue.VersionedValueFactory(partitioner).tokens(Collections.singleton(newToken)));
     }
 
     private Map.Entry<Range<Token>, Collection<InetAddress>> generatePendingMapEntry(int start, int end, String... endpoints) throws UnknownHostException


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org