Posted to commits@cassandra.apache.org by dc...@apache.org on 2020/10/12 18:08:17 UTC

[cassandra] branch trunk updated (0e0056c -> 2ae1ec5)

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 0e0056c  Ninja: add a missing CHANGES.txt entry for CASSANDRA-16155
     new 521a6e2  Fixed a NullPointerException when calling nodetool enablethrift
     new 42989ce  Merge branch 'cassandra-2.2' into cassandra-3.0
     new 34dde96  Merge branch 'cassandra-3.0' into cassandra-3.11
     new 2ae1ec5  Merge branch 'cassandra-3.11' into trunk

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   2 +
 build.xml                                          |   3 +
 .../apache/cassandra/service/CassandraDaemon.java  | 108 +++++------
 .../apache/cassandra/service/StorageService.java   |   2 +-
 .../distributed/impl/AbstractCluster.java          |  18 +-
 .../cassandra/distributed/impl/Instance.java       |  41 +++-
 .../cassandra/distributed/shared/Byteman.java      | 207 +++++++++++++++++++++
 .../shared/{RepairResult.java => Shared.java}      |  24 ++-
 .../test/BootstrapBinaryDisabledTest.java          | 165 ++++++++++++++++
 .../test/ClientNetworkStopStartTest.java           |  79 ++++++++
 .../distributed/test/TopologyChangeTest.java       |  45 +++--
 test/resources/byteman/stream_failure.btm          |  14 ++
 12 files changed, 611 insertions(+), 97 deletions(-)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/shared/Byteman.java
 copy test/distributed/org/apache/cassandra/distributed/shared/{RepairResult.java => Shared.java} (56%)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java
 create mode 100644 test/resources/byteman/stream_failure.btm
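
The new test/distributed files above introduce a reusable pattern for the in-JVM dtests: a class annotated with @Shared is loaded by the shared class loader (AbstractCluster scans for the annotation with Reflections), so the test and every instance observe the same static state, and a Byteman script can be installed into selected instance class loaders to inject faults. A minimal sketch, assuming the helpers added in this commit (Byteman, Shared, TestBaseImpl) plus a hypothetical script path and test name:

    import org.junit.Test;

    import org.apache.cassandra.distributed.Cluster;
    import org.apache.cassandra.distributed.api.Feature;
    import org.apache.cassandra.distributed.shared.Byteman;
    import org.apache.cassandra.distributed.shared.Shared;

    public class SharedFlagExampleTest extends TestBaseImpl
    {
        // Loaded by the shared class loader, so the flag has one value across all instances.
        @Shared
        public static final class FaultFlag
        {
            public static volatile boolean enabled = false;
        }

        @Test
        public void injectFault() throws Exception
        {
            // hypothetical script; stream_failure.btm below is the real example from this commit
            Byteman byteman = Byteman.createFromScripts("test/resources/byteman/example.btm");
            try (Cluster cluster = init(Cluster.build(2)
                                               .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
                                               .withInstanceInitializer((cl, node) -> byteman.install(cl))
                                               .start()))
            {
                FaultFlag.enabled = true;   // rules guarded by this flag now fire inside the instances
                // ... exercise the cluster ...
                FaultFlag.enabled = false;  // rules become no-ops again
            }
        }
    }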


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by dc...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 2ae1ec5dd2d98178f3ab4b3ed64a87147e713560
Merge: 0e0056c 34dde96
Author: David Capwell <dc...@apache.org>
AuthorDate: Mon Oct 12 11:06:42 2020 -0700

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   2 +
 build.xml                                          |   3 +
 .../apache/cassandra/service/CassandraDaemon.java  | 108 +++++------
 .../apache/cassandra/service/StorageService.java   |   2 +-
 .../distributed/impl/AbstractCluster.java          |  18 +-
 .../cassandra/distributed/impl/Instance.java       |  41 +++-
 .../cassandra/distributed/shared/Byteman.java      | 207 +++++++++++++++++++++
 .../cassandra/distributed/shared/Shared.java       |  37 ++++
 .../test/BootstrapBinaryDisabledTest.java          | 165 ++++++++++++++++
 .../test/ClientNetworkStopStartTest.java           |  79 ++++++++
 .../distributed/test/TopologyChangeTest.java       |  45 +++--
 test/resources/byteman/stream_failure.btm          |  14 ++
 12 files changed, 633 insertions(+), 88 deletions(-)
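
One dtest-api change worth noting before the diff: nodetool assertions in these tests no longer capture System.out by hand; NodeToolResult exposes the captured stdout directly (see ClientNetworkStopStartTest below). A brief sketch of the pattern, assuming the node comes from a running in-JVM dtest Cluster:

    import org.apache.cassandra.distributed.api.IInvokableInstance;
    import org.apache.cassandra.distributed.api.NodeToolResult;

    final class TransportAsserts
    {
        // stdout is captured for the invocation, so the old System.setOut() workaround is not needed
        static void assertBinaryRunning(IInvokableInstance node)
        {
            NodeToolResult result = node.nodetoolResult("statusbinary");
            result.asserts().success().stdoutContains("running");
            result.asserts().stdoutNotContains("not running");
        }
    }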

diff --cc CHANGES.txt
index bf80c8c,ee70af5..6829dac
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -29,34 -6,15 +29,36 @@@ Merged from 3.11
  Merged from 3.0:
   * Handle unexpected columns due to schema races (CASSANDRA-15899)
   * Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160)
+ Merged from 2.2:
+  * Fixed a NullPointerException when calling nodetool enablethrift (CASSANDRA-16127)
  
 -3.11.8
 +4.0-beta2
 + * Add additional incremental repair visibility to nodetool repair_admin (CASSANDRA-14939)
 + * Always access system properties and environment variables via the new CassandraRelevantProperties and CassandraRelevantEnv classes (CASSANDRA-15876)
 + * Remove deprecated HintedHandOffManager (CASSANDRA-15939)
 + * Prevent repair from overrunning compaction (CASSANDRA-15817)
 + * fix cqlsh COPY functions in Python 3.8 on Mac (CASSANDRA-16053)
 + * Strip comment blocks from cqlsh input before processing statements (CASSANDRA-15802)
 + * Fix unicode chars error input (CASSANDRA-15990)
 + * Improved testability for CacheMetrics and ChunkCacheMetrics (CASSANDRA-15788)
 + * Handle errors in StreamSession#prepare (CASSANDRA-15852)
 + * FQL replay should have options to ignore DDL statements (CASSANDRA-16039)
 + * Remove COMPACT STORAGE internals (CASSANDRA-13994)
 + * Make TimestampSerializer accept fractional seconds of varying precision (CASSANDRA-15976)
 + * Improve cassandra-stress logging when using a profile file that doesn't exist (CASSANDRA-14425)
 + * Improve logging for socket connection/disconnection (CASSANDRA-15980)
 + * Throw FSWriteError upon write failures in order to apply DiskFailurePolicy (CASSANDRA-15928)
 + * Forbid altering UDTs used in partition keys (CASSANDRA-15933)
 + * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973)
 + * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766)
 + * Verify sstable components on startup (CASSANDRA-15945)
 + * Resolve JMX output inconsistencies from CASSANDRA-7544 storage-port-configurable-per-node (CASSANDRA-15937)
 +Merged from 3.11:
   * Correctly interpret SASI's `max_compaction_flush_memory_in_mb` setting in megabytes not bytes (CASSANDRA-16071)
   * Fix short read protection for GROUP BY queries (CASSANDRA-15459)
 + * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191)
   * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857)
  Merged from 3.0:
 - * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935)
   * Fix gossip shutdown order (CASSANDRA-15816)
   * Remove broken 'defrag-on-read' optimization (CASSANDRA-15432)
   * Check for endpoint collision with hibernating nodes (CASSANDRA-14599)
diff --cc build.xml
index e026630,191c1c8..5c9ac2f
--- a/build.xml
+++ b/build.xml
@@@ -582,13 -412,20 +582,14 @@@
            <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-annotations" version="2.9.10"/>
            <dependency groupId="com.googlecode.json-simple" artifactId="json-simple" version="1.1"/>
            <dependency groupId="com.boundary" artifactId="high-scale-lib" version="1.0.6"/>
 -          <dependency groupId="com.github.jbellis" artifactId="jamm" version="0.3.0"/>
 -
 -          <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.7">
 -            <exclusion groupId="org.slf4j" artifactId="slf4j-log4j12"/>
 -            <exclusion groupId="junit" artifactId="junit"/>
 -          </dependency>
 -          <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/>
 -          <dependency groupId="org.apache.thrift" artifactId="libthrift" version="0.9.2">
 -	         <exclusion groupId="commons-logging" artifactId="commons-logging"/>
 -          </dependency>
 -          <dependency groupId="junit" artifactId="junit" version="4.6" />
 +          <dependency groupId="com.github.jbellis" artifactId="jamm" version="${jamm.version}"/>
 +          <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.26"/>
 +          <dependency groupId="junit" artifactId="junit" version="4.12" />
            <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" />
 +          <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" />
 +          <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" />
            <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.5" />
+           <dependency groupId="org.reflections" artifactId="reflections" version="0.9.12" />
            <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10">
               <exclusion groupId="commons-lang" artifactId="commons-lang"/>
            </dependency>
@@@ -731,19 -542,22 +732,20 @@@
                  version="${version}"/>
          <dependency groupId="junit" artifactId="junit"/>
          <dependency groupId="org.mockito" artifactId="mockito-core" />
 -        <dependency groupId="org.apache.cassandra" artifactId="dtest-api" />
 +        <dependency groupId="org.quicktheories" artifactId="quicktheories" />
+         <dependency groupId="org.reflections" artifactId="reflections" />
 +        <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" />
 +        <dependency groupId="org.apache.cassandra" artifactId="dtest-api" />
 +        <dependency groupId="org.psjava" artifactId="psjava" version="0.1.19" />
          <dependency groupId="org.apache.rat" artifactId="apache-rat"/>
          <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
 -      	<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
 -      	<dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
 +        <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>
 +        <dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
          <dependency groupId="org.antlr" artifactId="antlr"/>
 -        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded">
 -          <exclusion groupId="io.netty" artifactId="netty-buffer"/>
 -          <exclusion groupId="io.netty" artifactId="netty-codec"/>
 -          <exclusion groupId="io.netty" artifactId="netty-handler"/>
 -          <exclusion groupId="io.netty" artifactId="netty-transport"/>
 -        </dependency>
 +        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
          <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
 -        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4.4" />
 -        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4.4" />
 +        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
 +        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8"/>
          <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/>
          <dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess"/>
          <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations"/>
@@@ -770,7 -572,14 +772,8 @@@
                  version="${version}"/>
          <dependency groupId="junit" artifactId="junit"/>
          <dependency groupId="org.mockito" artifactId="mockito-core" />
 -        <dependency groupId="org.apache.cassandra" artifactId="dtest-api" />
+         <dependency groupId="org.reflections" artifactId="reflections" />
 -        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded">
 -          <exclusion groupId="io.netty" artifactId="netty-buffer"/>
 -          <exclusion groupId="io.netty" artifactId="netty-codec"/>
 -          <exclusion groupId="io.netty" artifactId="netty-handler"/>
 -          <exclusion groupId="io.netty" artifactId="netty-transport"/>
 -        </dependency>
 +        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
          <dependency groupId="io.netty" artifactId="netty-all"/>
          <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
          <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index 4b92d69,d8bd165..6d6bc70
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -38,26 -44,19 +44,20 @@@ import com.codahale.metrics.jvm.BufferP
  import com.codahale.metrics.jvm.FileDescriptorRatioGauge;
  import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
  import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
- import com.google.common.annotations.VisibleForTesting;
- import com.google.common.util.concurrent.Futures;
- import com.google.common.util.concurrent.ListenableFuture;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- 
 -import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
 +import org.apache.cassandra.audit.AuditLogManager;
  import org.apache.cassandra.concurrent.ScheduledExecutors;
- import org.apache.cassandra.db.virtual.SystemViewsKeyspace;
- import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
- import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace;
- import org.apache.cassandra.locator.InetAddressAndPort;
- import org.apache.cassandra.net.StartupClusterConnectivityChecker;
- import org.apache.cassandra.schema.TableMetadata;
 -import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.config.DatabaseDescriptor;
- import org.apache.cassandra.schema.Schema;
- import org.apache.cassandra.schema.SchemaConstants;
 -import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.config.SchemaConstants;
  import org.apache.cassandra.cql3.QueryProcessor;
- import org.apache.cassandra.db.*;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.SizeEstimatesRecorder;
+ import org.apache.cassandra.db.SystemKeyspace;
++import org.apache.cassandra.db.SystemKeyspaceMigrator40;
+ import org.apache.cassandra.db.WindowsFailedSnapshotTracker;
  import org.apache.cassandra.db.commitlog.CommitLog;
++import org.apache.cassandra.db.virtual.SystemViewsKeyspace;
++import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
++import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace;
  import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.exceptions.StartupException;
  import org.apache.cassandra.gms.Gossiper;
@@@ -65,22 -65,21 +65,33 @@@ import org.apache.cassandra.io.FSError
  import org.apache.cassandra.io.sstable.CorruptSSTableException;
  import org.apache.cassandra.io.sstable.SSTableHeaderFix;
  import org.apache.cassandra.io.util.FileUtils;
++import org.apache.cassandra.locator.InetAddressAndPort;
  import org.apache.cassandra.metrics.CassandraMetricsRegistry;
  import org.apache.cassandra.metrics.DefaultNameFactory;
  import org.apache.cassandra.metrics.StorageMetrics;
- import org.apache.cassandra.tracing.Tracing;
- import org.apache.cassandra.utils.*;
 -import org.apache.cassandra.schema.LegacySchemaMigrator;
++import org.apache.cassandra.net.StartupClusterConnectivityChecker;
++import org.apache.cassandra.schema.Schema;
++import org.apache.cassandra.schema.SchemaConstants;
++import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.security.ThreadAwareSecurityManager;
 -import org.apache.cassandra.thrift.ThriftServer;
+ import org.apache.cassandra.tracing.Tracing;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.JMXServerUtils;
+ import org.apache.cassandra.utils.JVMStabilityInspector;
+ import org.apache.cassandra.utils.MBeanWrapper;
+ import org.apache.cassandra.utils.Mx4jTool;
+ import org.apache.cassandra.utils.NativeLibrary;
+ import org.apache.cassandra.utils.WindowsTimer;
  
 +import static java.util.concurrent.TimeUnit.NANOSECONDS;
 +import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_FOREGROUND;
- import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_PID_FILE;
 +import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_JMX_REMOTE_PORT;
++import static org.apache.cassandra.config.CassandraRelevantProperties.CASSANDRA_PID_FILE;
 +import static org.apache.cassandra.config.CassandraRelevantProperties.COM_SUN_MANAGEMENT_JMXREMOTE_PORT;
 +import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_CLASS_PATH;
 +import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VERSION;
 +import static org.apache.cassandra.config.CassandraRelevantProperties.JAVA_VM_NAME;
 +
  /**
   * The <code>CassandraDaemon</code> is an abstraction for a Cassandra daemon
   * service, which defines not only a way to activate and deactivate it, but also
@@@ -167,24 -166,10 +178,24 @@@ public class CassandraDaemo
          }
      }
  
 +    @VisibleForTesting
 +    public static Runnable SPECULATION_THRESHOLD_UPDATER = 
 +        () -> 
 +        {
 +            try
 +            {
 +                Keyspace.allExisting().forEach(k -> k.getColumnFamilyStores().forEach(ColumnFamilyStore::updateSpeculationThreshold));
 +            }
 +            catch (Throwable t)
 +            {
 +                logger.warn("Failed to update speculative retry thresholds.", t);
 +                JVMStabilityInspector.inspectThrowable(t);
 +            }
 +        };
 +    
      static final CassandraDaemon instance = new CassandraDaemon();
  
-     private NativeTransportService nativeTransportService;
 -    private volatile Server thriftServer;
+     private volatile NativeTransportService nativeTransportService;
      private JMXConnectorServer jmxServer;
  
      private final boolean runManaged;
@@@ -445,25 -435,20 +456,25 @@@
          // due to scheduling errors or race conditions
          ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(ColumnFamilyStore.getBackgroundCompactionTaskSubmitter(), 5, 1, TimeUnit.MINUTES);
  
 +        // schedule periodic recomputation of speculative retry thresholds
 +        ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(SPECULATION_THRESHOLD_UPDATER, 
 +                                                                DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS),
 +                                                                DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS),
 +                                                                NANOSECONDS);
 +
-         initializeNativeTransport();
+         initializeClientTransports();
  
          completeSetup();
      }
  
 -    public synchronized void initializeClientTransports()
 +    public void setupVirtualKeyspaces()
      {
 -        // Thrift
 -        InetAddress rpcAddr = DatabaseDescriptor.getRpcAddress();
 -        int rpcPort = DatabaseDescriptor.getRpcPort();
 -        int listenBacklog = DatabaseDescriptor.getRpcListenBacklog();
 -        if (thriftServer == null)
 -            thriftServer = new ThriftServer(rpcAddr, rpcPort, listenBacklog);
 +        VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance);
 +        VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance);
 +    }
  
-     public void initializeNativeTransport()
++    public synchronized void initializeClientTransports()
 +    {
          // Native transport
          if (nativeTransportService == null)
              nativeTransportService = new NativeTransportService();
@@@ -560,34 -530,24 +571,28 @@@
       */
      public void start()
      {
 +        StartupClusterConnectivityChecker connectivityChecker = StartupClusterConnectivityChecker.create(DatabaseDescriptor.getBlockForPeersTimeoutInSeconds(),
 +                                                                                                         DatabaseDescriptor.getBlockForPeersInRemoteDatacenters());
 +        connectivityChecker.execute(Gossiper.instance.getEndpoints(), DatabaseDescriptor.getEndpointSnitch()::getDatacenter);
 +
-         // We only start transports if bootstrap has completed and we're not in survey mode,
-         // OR if we are in survey mode and streaming has completed but we're not using auth
-         // OR if we have not joined the ring yet.
-         if (StorageService.instance.hasJoined())
+         // check whether transports may start; if not, return without starting. This is needed in survey mode or
+         // when bootstrap has not completed.
+         try
          {
-             if (StorageService.instance.isSurveyMode())
-             {
-                 if (StorageService.instance.isBootstrapMode() || DatabaseDescriptor.getAuthenticator().requireAuthentication())
-                 {
-                     logger.info("Not starting client transports in write_survey mode as it's bootstrapping or " +
-                             "auth is enabled");
-                     return;
-                 }
-             }
-             else
-             {
-                 if (!SystemKeyspace.bootstrapComplete())
-                 {
-                     logger.info("Not starting client transports as bootstrap has not completed");
-                     return;
-                 }
-             }
+             validateTransportsCanStart();
          }
+         catch (IllegalStateException isx)
+         {
+             // If there are any errors, we just log and return in this case
 -            logger.info(isx.getMessage());
++            logger.warn(isx.getMessage());
+             return;
+         }
+ 
+         startClientTransports();
+     }
  
+     private void startClientTransports()
+     {
          String nativeFlag = System.getProperty("cassandra.start_native_transport");
          if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport()))
          {
@@@ -631,16 -596,14 +635,13 @@@
      }
  
      @VisibleForTesting
-     public void destroyNativeTransport() throws InterruptedException
+     public void destroyClientTransports()
      {
 -        stopThriftServer();
+         stopNativeTransport();
          if (nativeTransportService != null)
-         {
              nativeTransportService.destroy();
-             nativeTransportService = null;
-         }
      }
  
- 
      /**
       * Clean up all resources obtained during the lifetime of the daemon. This
       * is a hook for JSVC.
@@@ -765,10 -726,42 +768,9 @@@
  
      public boolean isNativeTransportRunning()
      {
-         return nativeTransportService != null ? nativeTransportService.isRunning() : false;
+         return nativeTransportService != null && nativeTransportService.isRunning();
      }
  
 -    public void startThriftServer()
 -    {
 -        validateTransportsCanStart();
 -
 -        if (thriftServer == null)
 -            throw new IllegalStateException("setup() must be called first for CassandraDaemon");
 -        thriftServer.start();
 -    }
 -
 -    public void stopThriftServer()
 -    {
 -        if (thriftServer != null)
 -        {
 -            thriftServer.stop();
 -        }
 -    }
 -
 -    public boolean isThriftServerRunning()
 -    {
 -        return thriftServer != null && thriftServer.isRunning();
 -    }
 -
 -    public int getMaxNativeProtocolVersion()
 -    {
 -        return nativeTransportService.getMaxProtocolVersion();
 -    }
 -
 -    public void refreshMaxNativeProtocolVersion()
 -    {
 -        if (nativeTransportService != null)
 -            nativeTransportService.refreshMaxNegotiableProtocolVersion();
 -    }
--
      /**
       * A convenience method to stop and destroy the daemon in one shot.
       */
diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 3fee754,5477e36..361519d
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@@ -35,8 -35,8 +35,9 @@@ import java.util.concurrent.atomic.Atom
  import java.util.function.BiConsumer;
  import java.util.function.BiPredicate;
  import java.util.function.Consumer;
+ import java.util.function.Predicate;
  import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
  import java.util.stream.Stream;
  
  import com.google.common.collect.Sets;
@@@ -68,12 -70,11 +70,13 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.distributed.shared.ShutdownException;
  import org.apache.cassandra.distributed.shared.Versions;
  import org.apache.cassandra.io.util.FileUtils;
 -import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.net.Verb;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.concurrent.SimpleCondition;
+ import org.reflections.Reflections;
  
 +import static org.apache.cassandra.distributed.shared.NetworkTopology.addressAndPort;
 +
  /**
   * AbstractCluster creates, initializes and manages Cassandra instances ({@link Instance}.
   *
@@@ -186,11 -174,11 +196,11 @@@ public abstract class AbstractCluster<
  
          private IInvokableInstance newInstance(int generation)
          {
-             ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader);
+             ClassLoader classLoader = new InstanceClassLoader(generation, config.num(), version.classpath, sharedClassLoader, SHARED_PREDICATE);
              if (instanceInitializer != null)
                  instanceInitializer.accept(classLoader, config.num());
 -            return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, IInvokableInstance>)Instance::new, classLoader)
 -                                        .apply(config, classLoader);
 +            return Instance.transferAdhoc((SerializableBiFunction<IInstanceConfig, ClassLoader, Instance>)Instance::new, classLoader)
 +                                        .apply(config.forVersion(version.major), classLoader);
          }
  
          public IInstanceConfig config()
@@@ -769,5 -729,11 +779,11 @@@
                 .collect(Collectors.toList());
      }
  
+     private static Set<String> findClassesMarkedForSharedClassLoader()
+     {
+         return new Reflections("org.apache.cassandra").getTypesAnnotatedWith(Shared.class).stream()
 -                                                      .map(Class::getName)
 -                                                      .collect(Collectors.toSet());
++                                .map(Class::getName)
++                                .collect(Collectors.toSet());
+     }
  }
  
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 83669e2,aa37029..6ad0712
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -152,11 -142,8 +152,12 @@@ public class Instance extends IsolatedE
          // Set the config at instance creation, possibly before startup() has run on all other instances.
          // setMessagingVersions below will call runOnInstance which will instantiate
          // the MessagingService and dependencies preventing later changes to network parameters.
-         Config.setOverrideLoadConfig(() -> loadConfig(config));
+         Config single = loadConfig(config);
+         Config.setOverrideLoadConfig(() -> single);
 +
 +        // Enable streaming inbound handler tracking so they can be closed properly without leaking
 +        // the blocking IO thread.
 +        StreamingInboundHandler.trackInboundHandlers();
      }
  
      @Override
@@@ -405,6 -511,6 +406,9 @@@
                      throw e;
                  }
  
++                // Start up virtual table support
++                CassandraDaemon.getInstanceForTesting().setupVirtualKeyspaces();
++
                  Keyspace.setInitialized();
  
                  // Replay any CommitLogSegments found on disk
@@@ -430,12 -535,9 +434,13 @@@
  //                    -- not sure what that means?  SocketFactory.instance.getClass();
                      registerMockMessaging(cluster);
                  }
 +                registerInboundFilter(cluster);
 +                registerOutboundFilter(cluster);
 +
 +                JVMStabilityInspector.replaceKiller(new InstanceKiller());
  
                  // TODO: this is more than just gossip
+                 StorageService.instance.registerDaemon(CassandraDaemon.getInstanceForTesting());
                  if (config.has(GOSSIP))
                  {
                      StorageService.instance.initServer();
@@@ -451,21 -552,18 +456,19 @@@
  
                  SystemKeyspace.finishStartup();
  
+                 CassandraDaemon.getInstanceForTesting().setupCompleted();
+ 
                  if (config.has(NATIVE_PROTOCOL))
                  {
-                     // Start up virtual table support
-                     CassandraDaemon.getInstanceForTesting().setupVirtualKeyspaces();
- 
-                     CassandraDaemon.getInstanceForTesting().initializeNativeTransport();
-                     CassandraDaemon.getInstanceForTesting().startNativeTransport();
-                     StorageService.instance.setRpcReady(true);
+                     CassandraDaemon.getInstanceForTesting().initializeClientTransports();
+                     CassandraDaemon.getInstanceForTesting().start();
                  }
  
 -                if (!FBUtilities.getBroadcastAddress().equals(broadcastAddress().getAddress()))
 -                    throw new IllegalStateException();
 -                if (DatabaseDescriptor.getStoragePort() != broadcastAddress().getPort())
 -                    throw new IllegalStateException();
 +                if (!FBUtilities.getBroadcastAddressAndPort().address.equals(broadcastAddress().getAddress()) ||
 +                    FBUtilities.getBroadcastAddressAndPort().port != broadcastAddress().getPort())
 +                    throw new IllegalStateException(String.format("%s != %s", FBUtilities.getBroadcastAddressAndPort(), broadcastAddress()));
 +
 +                ActiveRepairService.instance.start();
              }
              catch (Throwable t)
              {
@@@ -586,20 -682,15 +589,21 @@@
                                  () -> BufferPool.shutdownLocalCleaner(1L, MINUTES),
                                  () -> Ref.shutdownReferenceReaper(1L, MINUTES),
                                  () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
 +                                () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES),
 +                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
                                  () -> SSTableReader.shutdownBlocking(1L, MINUTES),
 -                                () -> DiagnosticSnapshotService.instance.shutdownAndWait(1L, MINUTES)
 +                                () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor())),
 +                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES)
              );
 +
              error = parallelRun(error, executor,
 -                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
 -                                (IgnoreThrowingRunnable) MessagingService.instance()::shutdown
 +                                CommitLog.instance::shutdownBlocking,
-                                 () -> MessagingService.instance().shutdown(1L, MINUTES, false, true)
++                // messaging can only be shut down once, so if the test shuts down an instance, ignore the failure
++                                (IgnoreThrowingRunnable) () -> MessagingService.instance().shutdown(1L, MINUTES, false, true)
              );
              error = parallelRun(error, executor,
 -                                () -> StageManager.shutdownAndWait(1L, MINUTES),
 +                                () -> GlobalEventExecutor.INSTANCE.awaitInactivity(1l, MINUTES),
 +                                () -> Stage.shutdownAndWait(1L, MINUTES),
                                  () -> SharedExecutorPool.SHARED.shutdownAndWait(1L, MINUTES)
              );
              error = parallelRun(error, executor,
diff --cc test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java
index 0000000,3ac5028..a7ac605
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/BootstrapBinaryDisabledTest.java
@@@ -1,0 -1,165 +1,165 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.io.IOException;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.concurrent.TimeoutException;
+ 
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.Feature;
+ import org.apache.cassandra.distributed.api.IInstanceConfig;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
+ import org.apache.cassandra.distributed.api.LogResult;
+ import org.apache.cassandra.distributed.api.SimpleQueryResult;
+ import org.apache.cassandra.distributed.api.TokenSupplier;
+ import org.apache.cassandra.distributed.shared.Byteman;
+ import org.apache.cassandra.distributed.shared.NetworkTopology;
+ import org.apache.cassandra.distributed.shared.Shared;
+ 
+ /**
+  * Replaces python dtest bootstrap_test.py::TestBootstrap::test_bootstrap_binary_disabled
+  */
+ public class BootstrapBinaryDisabledTest extends TestBaseImpl
+ {
+     @Test
+     public void test() throws IOException, TimeoutException
+     {
+         Map<String, Object> config = new HashMap<>();
+         config.put("authenticator", "org.apache.cassandra.auth.PasswordAuthenticator");
+         config.put("authorizer", "org.apache.cassandra.auth.CassandraAuthorizer");
+         config.put("role_manager", "org.apache.cassandra.auth.CassandraRoleManager");
+         config.put("permissions_validity_in_ms", 0);
+         config.put("roles_validity_in_ms", 0);
+ 
+         int originalNodeCount = 1;
+         int expandedNodeCount = originalNodeCount + 2;
+         Byteman byteman = Byteman.createFromScripts("test/resources/byteman/stream_failure.btm");
+         try (Cluster cluster = init(Cluster.build(originalNodeCount)
+                                            .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
+                                            .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
+                                            .withConfig(c -> {
+                                                config.forEach(c::set);
+                                                c.with(Feature.GOSSIP, Feature.NETWORK, Feature.NATIVE_PROTOCOL);
+                                            })
+                                            .withInstanceInitializer((cl, nodeNumber) -> {
+                                                switch (nodeNumber) {
+                                                    case 1:
+                                                    case 2:
+                                                        byteman.install(cl);
+                                                        break;
+                                                }
+                                            })
+                                            .start()))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk text primary key)");
+             populate(cluster.get(1));
+             cluster.forEach(c -> c.flush(KEYSPACE));
+ 
+             bootstrap(cluster, config, false);
+             // Test write survey behaviour
+             bootstrap(cluster, config, true);
+         }
+     }
+ 
+     private static void bootstrap(Cluster cluster,
+                                   Map<String, Object> config,
+                                   boolean isWriteSurvey) throws TimeoutException
+     {
+         IInstanceConfig nodeConfig = cluster.newInstanceConfig();
+         nodeConfig.set("auto_bootstrap", true);
+         config.forEach(nodeConfig::set);
+ 
+         //TODO can we make this more isolated?
+         System.setProperty("cassandra.ring_delay_ms", "5000");
+         if (isWriteSurvey)
+             System.setProperty("cassandra.write_survey", "true");
+ 
+         RewriteEnabled.enable();
+         cluster.bootstrap(nodeConfig).startup();
+         IInvokableInstance node = cluster.get(cluster.size());
+         assertLogHas(node, "Some data streaming failed");
+         assertLogHas(node, isWriteSurvey ?
+                            "Not starting client transports in write_survey mode as it's bootstrapping or auth is enabled" :
+                            "Node is not yet bootstrapped completely");
+ 
+         node.nodetoolResult("join").asserts()
 -             .failure()
 -             .errorContains("Cannot join the ring until bootstrap completes");
++            .failure()
++            .errorContains("Cannot join the ring until bootstrap completes");
+ 
+         RewriteEnabled.disable();
+         node.nodetoolResult("bootstrap", "resume").asserts().success();
+         if (isWriteSurvey)
+             assertLogHas(node, "Not starting client transports in write_survey mode as it's bootstrapping or auth is enabled");
+ 
+         if (isWriteSurvey)
+         {
+             node.nodetoolResult("join").asserts().success();
+             assertLogHas(node, "Leaving write survey mode and joining ring at operator request");
+         }
+ 
+         node.logs().watchFor("Starting listening for CQL clients");
+         assertBootstrapState(node, "COMPLETED");
+     }
+ 
+     private static void assertBootstrapState(IInvokableInstance node, String expected)
+     {
+         SimpleQueryResult qr = node.executeInternalWithResult("SELECT bootstrapped FROM system.local WHERE key='local'");
+         Assert.assertTrue("No rows found", qr.hasNext());
+         Assert.assertEquals(expected, qr.next().getString("bootstrapped"));
+     }
+ 
+     private static void assertLogHas(IInvokableInstance node, String msg)
+     {
+         LogResult<List<String>> results = node.logs().grep(msg);
+         Assert.assertFalse("Unable to find '" + msg + "'", results.getResult().isEmpty());
+     }
+ 
+     private void populate(IInvokableInstance inst)
+     {
+         for (int i = 0; i < 10; i++)
+             inst.executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk) VALUES (?)", Integer.toString(i));
+     }
+ 
+     @Shared
+     public static final class RewriteEnabled
+     {
+         private static volatile boolean enabled = false;
+ 
+         public static boolean isEnabled()
+         {
+             return enabled;
+         }
+ 
+         public static void enable()
+         {
+             enabled = true;
+         }
+ 
+         public static void disable()
+         {
+             enabled = false;
+         }
+     }
+ }
diff --cc test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java
index 0000000,1d23ac7..1d9029b
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ClientNetworkStopStartTest.java
@@@ -1,0 -1,192 +1,79 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
 -import java.io.ByteArrayOutputStream;
+ import java.io.IOException;
 -import java.io.PrintStream;
 -import java.util.Arrays;
 -import java.util.Collections;
 -import java.util.Objects;
+ 
 -import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import com.datastax.driver.core.Session;
 -import org.apache.cassandra.db.marshal.CompositeType;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.ConsistencyLevel;
+ import org.apache.cassandra.distributed.api.Feature;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
++import org.apache.cassandra.distributed.api.NodeToolResult;
+ import org.apache.cassandra.distributed.api.QueryResults;
+ import org.apache.cassandra.distributed.api.SimpleQueryResult;
+ import org.apache.cassandra.distributed.shared.AssertUtils;
 -import org.apache.cassandra.thrift.Column;
 -import org.apache.cassandra.thrift.ColumnOrSuperColumn;
 -import org.apache.cassandra.thrift.Mutation;
 -import org.apache.cassandra.utils.ByteBufferUtil;
 -import org.apache.thrift.TException;
 -import org.hamcrest.BaseMatcher;
 -import org.hamcrest.Description;
+ 
+ public class ClientNetworkStopStartTest extends TestBaseImpl
+ {
+     /**
+      * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-16127">CASSANDRA-16127</a>
+      */
+     @Test
 -    public void stopStartThrift() throws IOException, TException
 -    {
 -        try (Cluster cluster = init(Cluster.build(1).withConfig(c -> c.with(Feature.NATIVE_PROTOCOL)).start()))
 -        {
 -            IInvokableInstance node = cluster.get(1);
 -            assertTransportStatus(node, "binary", true);
 -            assertTransportStatus(node, "thrift", true);
 -            node.nodetoolResult("disablethrift").asserts().success();
 -            assertTransportStatus(node, "binary", true);
 -            assertTransportStatus(node, "thrift", false);
 -            node.nodetoolResult("enablethrift").asserts().success();
 -            assertTransportStatus(node, "binary", true);
 -            assertTransportStatus(node, "thrift", true);
 -
 -            // now use it to make sure it still works!
 -            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, value int, PRIMARY KEY (pk))");
 -
 -            ThriftClientUtils.thriftClient(node, thrift -> {
 -                thrift.set_keyspace(KEYSPACE);
 -                Mutation mutation = new Mutation();
 -                ColumnOrSuperColumn csoc = new ColumnOrSuperColumn();
 -                Column column = new Column();
 -                column.setName(CompositeType.build(ByteBufferUtil.bytes("value")));
 -                column.setValue(ByteBufferUtil.bytes(0));
 -                column.setTimestamp(System.currentTimeMillis());
 -                csoc.setColumn(column);
 -                mutation.setColumn_or_supercolumn(csoc);
 -
 -                thrift.batch_mutate(Collections.singletonMap(ByteBufferUtil.bytes(0),
 -                                                             Collections.singletonMap("tbl", Arrays.asList(mutation))),
 -                                    org.apache.cassandra.thrift.ConsistencyLevel.ALL);
 -            });
 -
 -            SimpleQueryResult qr = cluster.coordinator(1).executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
 -            AssertUtils.assertRows(qr, QueryResults.builder().row(0, 0).build());
 -        }
 -    }
 -
 -    /**
 -     * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-16127">CASSANDRA-16127</a>
 -     */
 -    @Test
+     public void stopStartNative() throws IOException
+     {
 -        try (Cluster cluster = init(Cluster.build(1).withConfig(c -> c.with(Feature.NATIVE_PROTOCOL)).start()))
++        //TODO why does trunk need GOSSIP for native to work but no other branch does?
++        try (Cluster cluster = init(Cluster.build(1).withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL)).start()))
+         {
+             IInvokableInstance node = cluster.get(1);
+             assertTransportStatus(node, "binary", true);
 -            assertTransportStatus(node, "thrift", true);
+             node.nodetoolResult("disablebinary").asserts().success();
+             assertTransportStatus(node, "binary", false);
 -            assertTransportStatus(node, "thrift", true);
+             node.nodetoolResult("enablebinary").asserts().success();
+             assertTransportStatus(node, "binary", true);
 -            assertTransportStatus(node, "thrift", true);
+ 
+             // now use it to make sure it still works!
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, value int, PRIMARY KEY (pk))");
+ 
+             try (com.datastax.driver.core.Cluster client = com.datastax.driver.core.Cluster.builder().addContactPoints(node.broadcastAddress().getAddress()).build();
+                  Session session = client.connect())
+             {
+                 session.execute("INSERT INTO " + KEYSPACE + ".tbl (pk, value) VALUES (?, ?)", 0, 0);
+             }
+ 
+             SimpleQueryResult qr = cluster.coordinator(1).executeWithResult("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL);
+             AssertUtils.assertRows(qr, QueryResults.builder().row(0, 0).build());
+         }
+     }
+ 
+     private static void assertTransportStatus(IInvokableInstance node, String transport, boolean running)
+     {
+         assertNodetoolStdout(node, running ? "running" : "not running", running ? "not running" : null, "status" + transport);
+     }
+ 
+     private static void assertNodetoolStdout(IInvokableInstance node, String expectedStatus, String notExpected, String... nodetool)
+     {
 -        // without CASSANDRA-16057 need this hack
 -        PrintStream previousStdout = System.out;
 -        try
 -        {
 -            ByteArrayOutputStream out = new ByteArrayOutputStream();
 -            PrintStream stdout = new PrintStream(out, true);
 -            System.setOut(stdout);
 -
 -            node.nodetoolResult(nodetool).asserts().success();
 -
 -            stdout.flush();
 -            String output = out.toString();
 -            Assert.assertThat(output, new StringContains(expectedStatus));
 -            if (notExpected != null)
 -                Assert.assertThat(output, new StringNotContains(notExpected));
 -        }
 -        finally
 -        {
 -            System.setOut(previousStdout);
 -        }
 -    }
 -
 -    private static final class StringContains extends BaseMatcher<String>
 -    {
 -        private final String expected;
 -
 -        private StringContains(String expected)
 -        {
 -            this.expected = Objects.requireNonNull(expected);
 -        }
 -
 -        public boolean matches(Object o)
 -        {
 -            return o.toString().contains(expected);
 -        }
 -
 -        public void describeTo(Description description)
 -        {
 -            description.appendText("Expected to find '" + expected + "', but did not");
 -        }
 -    }
 -
 -    private static final class StringNotContains extends BaseMatcher<String>
 -    {
 -        private final String notExpected;
 -
 -        private StringNotContains(String expected)
 -        {
 -            this.notExpected = Objects.requireNonNull(expected);
 -        }
 -
 -        public boolean matches(Object o)
 -        {
 -            return !o.toString().contains(notExpected);
 -        }
 -
 -        public void describeTo(Description description)
 -        {
 -            description.appendText("Expected not to find '" + notExpected + "', but did");
 -        }
++        NodeToolResult result = node.nodetoolResult(nodetool);
++        result.asserts().success().stdoutContains(expectedStatus);
++        if (notExpected != null)
++            result.asserts().stdoutNotContains(notExpected);
+     }
+ }
diff --cc test/distributed/org/apache/cassandra/distributed/test/TopologyChangeTest.java
index 731a87e,0000000..0fff01c
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TopologyChangeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TopologyChangeTest.java
@@@ -1,199 -1,0 +1,196 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.net.InetSocketAddress;
- import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.Objects;
++import java.util.concurrent.CopyOnWriteArrayList;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.junit.Assert;
 +import org.junit.Test;
 +import org.junit.runner.RunWith;
 +import org.junit.runners.Parameterized;
 +
 +import com.datastax.driver.core.Host;
 +import com.datastax.driver.core.Session;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.impl.INodeProvisionStrategy.Strategy;
 +import org.apache.cassandra.distributed.test.TopologyChangeTest.EventStateListener.Event;
 +
 +import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 +import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +import static org.apache.cassandra.distributed.impl.INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces;
 +import static org.apache.cassandra.distributed.impl.INodeProvisionStrategy.Strategy.OneNetworkInterface;
 +import static org.apache.cassandra.distributed.test.TopologyChangeTest.EventStateListener.EventType.Down;
 +import static org.apache.cassandra.distributed.test.TopologyChangeTest.EventStateListener.EventType.Remove;
 +import static org.apache.cassandra.distributed.test.TopologyChangeTest.EventStateListener.EventType.Up;
 +import static org.assertj.core.api.Assertions.assertThat;
 +import static org.awaitility.Awaitility.await;
 +
 +@RunWith(Parameterized.class)
 +public class TopologyChangeTest extends TestBaseImpl
 +{
 +    static class EventStateListener implements Host.StateListener
 +    {
 +        enum EventType
 +        {
 +            Add,
 +            Up,
 +            Down,
 +            Remove,
 +        }
 +
 +        static class Event
 +        {
 +            InetSocketAddress host;
 +            EventType type;
 +
 +            Event(EventType type, Host host)
 +            {
 +                this.type = type;
 +                this.host = host.getBroadcastSocketAddress();
 +            }
 +
 +            public Event(EventType type, IInvokableInstance iInvokableInstance)
 +            {
 +                this.type = type;
 +                this.host = iInvokableInstance.broadcastAddress();
 +            }
 +
 +
 +            public String toString()
 +            {
 +                return "Event{" +
 +                       "host='" + host + '\'' +
 +                       ", type=" + type +
 +                       '}';
 +            }
 +
 +            public boolean equals(Object o)
 +            {
 +                if (this == o) return true;
 +                if (o == null || getClass() != o.getClass()) return false;
 +                Event event = (Event) o;
 +                return Objects.equals(host, event.host) &&
 +                       type == event.type;
 +            }
 +
 +            public int hashCode()
 +            {
 +                return Objects.hash(host, type);
 +            }
 +        }
 +
-         private List<Event> events = new ArrayList<>();
++        private final List<Event> events = new CopyOnWriteArrayList<>();
 +
 +        public void onAdd(Host host)
 +        {
 +            events.add(new Event(EventType.Add, host));
 +        }
 +
 +        public void onUp(Host host)
 +        {
 +            events.add(new Event(Up, host));
 +        }
 +
 +        public void onDown(Host host)
 +        {
 +            events.add(new Event(EventType.Down, host));
 +        }
 +
 +        public void onRemove(Host host)
 +        {
 +            events.add(new Event(Remove, host));
 +        }
 +
 +        public void onRegister(com.datastax.driver.core.Cluster cluster)
 +        {
 +        }
 +
 +        public void onUnregister(com.datastax.driver.core.Cluster cluster)
 +        {
 +        }
 +    }
 +
 +    @Parameterized.Parameter(0)
 +    public Strategy strategy;
 +
 +    @Parameterized.Parameters(name = "{index}: provision strategy={0}")
 +    public static Collection<Strategy[]> data()
 +    {
 +        return Arrays.asList(new Strategy[][]{ { MultipleNetworkInterfaces },
 +                                               { OneNetworkInterface }
 +        });
 +    }
 +
 +    @Test
 +    public void testDecommission() throws Throwable
 +    {
-         try (Cluster control = init(Cluster.build().withNodes(3).withNodeProvisionStrategy(strategy).withConfig(
-         config -> {
-             config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL);
-         }).start()))
++        try (Cluster control = init(Cluster.build().withNodes(3).withNodeProvisionStrategy(strategy)
++                                           .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)).start());
++             com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
++             Session session = cluster.connect())
 +        {
-             final com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
-             Session session = cluster.connect();
 +            EventStateListener eventStateListener = new EventStateListener();
 +            session.getCluster().register(eventStateListener);
-             control.get(3).nodetool("disablebinary");
-             control.get(3).nodetool("decommission", "-f");
++
++            control.get(3).nodetoolResult("disablebinary").asserts().success();
++            control.get(3).nodetoolResult("decommission", "-f").asserts().success();
 +            await().atMost(5, TimeUnit.SECONDS)
 +                   .untilAsserted(() -> Assert.assertEquals(2, cluster.getMetadata().getAllHosts().size()));
-             assertThat(eventStateListener.events).containsExactly(new Event(Remove, control.get(3)));
-             session.close();
-             cluster.close();
++            session.getCluster().unregister(eventStateListener);
++            // DOWN UP can also be seen if the jvm is slow and connections are closed; to avoid this make sure to use
++            // containsSequence to check that down/remove happen in this order
++            assertThat(eventStateListener.events).containsSequence(new Event(Down, control.get(3)), new Event(Remove, control.get(3)));
 +        }
 +    }
 +
 +    @Test
 +    public void testRestartNode() throws Throwable
 +    {
-         try (Cluster control = init(Cluster.build().withNodes(3).withNodeProvisionStrategy(strategy).withConfig(
-         config -> {
-             config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL);
-         }).start()))
++        try (Cluster control = init(Cluster.build().withNodes(3).withNodeProvisionStrategy(strategy)
++                                           .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL)).start());
++             com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
++             Session session = cluster.connect())
 +        {
-             final com.datastax.driver.core.Cluster cluster = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1").build();
-             Session session = cluster.connect();
 +            EventStateListener eventStateListener = new EventStateListener();
 +            session.getCluster().register(eventStateListener);
 +
 +            control.get(3).shutdown().get();
 +            await().atMost(5, TimeUnit.SECONDS)
 +                   .untilAsserted(() -> Assert.assertEquals(2, cluster.getMetadata().getAllHosts().stream().filter(h -> h.isUp()).count()));
 +
 +            control.get(3).startup();
 +            await().atMost(30, TimeUnit.SECONDS)
 +                   .untilAsserted(() -> Assert.assertEquals(3, cluster.getMetadata().getAllHosts().stream().filter(h -> h.isUp()).count()));
 +
-             assertThat(eventStateListener.events).containsExactly(new Event(Down, control.get(3)),
-                                                                   new Event(Up, control.get(3)));
- 
-             session.close();
-             cluster.close();
++            // DOWN UP can also be seen if the jvm is slow and connections are closed, but make sure it at least happens once
++            // given the node restarts
++            assertThat(eventStateListener.events).containsSequence(new Event(Down, control.get(3)),
++                                                                   new Event(Up, control.get(3)));
 +        }
 +    }
 +}
 +
diff --cc test/resources/byteman/stream_failure.btm
index 0000000,e40f7fe..768c7a3
mode 000000,100644..100644
--- a/test/resources/byteman/stream_failure.btm
+++ b/test/resources/byteman/stream_failure.btm
@@@ -1,0 -1,14 +1,14 @@@
+ #
+ # Inject streaming failure
+ #
+ # Before start streaming files in `StreamSession#prepare()` method,
+ # interrupt streaming by throwing RuntimeException.
+ #
+ RULE inject stream failure
+ CLASS org.apache.cassandra.streaming.StreamSession
 -METHOD prepare
 -AT INVOKE maybeCompleted
++METHOD prepareAck
++AT INVOKE startStreamingFiles
+ IF org.apache.cassandra.distributed.test.BootstrapBinaryDisabledTest$RewriteEnabled.isEnabled()
+ DO
+    throw new java.lang.RuntimeException("Triggering network failure")
 -ENDRULE
++ENDRULE
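
The rule above only fires while the shared flag is set: RewriteEnabled is annotated @Shared, so the Byteman condition evaluated inside each instance's class loader reads the same field the test flips from the outside. Condensed from the BootstrapBinaryDisabledTest code in this diff (no new API; cluster, nodeConfig and node are the variables from that test), the flow is roughly:

    RewriteEnabled.enable();                  // rule now throws in StreamSession#prepareAck
    cluster.bootstrap(nodeConfig).startup();  // streaming fails, so binary stays disabled
    node.nodetoolResult("join").asserts()
        .failure()
        .errorContains("Cannot join the ring until bootstrap completes");

    RewriteEnabled.disable();                 // rule becomes a no-op
    node.nodetoolResult("bootstrap", "resume").asserts().success();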


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org