You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2019/08/20 13:56:06 UTC

[cassandra] branch trunk updated (d60e798 -> b3ed8da)

This is an automated email from the ASF dual-hosted git repository.

benedict pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from d60e798  Avoid result truncate in decimal operations
     new 8dcaa12  Allow instance class loaders to be garbage collected for inJVM dtest
     new 51c0f6b  Merge branch 'cassandra-2.2' into cassandra-3.0
     new 3df63ed  Merge branch 'cassandra-3.0' into cassandra-3.11
     new b3ed8da  Merge branch 'cassandra-3.11' into trunk

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   2 +
 build.xml                                          |  18 +-
 .../apache/cassandra/batchlog/BatchlogManager.java |   7 +-
 .../cassandra/concurrent/ScheduledExecutors.java   |   6 +-
 .../cassandra/concurrent/SharedExecutorPool.java   |   4 +-
 .../apache/cassandra/concurrent/StageManager.java  |   6 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  16 +-
 .../apache/cassandra/db/commitlog/CommitLog.java   |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |  10 +
 .../org/apache/cassandra/hints/HintsCatalog.java   |   5 +-
 .../cassandra/io/sstable/IndexSummaryManager.java  |  11 +
 .../cassandra/io/sstable/format/SSTableReader.java |   9 +-
 .../org/apache/cassandra/net/InboundSockets.java   |  17 +-
 .../org/apache/cassandra/net/MessagingService.java |  15 +-
 .../org/apache/cassandra/net/SocketFactory.java    |   3 +-
 .../org/apache/cassandra/security/SSLFactory.java  |  31 ++-
 .../service/PendingRangeCalculatorService.java     |   5 +-
 .../apache/cassandra/service/StorageService.java   |  23 +-
 .../cassandra/streaming/StreamCoordinator.java     |   8 +
 .../cassandra/streaming/StreamReceiveTask.java     |  13 +
 .../cassandra/streaming/StreamTransferTask.java    |  10 +
 .../streaming/async/StreamingInboundHandler.java   |  36 ++-
 .../org/apache/cassandra/utils/ExecutorUtils.java  |  68 +++++-
 .../org/apache/cassandra/utils/concurrent/Ref.java |   5 +-
 .../cassandra/utils/memory/MemtablePool.java       |   8 +-
 test/conf/logback-dtest.xml                        |  16 +-
 .../org/apache/cassandra/distributed/Cluster.java  |   1 -
 .../cassandra/distributed/UpgradeableCluster.java  |   2 -
 .../apache/cassandra/distributed/api/Feature.java  |   5 +-
 .../cassandra/distributed/api/IInstance.java       |   2 +
 .../cassandra/distributed/api/IInstanceConfig.java |   2 +-
 .../cassandra/distributed/api/IMessageFilters.java |   3 -
 .../distributed/impl/AbstractCluster.java          |  30 ++-
 .../cassandra/distributed/impl/Instance.java       |  77 ++++--
 .../distributed/impl/InstanceClassLoader.java      |   1 +
 .../cassandra/distributed/impl/InstanceConfig.java |  18 +-
 .../distributed/impl/IsolatedExecutor.java         |  47 +++-
 .../test/DistributedReadWritePathTest.java         |   2 +-
 .../distributed/test/DistributedTestBase.java      |  20 ++
 .../distributed/test/GossipSettlesTest.java        |   4 +-
 .../cassandra/distributed/test/RepairTest.java     | 269 +++++++++++----------
 .../distributed/test/ResourceLeakTest.java         | 202 ++++++++++++++++
 .../cassandra/distributed/test/StreamingTest.java  |   2 +-
 .../cassandra/concurrent/SEPExecutorTest.java      |   3 +-
 44 files changed, 777 insertions(+), 266 deletions(-)
 copy src/java/org/apache/cassandra/hints/SerializableHintMessage.java => test/distributed/org/apache/cassandra/distributed/api/Feature.java (90%)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-3.11' into trunk

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit b3ed8da0592fedcfeee5376ac32b7951de54c655
Merge: d60e798 3df63ed
Author: Jon Meredith <jm...@gmail.com>
AuthorDate: Thu Aug 15 14:40:59 2019 -0600

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   2 +
 build.xml                                          |  18 +-
 .../apache/cassandra/batchlog/BatchlogManager.java |   7 +-
 .../cassandra/concurrent/ScheduledExecutors.java   |   6 +-
 .../cassandra/concurrent/SharedExecutorPool.java   |   4 +-
 .../apache/cassandra/concurrent/StageManager.java  |   6 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  16 +-
 .../apache/cassandra/db/commitlog/CommitLog.java   |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |  10 +
 .../org/apache/cassandra/hints/HintsCatalog.java   |   5 +-
 .../cassandra/io/sstable/IndexSummaryManager.java  |  11 +
 .../cassandra/io/sstable/format/SSTableReader.java |   9 +-
 .../org/apache/cassandra/net/InboundSockets.java   |  17 +-
 .../org/apache/cassandra/net/MessagingService.java |  15 +-
 .../org/apache/cassandra/net/SocketFactory.java    |   3 +-
 .../org/apache/cassandra/security/SSLFactory.java  |  31 ++-
 .../service/PendingRangeCalculatorService.java     |   5 +-
 .../apache/cassandra/service/StorageService.java   |  23 +-
 .../cassandra/streaming/StreamCoordinator.java     |   8 +
 .../cassandra/streaming/StreamReceiveTask.java     |  13 +
 .../cassandra/streaming/StreamTransferTask.java    |  10 +
 .../streaming/async/StreamingInboundHandler.java   |  36 ++-
 .../org/apache/cassandra/utils/ExecutorUtils.java  |  68 +++++-
 .../org/apache/cassandra/utils/concurrent/Ref.java |   5 +-
 .../cassandra/utils/memory/MemtablePool.java       |   8 +-
 test/conf/logback-dtest.xml                        |  16 +-
 .../org/apache/cassandra/distributed/Cluster.java  |   1 -
 .../cassandra/distributed/UpgradeableCluster.java  |   2 -
 .../api/{IMessageFilters.java => Feature.java}     |  30 +--
 .../cassandra/distributed/api/IInstance.java       |   2 +
 .../cassandra/distributed/api/IInstanceConfig.java |   2 +-
 .../cassandra/distributed/api/IMessageFilters.java |   3 -
 .../distributed/impl/AbstractCluster.java          |  30 ++-
 .../cassandra/distributed/impl/Instance.java       |  77 ++++--
 .../distributed/impl/InstanceClassLoader.java      |   1 +
 .../cassandra/distributed/impl/InstanceConfig.java |  18 +-
 .../distributed/impl/IsolatedExecutor.java         |  47 +++-
 .../test/DistributedReadWritePathTest.java         |   2 +-
 .../distributed/test/DistributedTestBase.java      |  20 ++
 .../distributed/test/GossipSettlesTest.java        |   4 +-
 .../cassandra/distributed/test/RepairTest.java     | 269 +++++++++++----------
 .../distributed/test/ResourceLeakTest.java         | 202 ++++++++++++++++
 .../cassandra/distributed/test/StreamingTest.java  |   2 +-
 .../cassandra/concurrent/SEPExecutorTest.java      |   3 +-
 44 files changed, 776 insertions(+), 292 deletions(-)

diff --cc CHANGES.txt
index a5d4378,772446d..621d61b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,375 -1,3 +1,377 @@@
 +4.0
 + * Improve readability of Table metrics Virtual tables units (CASSANDRA-15194)
 + * Fix error with non-existent table for nodetool tablehistograms (CASSANDRA-14410)
 + * Avoid result truncation in decimal operations (CASSANDRA-15232)
 + * Catch non-IOException in FileUtils.close to make sure that all resources are closed (CASSANDRA-15225)
 + * Align load column in nodetool status output (CASSANDRA-14787)
 + * CassandraNetworkAuthorizer uses cached roles info (CASSANDRA-15089)
 + * Introduce optional timeouts for idle client sessions (CASSANDRA-11097)
 + * Fix AlterTableStatement dropped type validation order (CASSANDRA-15203)
 + * Update Netty dependencies to latest, clean up SocketFactory (CASSANDRA-15195)
 + * Native Transport - Apply noSpamLogger to ConnectionLimitHandler (CASSANDRA-15167)
 + * Reduce heap pressure during compactions (CASSANDRA-14654)
 + * Support building Cassandra with JDK 11 (CASSANDRA-15108)
 + * Use quilt to patch cassandra.in.sh in Debian packaging (CASSANDRA-14710)
 + * Take sstable references before calculating approximate key count (CASSANDRA-14647)
 + * Restore snapshotting of system keyspaces on version change (CASSANDRA-14412)
 + * Fix AbstractBTreePartition locking in java 11 (CASSANDRA-14607)
 + * SimpleClient should pass connection properties as options (CASSANDRA-15056)
 + * Set repaired data tracking flag on range reads if enabled (CASSANDRA-15019)
 + * Calculate pending ranges for BOOTSTRAP_REPLACE correctly (CASSANDRA-14802)
 + * Make TableCQLHelper reuse the single quote pattern (CASSANDRA-15033)
 + * Add Zstd compressor (CASSANDRA-14482)
 + * Fix IR prepare anti-compaction race (CASSANDRA-15027)
 + * Fix SimpleStrategy option validation (CASSANDRA-15007)
 + * Don't try to cancel 2i compactions when starting anticompaction (CASSANDRA-15024)
 + * Avoid NPE in RepairRunnable.recordFailure (CASSANDRA-15025)
 + * SSL Cert Hot Reloading should check for sanity of the new keystore/truststore before loading it (CASSANDRA-14991)
 + * Avoid leaking threads when failing anticompactions and rate limit anticompactions (CASSANDRA-15002)
 + * Validate token() arguments early instead of throwing NPE at execution (CASSANDRA-14989)
 + * Add a new tool to dump audit logs (CASSANDRA-14885)
 + * Fix generating javadoc with Java11 (CASSANDRA-14988)
 + * Only cancel conflicting compactions when starting anticompactions and sub range compactions (CASSANDRA-14935)
 + * Use a stub IndexRegistry for non-daemon use cases (CASSANDRA-14938)
 + * Don't enable client transports when bootstrap is pending (CASSANDRA-14525)
 + * Make antiCompactGroup throw exception on error and anticompaction non cancellable
 +   again (CASSANDRA-14936)
 + * Catch empty/invalid bounds in SelectStatement (CASSANDRA-14849)
 + * Auto-expand replication_factor for NetworkTopologyStrategy (CASSANDRA-14303)
 + * Transient Replication: support EACH_QUORUM (CASSANDRA-14727)
 + * BufferPool: allocating thread for new chunks should acquire directly (CASSANDRA-14832)
 + * Send correct messaging version in internode messaging handshake's third message (CASSANDRA-14896)
 + * Make Read and Write Latency columns consistent for proxyhistograms and tablehistograms (CASSANDRA-11939)
 + * Make protocol checksum type option case insensitive (CASSANDRA-14716)
 + * Forbid re-adding static columns as regular and vice versa (CASSANDRA-14913)
 + * Audit log allows system keyspaces to be audited via configuration options (CASSANDRA-14498)
 + * Lower default chunk_length_in_kb from 64kb to 16kb (CASSANDRA-13241)
 + * Startup checker should wait for count rather than percentage (CASSANDRA-14297)
 + * Fix incorrect sorting of replicas in SimpleStrategy.calculateNaturalReplicas (CASSANDRA-14862)
 + * Partitioned outbound internode TCP connections can occur when nodes restart (CASSANDRA-14358)
 + * Don't write to system_distributed.repair_history, system_traces.sessions, system_traces.events in mixed version 3.X/4.0 clusters (CASSANDRA-14841)
 + * Avoid running query to self through messaging service (CASSANDRA-14807)
 + * Allow using custom script for chronicle queue BinLog archival (CASSANDRA-14373)
 + * Transient->Full range movements mishandle consistency level upgrade (CASSANDRA-14759)
 + * ReplicaCollection follow-up (CASSANDRA-14726)
 + * Transient node receives full data requests (CASSANDRA-14762)
 + * Enable snapshot artifacts publish (CASSANDRA-12704)
 + * Introduce RangesAtEndpoint.unwrap to simplify StreamSession.addTransferRanges (CASSANDRA-14770)
 + * LOCAL_QUORUM may speculate to non-local nodes, resulting in Timeout instead of Unavailable (CASSANDRA-14735)
 + * Avoid creating empty compaction tasks after truncate (CASSANDRA-14780)
 + * Fail incremental repair prepare phase if it encounters sstables from un-finalized sessions (CASSANDRA-14763)
 + * Add a check for receiving digest response from transient node (CASSANDRA-14750)
 + * Fail query on transient replica if coordinator only expects full data (CASSANDRA-14704)
 + * Remove mentions of transient replication from repair path (CASSANDRA-14698)
 + * Fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720)
 + * Allow transient node to serve as a repair coordinator (CASSANDRA-14693)
 + * DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot returns wrong value for size() and incorrectly calculates count (CASSANDRA-14696)
 + * AbstractReplicaCollection equals and hash code should throw due to conflict between order sensitive/insensitive uses (CASSANDRA-14700)
 + * Detect inconsistencies in repaired data on the read path (CASSANDRA-14145)
 + * Add checksumming to the native protocol (CASSANDRA-13304)
 + * Make AuthCache more easily extendable (CASSANDRA-14662)
 + * Extend RolesCache to include detailed role info (CASSANDRA-14497)
 + * Add fqltool compare (CASSANDRA-14619)
 + * Add fqltool replay (CASSANDRA-14618)
 + * Log keyspace in full query log (CASSANDRA-14656)
 + * Transient Replication and Cheap Quorums (CASSANDRA-14404)
 + * Log server-generated timestamp and nowInSeconds used by queries in FQL (CASSANDRA-14675)
 + * Add diagnostic events for read repairs (CASSANDRA-14668)
 + * Use consistent nowInSeconds and timestamps values within a request (CASSANDRA-14671)
 + * Add sampler for query time and expose with nodetool (CASSANDRA-14436)
 + * Clean up Message.Request implementations (CASSANDRA-14677)
 + * Disable old native protocol versions on demand (CASSANDRA-14659)
 + * Allow specifying now-in-seconds in native protocol (CASSANDRA-14664)
 + * Improve BTree build performance by avoiding data copy (CASSANDRA-9989)
 + * Make monotonic read / read repair configurable (CASSANDRA-14635)
 + * Refactor CompactionStrategyManager (CASSANDRA-14621)
 + * Flush netty client messages immediately by default (CASSANDRA-13651)
 + * Improve read repair blocking behavior (CASSANDRA-10726)
 + * Add a virtual table to expose settings (CASSANDRA-14573)
 + * Fix up chunk cache handling of metrics (CASSANDRA-14628)
 + * Extend IAuthenticator to accept peer SSL certificates (CASSANDRA-14652)
 + * Incomplete handling of exceptions when decoding incoming messages (CASSANDRA-14574)
 + * Add diagnostic events for user audit logging (CASSANDRA-13668)
 + * Allow retrieving diagnostic events via JMX (CASSANDRA-14435)
 + * Add base classes for diagnostic events (CASSANDRA-13457)
 + * Clear view system metadata when dropping keyspace (CASSANDRA-14646)
 + * Allocate ReentrantLock on-demand in java11 AtomicBTreePartitionerBase (CASSANDRA-14637)
 + * Make all existing virtual tables use LocalPartitioner (CASSANDRA-14640)
 + * Revert 4.0 GC alg back to CMS (CASSANDRA-14636)
 + * Remove hardcoded java11 jvm args in idea workspace files (CASSANDRA-14627)
 + * Update netty to 4.1.28 (CASSANDRA-14633)
 + * Add a virtual table to expose thread pools (CASSANDRA-14523)
 + * Add a virtual table to expose caches (CASSANDRA-14538, CASSANDRA-14626)
 + * Fix toDate function for timestamp arguments (CASSANDRA-14502)
 + * Revert running dtests by default in circleci (CASSANDRA-14614)
 + * Stream entire SSTables when possible (CASSANDRA-14556)
 + * Cell reconciliation should not depend on nowInSec (CASSANDRA-14592)
 + * Add experimental support for Java 11 (CASSANDRA-9608)
 + * Make PeriodicCommitLogService.blockWhenSyncLagsNanos configurable (CASSANDRA-14580)
 + * Improve logging in MessageInHandler's constructor (CASSANDRA-14576)
 + * Set broadcast address in internode messaging handshake (CASSANDRA-14579)
 + * Wait for schema agreement prior to building MVs (CASSANDRA-14571)
 + * Make all DDL statements idempotent and not dependent on global state (CASSANDRA-13426)
 + * Bump the hints messaging version to match the current one (CASSANDRA-14536)
 + * OffsetAwareConfigurationLoader doesn't set ssl storage port causing bind errors in CircleCI (CASSANDRA-14546)
 + * Report why native_transport_port fails to bind (CASSANDRA-14544)
 + * Optimize internode messaging protocol (CASSANDRA-14485)
 + * Internode messaging handshake sends wrong messaging version number (CASSANDRA-14540)
 + * Add a virtual table to expose active client connections (CASSANDRA-14458)
 + * Clean up and refactor client metrics (CASSANDRA-14524)
 + * Nodetool import row cache invalidation races with adding sstables to tracker (CASSANDRA-14529)
 + * Fix assertions in LWTs after TableMetadata was made immutable (CASSANDRA-14356)
 + * Abort compactions quicker (CASSANDRA-14397)
 + * Support light-weight transactions in cassandra-stress (CASSANDRA-13529)
 + * Make AsyncOneResponse use the correct timeout (CASSANDRA-14509)
 + * Add option to sanity check tombstones on reads/compactions (CASSANDRA-14467)
 + * Add a virtual table to expose all running sstable tasks (CASSANDRA-14457)
 + * Let nodetool import take a list of directories (CASSANDRA-14442)
 + * Avoid unneeded memory allocations / cpu for disabled log levels (CASSANDRA-14488)
 + * Implement virtual keyspace interface (CASSANDRA-7622)
 + * nodetool import cleanup and improvements (CASSANDRA-14417)
 + * Bump jackson version to >= 2.9.5 (CASSANDRA-14427)
 + * Allow nodetool toppartitions without specifying table (CASSANDRA-14360)
 + * Audit logging for database activity (CASSANDRA-12151)
 + * Clean up build artifacts in docs container (CASSANDRA-14432)
 + * Minor network authz improvements (CASSANDRA-14413)
 + * Automatic sstable upgrades (CASSANDRA-14197)
 + * Replace deprecated junit.framework.Assert usages with org.junit.Assert (CASSANDRA-14431)
 + * Cassandra-stress throws NPE if insert section isn't specified in user profile (CASSANDRA-14426)
 + * List clients by protocol versions `nodetool clientstats --by-protocol` (CASSANDRA-14335)
 + * Improve LatencyMetrics performance by reducing write path processing (CASSANDRA-14281)
 + * Add network authz (CASSANDRA-13985)
 + * Use the correct IP/Port for Streaming when localAddress is left unbound (CASSANDRA-14389)
 + * nodetool listsnapshots is missing local system keyspace snapshots (CASSANDRA-14381)
 + * Remove StreamCoordinator.streamExecutor thread pool (CASSANDRA-14402)
 + * Rename nodetool --with-port to --print-port to disambiguate from --port (CASSANDRA-14392)
 + * Client TOPOLOGY_CHANGE messages have wrong port. (CASSANDRA-14398)
 + * Add ability to load new SSTables from a separate directory (CASSANDRA-6719)
 + * Eliminate background repair and probabilistic read_repair_chance table options
 +   (CASSANDRA-13910)
 + * Bind to correct local address in 4.0 streaming (CASSANDRA-14362)
 + * Use standard Amazon naming for datacenter and rack in Ec2Snitch (CASSANDRA-7839)
 + * Fix junit failure for SSTableReaderTest (CASSANDRA-14387)
 + * Abstract write path for pluggable storage (CASSANDRA-14118)
 + * nodetool describecluster should be more informative (CASSANDRA-13853)
 + * Compaction performance improvements (CASSANDRA-14261) 
 + * Refactor Pair usage to avoid boxing ints/longs (CASSANDRA-14260)
 + * Add options to nodetool tablestats to sort and limit output (CASSANDRA-13889)
 + * Rename internals to reflect CQL vocabulary (CASSANDRA-14354)
 + * Add support for hybrid MIN(), MAX() speculative retry policies
 +   (CASSANDRA-14293, CASSANDRA-14338, CASSANDRA-14352)
 + * Fix some regressions caused by 14058 (CASSANDRA-14353)
 + * Abstract repair for pluggable storage (CASSANDRA-14116)
 + * Add meaningful toString() impls (CASSANDRA-13653)
 + * Add sstableloader option to accept target keyspace name (CASSANDRA-13884)
 + * Move processing of EchoMessage response to gossip stage (CASSANDRA-13713)
 + * Add coordinator write metric per CF (CASSANDRA-14232)
 + * Correct and clarify SSLFactory.getSslContext method and call sites (CASSANDRA-14314)
 + * Handle static and partition deletion properly on ThrottledUnfilteredIterator (CASSANDRA-14315)
 + * NodeTool clientstats should show SSL Cipher (CASSANDRA-14322)
 + * Add ability to specify driver name and version (CASSANDRA-14275)
 + * Abstract streaming for pluggable storage (CASSANDRA-14115)
 + * Forced incremental repairs should promote sstables if they can (CASSANDRA-14294)
 + * Use Murmur3 for validation compactions (CASSANDRA-14002)
 + * Comma at the end of the seed list is interpreted as localhost (CASSANDRA-14285)
 + * Refactor read executor and response resolver, abstract read repair (CASSANDRA-14058)
 + * Add optional startup delay to wait until peers are ready (CASSANDRA-13993)
 + * Add a few options to nodetool verify (CASSANDRA-14201)
 + * CVE-2017-5929 Security vulnerability and redefine default log rotation policy (CASSANDRA-14183)
 + * Use JVM default SSL validation algorithm instead of custom default (CASSANDRA-13259)
 + * Better document in code InetAddressAndPort usage post 7544, incorporate port into UUIDGen node (CASSANDRA-14226)
 + * Fix sstablemetadata date string for minLocalDeletionTime (CASSANDRA-14132)
 + * Make it possible to change neverPurgeTombstones during runtime (CASSANDRA-14214)
 + * Remove GossipDigestSynVerbHandler#doSort() (CASSANDRA-14174)
 + * Add nodetool clientlist (CASSANDRA-13665)
 + * Revert ProtocolVersion changes from CASSANDRA-7544 (CASSANDRA-14211)
 + * Non-disruptive seed node list reload (CASSANDRA-14190)
 + * Nodetool tablehistograms to print statistics for all the tables (CASSANDRA-14185)
 + * Migrate dtests to use pytest and python3 (CASSANDRA-14134)
 + * Allow storage port to be configurable per node (CASSANDRA-7544)
 + * Make sub-range selection for non-frozen collections return null instead of empty (CASSANDRA-14182)
 + * BloomFilter serialization format should not change byte ordering (CASSANDRA-9067)
 + * Remove unused on-heap BloomFilter implementation (CASSANDRA-14152)
 + * Delete temp test files on exit (CASSANDRA-14153)
 + * Make PartitionUpdate and Mutation immutable (CASSANDRA-13867)
 + * Fix CommitLogReplayer exception for CDC data (CASSANDRA-14066)
 + * Fix cassandra-stress startup failure (CASSANDRA-14106)
 + * Remove initialDirectories from CFS (CASSANDRA-13928)
 + * Fix trivial log format error (CASSANDRA-14015)
 + * Allow sstabledump to do a json object per partition (CASSANDRA-13848)
 + * Add option to optimise merkle tree comparison across replicas (CASSANDRA-3200)
 + * Remove unused and deprecated methods from AbstractCompactionStrategy (CASSANDRA-14081)
 + * Fix Distribution.average in cassandra-stress (CASSANDRA-14090)
 + * Support a means of logging all queries as they were invoked (CASSANDRA-13983)
 + * Presize collections (CASSANDRA-13760)
 + * Add GroupCommitLogService (CASSANDRA-13530)
 + * Parallelize initial materialized view build (CASSANDRA-12245)
 + * Fix flaky SecondaryIndexManagerTest.assert[Not]MarkedAsBuilt (CASSANDRA-13965)
 + * Make LWTs send resultset metadata on every request (CASSANDRA-13992)
 + * Fix flaky indexWithFailedInitializationIsNotQueryableAfterPartialRebuild (CASSANDRA-13963)
 + * Introduce leaf-only iterator (CASSANDRA-9988)
 + * Upgrade Guava to 23.3 and Airline to 0.8 (CASSANDRA-13997)
 + * Allow only one concurrent call to StatusLogger (CASSANDRA-12182)
 + * Refactoring to specialised functional interfaces (CASSANDRA-13982)
 + * Speculative retry should allow more friendly params (CASSANDRA-13876)
 + * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944)
 + * Replace usages of MessageDigest with Guava's Hasher (CASSANDRA-13291)
 + * Add nodetool cmd to print hinted handoff window (CASSANDRA-13728)
 + * Fix some alerts raised by static analysis (CASSANDRA-13799)
 + * Checksum sstable metadata (CASSANDRA-13321, CASSANDRA-13593)
 + * Add result set metadata to prepared statement MD5 hash calculation (CASSANDRA-10786)
 + * Refactor GcCompactionTest to avoid boxing (CASSANDRA-13941)
 + * Expose recent histograms in JmxHistograms (CASSANDRA-13642)
 + * Fix buffer length comparison when decompressing in netty-based streaming (CASSANDRA-13899)
 + * Properly close StreamCompressionInputStream to release any ByteBuf (CASSANDRA-13906)
 + * Add SERIAL and LOCAL_SERIAL support for cassandra-stress (CASSANDRA-13925)
 + * LCS needlessly checks for L0 STCS candidates multiple times (CASSANDRA-12961)
 + * Correctly close netty channels when a stream session ends (CASSANDRA-13905)
 + * Update lz4 to 1.4.0 (CASSANDRA-13741)
 + * Optimize Paxos prepare and propose stage for local requests (CASSANDRA-13862)
 + * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299)
 + * Use compaction threshold for STCS in L0 (CASSANDRA-13861)
 + * Fix problem with min_compress_ratio: 1 and disallow ratio < 1 (CASSANDRA-13703)
 + * Add extra information to SASI timeout exception (CASSANDRA-13677)
 + * Add incremental repair support for --hosts, --force, and subrange repair (CASSANDRA-13818)
 + * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786)
 + * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846)
 + * Add keyspace and table name in schema validation exception (CASSANDRA-13845)
 + * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771)
 + * Make netty EventLoopGroups daemon threads (CASSANDRA-13837)
 + * Race condition when closing stream sessions (CASSANDRA-13852)
 + * NettyFactoryTest is failing in trunk on macOS (CASSANDRA-13831)
 + * Allow changing log levels via nodetool for related classes (CASSANDRA-12696)
 + * Add stress profile yaml with LWT (CASSANDRA-7960)
 + * Reduce memory copies and object creations when acting on ByteBufs (CASSANDRA-13789)
 + * Simplify mx4j configuration (CASSANDRA-13578)
 + * Fix trigger example on 4.0 (CASSANDRA-13796)
 + * Force minimum timeout value (CASSANDRA-9375)
 + * Use netty for streaming (CASSANDRA-12229)
 + * Use netty for internode messaging (CASSANDRA-8457)
 + * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774)
 + * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758)
 + * Fix pending repair manager index out of bounds check (CASSANDRA-13769)
 + * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576)
 + * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664)
 + * Use an ExecutorService for repair commands instead of new Thread(..).start() (CASSANDRA-13594)
 + * Fix race / ref leak in anticompaction (CASSANDRA-13688)
 + * Expose tasks queue length via JMX (CASSANDRA-12758)
 + * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)
 + * Enable ppc64le runtime as unsupported architecture (CASSANDRA-13615)
 + * Improve sstablemetadata output (CASSANDRA-11483)
 + * Support for migrating legacy users to roles has been dropped (CASSANDRA-13371)
 + * Introduce error metrics for repair (CASSANDRA-13387)
 + * Refactoring to primitive functional interfaces in AuthCache (CASSANDRA-13732)
 + * Update metrics to 3.1.5 (CASSANDRA-13648)
 + * batch_size_warn_threshold_in_kb can now be set at runtime (CASSANDRA-13699)
 + * Avoid always rebuilding secondary indexes at startup (CASSANDRA-13725)
 + * Upgrade JMH from 1.13 to 1.19 (CASSANDRA-13727)
 + * Upgrade SLF4J from 1.7.7 to 1.7.25 (CASSANDRA-12996)
 + * Default for start_native_transport now true if not set in config (CASSANDRA-13656)
 + * Don't add localhost to the graph when calculating where to stream from (CASSANDRA-13583)
 + * Make CDC availability more deterministic via hard-linking (CASSANDRA-12148)
 + * Allow skipping equality-restricted clustering columns in ORDER BY clause (CASSANDRA-10271)
 + * Use common nowInSec for validation compactions (CASSANDRA-13671)
 + * Improve handling of IR prepare failures (CASSANDRA-13672)
 + * Send IR coordinator messages synchronously (CASSANDRA-13673)
 + * Flush system.repair table before IR finalize promise (CASSANDRA-13660)
 + * Fix column filter creation for wildcard queries (CASSANDRA-13650)
 + * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614)
 + * fix race condition in PendingRepairManager (CASSANDRA-13659)
 + * Allow noop incremental repair state transitions (CASSANDRA-13658)
 + * Run repair with down replicas (CASSANDRA-10446)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Added started & completed repair metrics (CASSANDRA-13598)
 + * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130)
 + * Improve calculation of available disk space for compaction (CASSANDRA-13068)
 + * Change the accessibility of RowCacheSerializer for third party row cache plugins (CASSANDRA-13579)
 + * Allow sub-range repairs for a preview of repaired data (CASSANDRA-13570)
 + * NPE in IR cleanup when columnfamily has no sstables (CASSANDRA-13585)
 + * Fix Randomness of stress values (CASSANDRA-12744)
 + * Allow selecting Map values and Set elements (CASSANDRA-7396)
 + * Fast and garbage-free Streaming Histogram (CASSANDRA-13444)
 + * Update repairTime for keyspaces on completion (CASSANDRA-13539)
 + * Add configurable upper bound for validation executor threads (CASSANDRA-13521)
 + * Bring back maxHintTTL property (CASSANDRA-12982)
 + * Add testing guidelines (CASSANDRA-13497)
 + * Add more repair metrics (CASSANDRA-13531)
 + * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650)
 + * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367)
 + * Log time elapsed for each incremental repair phase (CASSANDRA-13498)
 + * Add multiple table operation support to cassandra-stress (CASSANDRA-8780)
 + * Fix incorrect cqlsh results when selecting same columns multiple times (CASSANDRA-13262)
 + * Fix WriteResponseHandlerTest is sensitive to test execution order (CASSANDRA-13421)
 + * Improve incremental repair logging (CASSANDRA-13468)
 + * Start compaction when incremental repair finishes (CASSANDRA-13454)
 + * Add repair streaming preview (CASSANDRA-13257)
 + * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430)
 + * Change protocol to allow sending key space independent of query string (CASSANDRA-10145)
 + * Make gc_log and gc_warn settable at runtime (CASSANDRA-12661)
 + * Take number of files in L0 into account when estimating remaining compaction tasks (CASSANDRA-13354)
 + * Skip building views during base table streams on range movements (CASSANDRA-13065)
 + * Improve error messages for +/- operations on maps and tuples (CASSANDRA-13197)
 + * Remove deprecated repair JMX APIs (CASSANDRA-11530)
 + * Fix version check to enable streaming keep-alive (CASSANDRA-12929)
 + * Make it possible to monitor an ideal consistency level separate from actual consistency level (CASSANDRA-13289)
 + * Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
 + * Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
 + * Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
 + * Upgrade snappy-java to 1.1.2.6 (CASSANDRA-13336)
 + * Incremental repair not streaming correct sstables (CASSANDRA-13328)
 + * Upgrade the jna version to 4.3.0 (CASSANDRA-13300)
 + * Add the currentTimestamp, currentDate, currentTime and currentTimeUUID functions (CASSANDRA-13132)
 + * Remove config option index_interval (CASSANDRA-10671)
 + * Reduce lock contention for collection types and serializers (CASSANDRA-13271)
 + * Make it possible to override MessagingService.Verb ids (CASSANDRA-13283)
 + * Avoid synchronized on prepareForRepair in ActiveRepairService (CASSANDRA-9292)
 + * Adds the ability to use uncompressed chunks in compressed files (CASSANDRA-10520)
 + * Don't flush sstables when streaming for incremental repair (CASSANDRA-13226)
 + * Remove unused method (CASSANDRA-13227)
 + * Fix minor bugs related to #9143 (CASSANDRA-13217)
 + * Output warning if user increases RF (CASSANDRA-13079)
 + * Remove pre-3.0 streaming compatibility code for 4.0 (CASSANDRA-13081)
 + * Add support for + and - operations on dates (CASSANDRA-11936)
 + * Fix consistency of incrementally repaired data (CASSANDRA-9143)
 + * Increase commitlog version (CASSANDRA-13161)
 + * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
 + * Refactor ColumnCondition (CASSANDRA-12981)
 + * Parallelize streaming of different keyspaces (CASSANDRA-4663)
 + * Improved compactions metrics (CASSANDRA-13015)
 + * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
 + * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
 + * Thrift removal (CASSANDRA-11115)
 + * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
 + * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
 + * Add (automate) Nodetool Documentation (CASSANDRA-12672)
 + * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
 + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
 + * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422)
 + * Use new token allocation for non bootstrap case as well (CASSANDRA-13080)
 + * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084)
 + * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510)
 + * Allow IN restrictions on column families with collections (CASSANDRA-12654)
 + * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028)
 + * Add timeUnit Days for cassandra-stress (CASSANDRA-13029)
 + * Add mutation size and batch metrics (CASSANDRA-12649)
 + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
 + * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
 + * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
 + * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)
 + * Add support for arithmetic operators (CASSANDRA-11935)
 + * Add histogram for delay to deliver hints (CASSANDRA-13234)
 + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
 + * Changing `max_hint_window_in_ms` at runtime (CASSANDRA-11720)
 + * Trivial format error in StorageProxy (CASSANDRA-13551)
 + * Nodetool repair can hang forever if we lose the notification for the repair completing/failing (CASSANDRA-13480)
 + * Anticompaction can cause noisy log messages (CASSANDRA-13684)
 + * Switch to client init for sstabledump (CASSANDRA-13683)
 + * CQLSH: Don't pause when capturing data (CASSANDRA-13743)
 + * nodetool clearsnapshot requires --all to clear all snapshots (CASSANDRA-13391)
 + * Correctly count range tombstones in traces and tombstone thresholds (CASSANDRA-8527)
 + * cqlshrc.sample uses incorrect option for time formatting (CASSANDRA-14243)
++ * Multi-version in-JVM dtests (CASSANDRA-14937)
++ * Allow instance class loaders to be garbage collected for inJVM dtest (CASSANDRA-15170)
 +
 +
  3.11.5
   * Make sure user defined compaction transactions are always closed (CASSANDRA-15123)
   * Fix cassandra-env.sh to use $CASSANDRA_CONF to find cassandra-jaas.config (CASSANDRA-14305)
diff --cc src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index d0d3411,3c0ad56..f140332
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@@ -31,11 -33,16 +31,12 @@@ import java.util.concurrent.ExecutionEx
  import java.util.concurrent.Future;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.ScheduledThreadPoolExecutor;
 -import java.util.concurrent.ThreadLocalRandom;
  import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.TimeoutException;
  
  import com.google.common.annotations.VisibleForTesting;
 -import com.google.common.collect.ArrayListMultimap;
 +import com.google.common.collect.Collections2;
  import com.google.common.collect.Iterables;
 -import com.google.common.collect.ListMultimap;
 -import com.google.common.collect.Lists;
 -import com.google.common.collect.Multimap;
  import com.google.common.util.concurrent.RateLimiter;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -60,18 -67,12 +61,19 @@@ import org.apache.cassandra.hints.Hint
  import org.apache.cassandra.hints.HintsService;
  import org.apache.cassandra.io.util.DataInputBuffer;
  import org.apache.cassandra.io.util.DataOutputBuffer;
 -import org.apache.cassandra.net.MessageIn;
 -import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.locator.ReplicaLayout;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.apache.cassandra.locator.Replicas;
 +import org.apache.cassandra.net.Message;
 +import org.apache.cassandra.net.MessageFlag;
  import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.SchemaConstants;
 +import org.apache.cassandra.schema.TableId;
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.service.WriteResponseHandler;
+ import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.MBeanWrapper;
  import org.apache.cassandra.utils.UUIDGen;
@@@ -114,13 -111,12 +116,12 @@@ public class BatchlogManager implement
          batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches,
                                               StorageService.RING_DELAY,
                                               REPLAY_INTERVAL,
 -                                             TimeUnit.MILLISECONDS);
 +                                             MILLISECONDS);
      }
  
-     public void shutdown() throws InterruptedException
+     public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      {
-         batchlogTasks.shutdown();
-         batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
+         ExecutorUtils.shutdownAndWait(timeout, unit, batchlogTasks);
      }
  
      public static void remove(UUID id)
diff --cc src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
index 90ceca5,3da4569..c549c4d
--- a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
@@@ -23,10 -22,9 +23,12 @@@ import java.util.concurrent.TimeUnit
  import java.util.concurrent.TimeoutException;
  
  import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ImmutableList;
 +
 +import org.apache.cassandra.utils.ExecutorUtils;
  
+ import org.apache.cassandra.utils.ExecutorUtils;
+ 
  /**
   * Centralized location for shared executors
   */
diff --cc src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index 53792ec,f309b46..3388ea4
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@@ -115,9 -115,10 +115,9 @@@ public class SharedExecutorPoo
          return executor;
      }
  
-     public void shutdown(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
 -    public synchronized void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException
++    public synchronized void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      {
          shuttingDown = true;
 -        List<SEPExecutor> executors = new ArrayList<>(this.executors);
          for (SEPExecutor executor : executors)
              executor.shutdownNow();
  
diff --cc src/java/org/apache/cassandra/concurrent/StageManager.java
index 46e8cea,857c5b7..b0d34ae
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@@ -27,7 -25,7 +27,8 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.net.Verb;
+ import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.FBUtilities;
  
  import static org.apache.cassandra.config.DatabaseDescriptor.*;
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 981bc05,89fb271..2ef5a76
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -87,9 -83,8 +87,9 @@@ import org.apache.cassandra.utils.memor
  import org.json.simple.JSONArray;
  import org.json.simple.JSONObject;
  
 +import static java.util.concurrent.TimeUnit.NANOSECONDS;
- import static org.apache.cassandra.utils.ExecutorUtils.*;
  import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
+ import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
  import static org.apache.cassandra.utils.Throwables.maybeFail;
  
  public class ColumnFamilyStore implements ColumnFamilyStoreMBean
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 062abe0,a205140..71f4462
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -39,11 -34,8 +39,12 @@@ import com.google.common.collect.Immuta
  import com.google.common.util.concurrent.ListenableFutureTask;
  import com.google.common.util.concurrent.Uninterruptibles;
  
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.NoPayload;
 +import org.apache.cassandra.net.Verb;
 +import org.apache.cassandra.utils.CassandraVersion;
  import io.netty.util.concurrent.FastThreadLocal;
+ import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.MBeanWrapper;
  import org.apache.cassandra.utils.NoSpamLogger;
  import org.apache.cassandra.utils.Pair;
@@@ -55,18 -47,19 +56,20 @@@ import org.apache.cassandra.concurrent.
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.net.IAsyncCallback;
 -import org.apache.cassandra.net.MessageIn;
 -import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.RequestCallback;
 +import org.apache.cassandra.net.Message;
  import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.service.StorageService;
 -import org.apache.cassandra.utils.CassandraVersion;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.JVMStabilityInspector;
+ import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
+ import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
  
 +import static org.apache.cassandra.net.NoPayload.noPayload;
 +import static org.apache.cassandra.net.Verb.ECHO_REQ;
 +import static org.apache.cassandra.net.Verb.GOSSIP_DIGEST_SYN;
 +
  /**
   * This module is responsible for Gossiping information for the local endpoint. This abstraction
   * maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
@@@ -1991,55 -1811,10 +1994,62 @@@ public class Gossiper implements IFailu
              logger.info("No gossip backlog; proceeding");
      }
  
 +    /**
 +     * Blockingly wait for all live nodes to agree on the current schema version.
 +     *
 +     * @param maxWait maximum time to wait for schema agreement
 +     * @param unit TimeUnit of maxWait
 +     * @return true if agreement was reached, false if not
 +     */
 +    public boolean waitForSchemaAgreement(long maxWait, TimeUnit unit, BooleanSupplier abortCondition)
 +    {
 +        int waited = 0;
 +        int toWait = 50;
 +
 +        Set<InetAddressAndPort> members = getLiveTokenOwners();
 +
 +        while (true)
 +        {
 +            if (nodesAgreeOnSchema(members))
 +                return true;
 +
 +            if (waited >= unit.toMillis(maxWait) || abortCondition.getAsBoolean())
 +                return false;
 +
 +            Uninterruptibles.sleepUninterruptibly(toWait, TimeUnit.MILLISECONDS);
 +            waited += toWait;
 +            toWait = Math.min(1000, toWait * 2);
 +        }
 +    }
 +
 +    public boolean haveMajorVersion3Nodes()
 +    {
 +        return haveMajorVersion3NodesMemoized.get();
 +    }
 +
 +    private boolean nodesAgreeOnSchema(Collection<InetAddressAndPort> nodes)
 +    {
 +        UUID expectedVersion = null;
 +
 +        for (InetAddressAndPort node : nodes)
 +        {
 +            EndpointState state = getEndpointStateForEndpoint(node);
 +            UUID remoteVersion = state.getSchemaVersion();
 +
 +            if (null == expectedVersion)
 +                expectedVersion = remoteVersion;
 +
 +            if (null == expectedVersion || !expectedVersion.equals(remoteVersion))
 +                return false;
 +        }
 +
 +        return true;
 +    }
++
+     @VisibleForTesting
+     public void stopShutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+     {
+         stop();
+         ExecutorUtils.shutdownAndWait(timeout, unit, executor);
+     }
  }
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 2d58cf8,ae79217..1f4059a
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@@ -23,8 -23,10 +23,9 @@@ import java.util.HashMap
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
 -import java.util.UUID;
  import java.util.concurrent.ScheduledFuture;
  import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.TimeoutException;
  
  import com.google.common.annotations.VisibleForTesting;
  import com.google.common.collect.ImmutableSet;
@@@ -43,7 -45,7 +44,8 @@@ import org.apache.cassandra.db.lifecycl
  import org.apache.cassandra.db.lifecycle.SSTableSet;
  import org.apache.cassandra.db.lifecycle.View;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.TableId;
+ import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.MBeanWrapper;
  import org.apache.cassandra.utils.Pair;
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index b545d51,06f0072..aa4ca55
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -2396,113 -2371,10 +2396,108 @@@ public abstract class SSTableReader ext
  
      }
  
 +    public static class PartitionPositionBounds
 +    {
 +        public final long lowerPosition;
 +        public final long upperPosition;
 +
 +        public PartitionPositionBounds(long lower, long upper)
 +        {
 +            this.lowerPosition = lower;
 +            this.upperPosition = upper;
 +        }
 +
 +        @Override
 +        public final int hashCode()
 +        {
 +            int hashCode = (int) lowerPosition ^ (int) (lowerPosition >>> 32);
 +            return 31 * (hashCode ^ (int) ((int) upperPosition ^  (upperPosition >>> 32)));
 +        }
 +
 +        @Override
 +        public final boolean equals(Object o)
 +        {
 +            if(!(o instanceof PartitionPositionBounds))
 +                return false;
 +            PartitionPositionBounds that = (PartitionPositionBounds)o;
 +            return lowerPosition == that.lowerPosition && upperPosition == that.upperPosition;
 +        }
 +    }
 +
 +    public static class IndexesBounds
 +    {
 +        public final int lowerPosition;
 +        public final int upperPosition;
 +
 +        public IndexesBounds(int lower, int upper)
 +        {
 +            this.lowerPosition = lower;
 +            this.upperPosition = upper;
 +        }
 +
 +        @Override
 +        public final int hashCode()
 +        {
 +            return 31 * lowerPosition * upperPosition;
 +        }
 +
 +        @Override
 +        public final boolean equals(Object o)
 +        {
 +            if (!(o instanceof IndexesBounds))
 +                return false;
 +            IndexesBounds that = (IndexesBounds) o;
 +            return lowerPosition == that.lowerPosition && upperPosition == that.upperPosition;
 +        }
 +    }
 +
 +    /**
 +     * Moves the sstable in oldDescriptor to a new place (with generation etc) in newDescriptor.
 +     *
 +     * All components given will be moved/renamed
 +     */
 +    public static SSTableReader moveAndOpenSSTable(ColumnFamilyStore cfs, Descriptor oldDescriptor, Descriptor newDescriptor, Set<Component> components)
 +    {
 +        if (!oldDescriptor.isCompatible())
 +            throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s",
 +                                                     oldDescriptor.getFormat().getLatestVersion(),
 +                                                     oldDescriptor));
 +
 +        boolean isLive = cfs.getLiveSSTables().stream().anyMatch(r -> r.descriptor.equals(newDescriptor)
 +                                                                      || r.descriptor.equals(oldDescriptor));
 +        if (isLive)
 +        {
 +            String message = String.format("Can't move and open a file that is already in use in the table %s -> %s", oldDescriptor, newDescriptor);
 +            logger.error(message);
 +            throw new RuntimeException(message);
 +        }
 +        if (new File(newDescriptor.filenameFor(Component.DATA)).exists())
 +        {
 +            String msg = String.format("File %s already exists, can't move the file there", newDescriptor.filenameFor(Component.DATA));
 +            logger.error(msg);
 +            throw new RuntimeException(msg);
 +        }
 +
 +        logger.info("Renaming new SSTable {} to {}", oldDescriptor, newDescriptor);
 +        SSTableWriter.rename(oldDescriptor, newDescriptor, components);
 +
 +        SSTableReader reader;
 +        try
 +        {
 +            reader = SSTableReader.open(newDescriptor, components, cfs.metadata);
 +        }
 +        catch (Throwable t)
 +        {
 +            logger.error("Aborting import of sstables. {} was corrupt", newDescriptor);
 +            throw new RuntimeException(newDescriptor + " is corrupt, can't import", t);
 +        }
 +        return reader;
 +    }
 +
      public static void shutdownBlocking(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      {
-         if (syncExecutor != null)
-         {
-             syncExecutor.shutdownNow();
-             syncExecutor.awaitTermination(timeout, unit);
-             if (!syncExecutor.isTerminated())
-                 throw new TimeoutException();
-         }
+ 
+         ExecutorUtils.shutdownNowAndWait(timeout, unit, syncExecutor);
          resetTidying();
      }
  }
diff --cc src/java/org/apache/cassandra/net/InboundSockets.java
index ad5c96f,0000000..8f74eaa
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/InboundSockets.java
+++ b/src/java/org/apache/cassandra/net/InboundSockets.java
@@@ -1,235 -1,0 +1,244 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.net;
 +
 +import java.util.ArrayList;
 +import java.util.List;
++import java.util.concurrent.ExecutorService;
 +import java.util.function.Consumer;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ImmutableList;
 +
 +import io.netty.channel.Channel;
 +import io.netty.channel.ChannelFuture;
 +import io.netty.channel.ChannelPipeline;
 +import io.netty.channel.group.ChannelGroup;
 +import io.netty.channel.group.DefaultChannelGroup;
 +import io.netty.util.concurrent.DefaultEventExecutor;
 +import io.netty.util.concurrent.Future;
 +import io.netty.util.concurrent.GlobalEventExecutor;
 +import io.netty.util.concurrent.PromiseNotifier;
 +import io.netty.util.concurrent.SucceededFuture;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +class InboundSockets
 +{
 +    /**
 +     * A simple struct to wrap up the components needed for each listening socket.
 +     */
 +    @VisibleForTesting
 +    static class InboundSocket
 +    {
 +        public final InboundConnectionSettings settings;
 +
 +        /**
 +         * The base {@link Channel} that is doing the socket listen/accept.
 +         * Null only until open() is invoked and {@link #binding} has yet to complete.
 +         */
 +        private volatile Channel listen;
 +        /**
 +         * Once open() is invoked, this holds the future result of opening the socket,
 +         * so that its completion can be waited on. Once complete, it sets itself to null.
 +         */
 +        private volatile ChannelFuture binding;
 +
 +        // purely to prevent close racing with open
 +        private boolean closedWithoutOpening;
 +
 +        /**
 +         * A group of the open, inbound {@link Channel}s connected to this node. This is mostly interesting so that all of
 +         * the inbound connections/channels can be closed when the listening socket itself is being closed.
 +         */
 +        private final ChannelGroup connections;
 +        private final DefaultEventExecutor executor;
 +
 +        private InboundSocket(InboundConnectionSettings settings)
 +        {
 +            this.settings = settings;
 +            this.executor = new DefaultEventExecutor(new NamedThreadFactory("Listen-" + settings.bindAddress));
 +            this.connections = new DefaultChannelGroup(settings.bindAddress.toString(), executor);
 +        }
 +
 +        private Future<Void> open()
 +        {
 +            return open(pipeline -> {});
 +        }
 +
 +        private Future<Void> open(Consumer<ChannelPipeline> pipelineInjector)
 +        {
 +            synchronized (this)
 +            {
 +                if (listen != null)
 +                    return new SucceededFuture<>(GlobalEventExecutor.INSTANCE, null);
 +                if (binding != null)
 +                    return binding;
 +                if (closedWithoutOpening)
 +                    throw new IllegalStateException();
 +                binding = InboundConnectionInitiator.bind(settings, connections, pipelineInjector);
 +            }
 +
 +            return binding.addListener(ignore -> {
 +                synchronized (this)
 +                {
 +                    if (binding.isSuccess())
 +                        listen = binding.channel();
 +                    binding = null;
 +                }
 +            });
 +        }
 +
 +        /**
 +         * Close this socket and any connections created on it. Once closed, this socket may not be re-opened.
 +         *
 +         * This may not execute synchronously, so a Future is returned encapsulating its result.
++         * @param shutdownExecutors
 +         */
-         private Future<Void> close()
++        private Future<Void> close(Consumer<? super ExecutorService> shutdownExecutors)
 +        {
 +            AsyncPromise<Void> done = AsyncPromise.uncancellable(GlobalEventExecutor.INSTANCE);
 +
 +            Runnable close = () -> {
 +                List<Future<Void>> closing = new ArrayList<>();
 +                if (listen != null)
 +                    closing.add(listen.close());
 +                closing.add(connections.close());
 +                new FutureCombiner(closing)
-                        .addListener(future -> executor.shutdownGracefully())
++                       .addListener(future -> {
++                           executor.shutdownGracefully();
++                           shutdownExecutors.accept(executor);
++                       })
 +                       .addListener(new PromiseNotifier<>(done));
 +            };
 +
 +            synchronized (this)
 +            {
 +                if (listen == null && binding == null)
 +                {
 +                    closedWithoutOpening = true;
 +                    return new SucceededFuture<>(GlobalEventExecutor.INSTANCE, null);
 +                }
 +
 +                if (listen != null)
 +                {
 +                    close.run();
 +                }
 +                else
 +                {
 +                    binding.cancel(true);
 +                    binding.addListener(future -> close.run());
 +                }
 +
 +                return done;
 +            }
 +        }
 +
 +        public boolean isOpen()
 +        {
 +            return listen != null && listen.isOpen();
 +        }
 +    }
 +
 +    private final List<InboundSocket> sockets;
 +
 +    InboundSockets(InboundConnectionSettings template)
 +    {
 +        this(withDefaultBindAddresses(template));
 +    }
 +
 +    InboundSockets(List<InboundConnectionSettings> templates)
 +    {
 +        this.sockets = bindings(templates);
 +    }
 +
 +    private static List<InboundConnectionSettings> withDefaultBindAddresses(InboundConnectionSettings template)
 +    {
 +        ImmutableList.Builder<InboundConnectionSettings> templates = ImmutableList.builder();
 +        templates.add(template.withBindAddress(FBUtilities.getLocalAddressAndPort()));
 +        if (shouldListenOnBroadcastAddress())
 +            templates.add(template.withBindAddress(FBUtilities.getBroadcastAddressAndPort()));
 +        return templates.build();
 +    }
 +
 +    private static List<InboundSocket> bindings(List<InboundConnectionSettings> templates)
 +    {
 +        ImmutableList.Builder<InboundSocket> sockets = ImmutableList.builder();
 +        for (InboundConnectionSettings template : templates)
 +            addBindings(template, sockets);
 +        return sockets.build();
 +    }
 +
 +    private static void addBindings(InboundConnectionSettings template, ImmutableList.Builder<InboundSocket> out)
 +    {
 +        InboundConnectionSettings settings = template.withDefaults();
 +        out.add(new InboundSocket(settings));
 +        if (settings.encryption.enable_legacy_ssl_storage_port && settings.encryption.enabled)
 +            out.add(new InboundSocket(template.withLegacyDefaults()));
 +    }
 +
 +    public Future<Void> open(Consumer<ChannelPipeline> pipelineInjector)
 +    {
 +        List<Future<Void>> opening = new ArrayList<>();
 +        for (InboundSocket socket : sockets)
 +            opening.add(socket.open(pipelineInjector));
 +
 +        return new FutureCombiner(opening);
 +    }
 +
 +    public Future<Void> open()
 +    {
 +        List<Future<Void>> opening = new ArrayList<>();
 +        for (InboundSocket socket : sockets)
 +            opening.add(socket.open());
 +        return new FutureCombiner(opening);
 +    }
 +
 +    public boolean isListening()
 +    {
 +        for (InboundSocket socket : sockets)
 +            if (socket.isOpen())
 +                return true;
 +        return false;
 +    }
 +
-     public Future<Void> close()
++    public Future<Void> close(Consumer<? super ExecutorService> shutdownExecutors)
 +    {
 +        List<Future<Void>> closing = new ArrayList<>();
 +        for (InboundSocket address : sockets)
-             closing.add(address.close());
++            closing.add(address.close(shutdownExecutors));
 +        return new FutureCombiner(closing);
 +    }
++    public Future<Void> close()
++    {
++        return close(e -> {});
++    }
 +
 +    private static boolean shouldListenOnBroadcastAddress()
 +    {
 +        return DatabaseDescriptor.shouldListenOnBroadcastAddress()
 +               && !FBUtilities.getLocalAddressAndPort().equals(FBUtilities.getBroadcastAddressAndPort());
 +    }
 +
 +    @VisibleForTesting
 +    public List<InboundSocket> sockets()
 +    {
 +        return sockets;
 +    }
 +}
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index f5aadc6,dfca087..8b5ab1a
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -17,15 -17,27 +17,18 @@@
   */
  package org.apache.cassandra.net;
  
 -import java.io.*;
 -import java.net.*;
 -import java.nio.channels.AsynchronousCloseException;
  import java.nio.channels.ClosedChannelException;
 -import java.nio.channels.ServerSocketChannel;
 -import java.util.*;
 -import java.util.concurrent.ConcurrentMap;
 -import java.util.concurrent.CopyOnWriteArraySet;
 +import java.util.ArrayList;
++import java.util.Collection;
++import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
++import java.util.concurrent.ExecutorService;
  import java.util.concurrent.TimeUnit;
 -import java.util.concurrent.atomic.AtomicInteger;
 -import java.util.stream.Collectors;
 -import java.util.stream.StreamSupport;
 -
 -import javax.net.ssl.SSLHandshakeException;
 +import java.util.concurrent.TimeoutException;
  
  import com.google.common.annotations.VisibleForTesting;
 -import com.google.common.base.Function;
 -import com.google.common.collect.Lists;
 -import com.google.common.collect.Sets;
 -
 -import org.cliffc.high_scale_lib.NonBlockingHashMap;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -34,222 -46,835 +37,224 @@@ import io.netty.util.concurrent.Future
  import org.apache.cassandra.concurrent.ScheduledExecutors;
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
 -import org.apache.cassandra.concurrent.LocalAwareExecutorService;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 -import org.apache.cassandra.db.*;
 -import org.apache.cassandra.batchlog.Batch;
 -import org.apache.cassandra.dht.AbstractBounds;
 -import org.apache.cassandra.dht.BootStrapper;
 -import org.apache.cassandra.dht.IPartitioner;
 -import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.db.SystemKeyspace;
  import org.apache.cassandra.exceptions.RequestFailureReason;
 -import org.apache.cassandra.gms.EchoMessage;
 -import org.apache.cassandra.gms.GossipDigestAck;
 -import org.apache.cassandra.gms.GossipDigestAck2;
 -import org.apache.cassandra.gms.GossipDigestSyn;
 -import org.apache.cassandra.hints.HintMessage;
 -import org.apache.cassandra.hints.HintResponse;
 -import org.apache.cassandra.io.IVersionedSerializer;
 -import org.apache.cassandra.io.util.DataInputPlus;
 -import org.apache.cassandra.io.util.DataOutputPlus;
 -import org.apache.cassandra.io.util.FileUtils;
 -import org.apache.cassandra.locator.ILatencySubscriber;
 -import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 -import org.apache.cassandra.metrics.ConnectionMetrics;
 -import org.apache.cassandra.metrics.DroppedMessageMetrics;
 -import org.apache.cassandra.metrics.MessagingMetrics;
 -import org.apache.cassandra.repair.messages.RepairMessage;
 -import org.apache.cassandra.security.SSLFactory;
 -import org.apache.cassandra.service.*;
 -import org.apache.cassandra.service.paxos.Commit;
 -import org.apache.cassandra.service.paxos.PrepareResponse;
 -import org.apache.cassandra.tracing.TraceState;
 -import org.apache.cassandra.tracing.Tracing;
 -import org.apache.cassandra.utils.*;
 -import org.apache.cassandra.utils.concurrent.SimpleCondition;
 -
 -public final class MessagingService implements MessagingServiceMBean
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.service.AbstractWriteResponseHandler;
++import org.apache.cassandra.utils.ExecutorUtils;
 +import org.apache.cassandra.utils.FBUtilities;
 +
++import static java.util.Collections.synchronizedList;
 +import static java.util.concurrent.TimeUnit.MINUTES;
 +import static java.util.concurrent.TimeUnit.NANOSECONDS;
 +import static org.apache.cassandra.concurrent.Stage.MUTATION;
 +import static org.apache.cassandra.utils.Throwables.maybeFail;
 +
 +/**
 + * MessagingService implements all internode communication - with the exception of SSTable streaming (for now).
 + *
 + * Specifically, it's responsible for dispatch of outbound messages to other nodes and routing of inbound messages
 + * to their appropriate {@link IVerbHandler}.
 + *
 + * <h2>Using MessagingService: sending requests and responses</h2>
 + *
 + * There are two ways to send a {@link Message}, and you should pick one depending on the desired behaviour:
 + *  1. To send a request that expects a response back, use
 + *     {@link #sendWithCallback(Message, InetAddressAndPort, RequestCallback)} method. Once a response
 + *     message is received, {@link RequestCallback#onResponse(Message)} method will be invoked on the
 + *     provided callback - in case of a success response. In case of a failure response (see {@link Verb#FAILURE_RSP}),
 + *     or if a response doesn't arrive within verb's configured expiry time,
 + *     {@link RequestCallback#onFailure(InetAddressAndPort, RequestFailureReason)} will be invoked instead.
 + *  2. To send a response back, or a message that expects no response, use {@link #send(Message, InetAddressAndPort)}
 + *     method.
 + *
 + * See also: {@link Message#out(Verb, Object)}, {@link Message#responseWith(Object)},
 + * and {@link Message#failureResponse(RequestFailureReason)}.
 + *
 + * <h2>Using MessagingService: handling a request</h2>
 + *
 + * As described in the previous section, to handle responses you only need to implement {@link RequestCallback}
 + * interface - so long as your response verb handler is the default {@link ResponseVerbHandler}.
 + *
 + * There are two steps you need to perform to implement request handling:
 + *  1. Create a {@link IVerbHandler} to process incoming requests and responses for the new type (if applicable).
 + *  2. Add a new {@link Verb} to the enum for the new request type, and, if applicable, one for the response message.
 + *
 + * MessagingService will now automatically invoke your handler whenever a {@link Message} with this verb arrives.
 + *
 + * <h1>Architecture of MessagingService</h1>
 + *
 + * <h2>QOS</h2>
 + *
 + * Since our messaging protocol is TCP-based, and also doesn't yet support interleaving messages with each other,
 + * we need a way to prevent head-of-line blocking adversely affecting all messages - in particular, large messages
 + * being in the way of smaller ones. To achieve that (somewhat), we maintain three messaging connections to and
 + * from each peer:
 + * - one for large messages - defined as being larger than {@link OutboundConnections#LARGE_MESSAGE_THRESHOLD}
 + *   (65KiB by default)
 + * - one for small messages - defined as smaller than that threshold
 + * - and finally, a connection for urgent messages - usually small and/or that are important to arrive
 + *   promptly, e.g. gossip-related ones
 + *
 + * <h2>Wire format and framing</h2>
 + *
 + * Small messages are grouped together into frames, and large messages are split over multiple frames.
 + * Framing provides application-level integrity protection to otherwise raw streams of data - we use
 + * CRC24 for frame headers and CRC32 for the entire payload. LZ4 is optionally used for compression.
 + *
 + * You can find the on-wire format description of individual messages in the comments for
 + * {@link Message.Serializer}, alongside with format evolution notes.
 + * For the list and descriptions of available frame decoders see {@link FrameDecoder} comments. You can
 + * find wire format documented in the javadoc of {@link FrameDecoder} implementations:
 + * see {@link FrameDecoderCrc} and {@link FrameDecoderLZ4} in particular.
 + *
 + * <h2>Architecture of outbound messaging</h2>
 + *
 + * {@link OutboundConnection} is the core class implementing outbound connection logic, with
 + * {@link OutboundConnection#enqueue(Message)} being its main entry point. The connections are initiated
 + * by {@link OutboundConnectionInitiator}.
 + *
 + * Netty pipeline for outbound messaging connections generally consists of the following handlers:
 + *
 + * [(optional) SslHandler] <- [FrameEncoder]
 + *
 + * {@link OutboundConnection} handles the entire lifetime of a connection: from the very first handshake
 + * to any reconnects, if necessary.
 + *
 + * Message-delivery flow varies depending on the connection type.
 + *
 + * For {@link ConnectionType#SMALL_MESSAGES} and {@link ConnectionType#URGENT_MESSAGES},
 + * {@link Message} serialization and delivery occurs directly on the event loop.
 + * See {@link OutboundConnection.EventLoopDelivery} for details.
 + *
 + * For {@link ConnectionType#LARGE_MESSAGES}, to ensure that servicing large messages doesn't block
 + * timely service of other requests, message serialization is offloaded to a companion thread pool
 + * ({@link SocketFactory#synchronousWorkExecutor}). Most of the work will be performed by
 + * {@link AsyncChannelOutputPlus}. Please see {@link OutboundConnection.LargeMessageDelivery}
 + * for details.
 + *
 + * To prevent fast clients, or slow nodes on the other end of the connection from overwhelming
 + * a host with enqueued, unsent messages on heap, we impose strict limits on how much memory enqueued,
 + * undelivered messages can claim.
 + *
 + * Every individual connection gets an exclusive permit quota to use - 4MiB by default; every endpoint
 + * (group of large, small, and urgent connections) is capped, by default, at 128MiB of undelivered messages,
 + * and a global limit of 512MiB is imposed on all endpoints combined.
 + *
 + * On an attempt to {@link OutboundConnection#enqueue(Message)}, the connection will attempt to allocate
 + * permits for message-size number of bytes from its exclusive quota; if successful, it will add the
 + * message to the queue; if unsuccessful, it will need to allocate the remainder from both endpoint and global
 + * reserves, and if it fails to do so, the message will be rejected, and its callbacks, if any,
 + * immediately expired.
 + *
 + * For a more detailed description please see the docs and comments of {@link OutboundConnection}.
 + *
 + * <h2>Architecture of inbound messaging</h2>
 + *
 + * {@link InboundMessageHandler} is the core class implementing inbound connection logic, paired
 + * with {@link FrameDecoder}. Inbound connections are initiated by {@link InboundConnectionInitiator}.
 + * The primary entry points to these classes are {@link FrameDecoder#channelRead(ShareableBytes)}
 + * and {@link InboundMessageHandler#process(FrameDecoder.Frame)}.
 + *
 + * Netty pipeline for inbound messaging connections generally consists of the following handlers:
 + *
 + * [(optional) SslHandler] -> [FrameDecoder] -> [InboundMessageHandler]
 + *
 + * {@link FrameDecoder} is responsible for decoding incoming frames and work stashing; {@link InboundMessageHandler}
 + * then takes decoded frames from the decoder and processes the messages contained in them.
 + *
 + * The flow differs between small and large messages. Small ones are deserialized immediately, and only
 + * then scheduled on the right thread pool for the {@link Verb} for execution. Large messages, OTOH,
 + * aren't deserialized until they are just about to be executed on the appropriate {@link Stage}.
 + *
 + * Similarly to outbound handling, inbound messaging imposes strict memory utilisation limits on individual
 + * endpoints and on global aggregate consumption, and implements simple flow control, to prevent a single
 + * fast endpoint from overwhelming a host.
 + *
 + * Every individual connection gets an exclusive permit quota to use - 4MiB by default; every endpoint
 + * (group of large, small, and urgent connections) is capped, by default, at 128MiB of unprocessed messages,
 + * and a global limit of 512MiB is imposed on all endpoints combined.
 + *
 + * On arrival of a message header, the handler will attempt to allocate permits for message-size number
 + * of bytes from its exclusive quota; if successful, it will proceed to deserializing and processing the message.
 + * If unsuccessful, the handler will attempt to allocate the remainder from its endpoint and global reserve;
 + * if either allocation is unsuccessful, the handler will cease any further frame processing, and tell
 + * {@link FrameDecoder} to stop reading from the network; subsequently, it will put itself on a special
 + * {@link org.apache.cassandra.net.InboundMessageHandler.WaitQueue}, to be reactivated once more permits
 + * become available.
 + *
 + * For a more detailed description please see the docs and comments of {@link InboundMessageHandler} and
 + * {@link FrameDecoder}.
 + *
 + * <h2>Observability</h2>
 + *
 + * MessagingService exposes diagnostic counters for both outbound and inbound directions - received and sent
 + * bytes and message counts, overload bytes and message count, error bytes and error counts, and many more.
 + *
 + * See {@link org.apache.cassandra.metrics.InternodeInboundMetrics} and
 + * {@link org.apache.cassandra.metrics.InternodeOutboundMetrics} for JMX-exposed counters.
 + *
 + * We also provide {@code system_views.internode_inbound} and {@code system_views.internode_outbound} virtual tables -
 + * implemented in {@link org.apache.cassandra.db.virtual.InternodeInboundTable} and
 + * {@link org.apache.cassandra.db.virtual.InternodeOutboundTable} respectively.
 + */
 +public final class MessagingService extends MessagingServiceMBeanImpl
  {
 -    // Required to allow schema migrations while upgrading within the minor 3.0.x/3.x versions to 3.11+.
 -    // See CASSANDRA-13004 for details.
 -    public final static boolean FORCE_3_0_PROTOCOL_VERSION = Boolean.getBoolean("cassandra.force_3_0_protocol_version");
 -
 -    public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
 +    private static final Logger logger = LoggerFactory.getLogger(MessagingService.class);
  
      // 8 bits version, so don't waste versions
 -    public static final int VERSION_12 = 6;
 -    public static final int VERSION_20 = 7;
 -    public static final int VERSION_21 = 8;
 -    public static final int VERSION_22 = 9;
      public static final int VERSION_30 = 10;
      public static final int VERSION_3014 = 11;
 -    public static final int current_version = FORCE_3_0_PROTOCOL_VERSION ? VERSION_30 : VERSION_3014;
 -
 -    public static final String FAILURE_CALLBACK_PARAM = "CAL_BAC";
 -    public static final byte[] ONE_BYTE = new byte[1];
 -    public static final String FAILURE_RESPONSE_PARAM = "FAIL";
 -    public static final String FAILURE_REASON_PARAM = "FAIL_REASON";
 -
 -    /**
 -     * we preface every message with this number so the recipient can validate the sender is sane
 -     */
 -    public static final int PROTOCOL_MAGIC = 0xCA552DFA;
 -
 -    private boolean allNodesAtLeast22 = true;
 -    private boolean allNodesAtLeast30 = true;
 -
 -    public final MessagingMetrics metrics = new MessagingMetrics();
 -
 -    /* All verb handler identifiers */
 -    public enum Verb
 -    {
 -        MUTATION
 -        {
 -            public long getTimeout()
 -            {
 -                return DatabaseDescriptor.getWriteRpcTimeout();
 -            }
 -        },
 -        HINT
 -        {
 -            public long getTimeout()
 -            {
 -                return DatabaseDescriptor.getWriteRpcTimeout();
 -            }
 -        },
 -        READ_REPAIR
 -        {
 -            public long getTimeout()
 -            {
 -                return DatabaseDescriptor.getWriteRpcTimeout();
 -            }
 -        },
 -        READ
 -        {
 -            public long getTimeout()
 -            {
 -                return DatabaseDescriptor.getReadRpcTimeout();
 -            }
 -        },
 -        REQUEST_RESPONSE, // client-initiated reads and writes
 -        BATCH_STORE
 -        {
 -            public long getTimeout()
 -            {
 -                return DatabaseDescriptor.getWriteRpcTimeout();
 -            }
 -        },  // was @Deprecated STREAM_INITIATE,
 -        BATCH_REMOVE
 -        {
 -            public long getTimeout()
 -            {
 -                return DatabaseDescriptor.getWriteRpcTimeout();
 -            }
 -        }, // was @Deprecated STREAM_INITIATE_DONE,
 -        @Deprecated STREAM_REPLY,
 -        @Deprecated STREAM_REQUEST,
 -        RANGE_SLICE
 -        {
 -            public long getTimeout()
 -            {
 -                return DatabaseDescriptor.getRangeRpcTimeout();
 -            }
 -        },
 -        @Deprecated BOOTSTRAP_TOKEN,
 -        @Deprecated TREE_REQUEST,
 -        @Deprecated TREE_RESPONSE,
 -        @Deprecated JOIN,
 -        GOSSIP_DIGEST_SYN,
 -        GOSSIP_DIGEST_ACK,
 -        GOSSIP_DIGEST_ACK2,
 -        @Deprecated DEFINITIONS_ANNOUNCE,
 -        DEFINITIONS_UPDATE,
 -        TRUNCATE
 -        {
 -            public long getTimeout()
 -            {
 -                return DatabaseDescriptor.getTruncateRpcTimeout();
 -            }
 -        },
 -        SCHEMA_CHECK,
 -        @Deprecated INDEX_SCAN,
 -        REPLICATION_FINISHED,
 -        INTERNAL_RESPONSE, // responses to internal calls
 -        COUNTER_MUTATION
 -        {
 -            public long getTimeout()
 -            {
 -                return DatabaseDescriptor.getCounterWriteRpcTimeout();
 -            }
 -        },
 -        @Deprecated STREAMING_REPAIR_REQUEST,
 -        @Deprecated STREAMING_REPAIR_RESPONSE,
 -        SNAPSHOT, // Similar to nt snapshot
 -        MIGRATION_REQUEST,
 -        GOSSIP_SHUTDOWN,
 -        _TRACE, // dummy verb so we can use MS.droppedMessagesMap
 -        ECHO,
 -        REPAIR_MESSAGE,
 -        PAXOS_PREPARE
 -        {
 -            public long getTimeout()
 -            {
 -                return DatabaseDescriptor.getWriteRpcTimeout();
 -            }
 -        },
 -        PAXOS_PROPOSE
 -        {
 -            public long getTimeout()
 -            {
 -                return DatabaseDescriptor.getWriteRpcTimeout();
 -            }
 -        },
 -        PAXOS_COMMIT
 -        {
 -            public long getTimeout()
 -            {
 -                return DatabaseDescriptor.getWriteRpcTimeout();
 -            }
 -        },
 -        @Deprecated PAGED_RANGE
 -        {
 -            public long getTimeout()
 -            {
 -                return DatabaseDescriptor.getRangeRpcTimeout();
 -            }
 -        },
 -        PING,
 -        // UNUSED verbs were used as padding for backward/forward compatability before 4.0,
 -        // but it wasn't quite as bullet/future proof as needed. We still need to keep these entries
 -        // around, at least for a major rev or two (post-4.0). see CASSANDRA-13993 for a discussion.
 -        // For now, though, the UNUSED are legacy values (placeholders, basically) that should only be used
 -        // for correctly adding VERBs that need to be emergency additions to 3.0/3.11.
 -        // We can reclaim them (their id's, to be correct) in future versions, if desired, though.
 -        UNUSED_2,
 -        UNUSED_3,
 -        UNUSED_4,
 -        UNUSED_5,
 -        ;
 -        // remember to add new verbs at the end, since we serialize by ordinal
 -
 -        // This is to support a "late" choice of the verb based on the messaging service version.
 -        // See CASSANDRA-12249 for more details.
 -        public static Verb convertForMessagingServiceVersion(Verb verb, int version)
 -        {
 -            if (verb == PAGED_RANGE && version >= VERSION_30)
 -                return RANGE_SLICE;
 -
 -            return verb;
 -        }
 -
 -        public long getTimeout()
 -        {
 -            return DatabaseDescriptor.getRpcTimeout();
 -        }
 -    }
 -
 -    public static final Verb[] verbValues = Verb.values();
 -
 -    public static final EnumMap<MessagingService.Verb, Stage> verbStages = new EnumMap<MessagingService.Verb, Stage>(MessagingService.Verb.class)
 -    {{
 -        put(Verb.MUTATION, Stage.MUTATION);
 -        put(Verb.COUNTER_MUTATION, Stage.COUNTER_MUTATION);
 -        put(Verb.READ_REPAIR, Stage.MUTATION);
 -        put(Verb.HINT, Stage.MUTATION);
 -        put(Verb.TRUNCATE, Stage.MUTATION);
 -        put(Verb.PAXOS_PREPARE, Stage.MUTATION);
 -        put(Verb.PAXOS_PROPOSE, Stage.MUTATION);
 -        put(Verb.PAXOS_COMMIT, Stage.MUTATION);
 -        put(Verb.BATCH_STORE, Stage.MUTATION);
 -        put(Verb.BATCH_REMOVE, Stage.MUTATION);
 -
 -        put(Verb.READ, Stage.READ);
 -        put(Verb.RANGE_SLICE, Stage.READ);
 -        put(Verb.INDEX_SCAN, Stage.READ);
 -        put(Verb.PAGED_RANGE, Stage.READ);
 -
 -        put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
 -        put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
 -
 -        put(Verb.STREAM_REPLY, Stage.MISC); // actually handled by FileStreamTask and streamExecutors
 -        put(Verb.STREAM_REQUEST, Stage.MISC);
 -        put(Verb.REPLICATION_FINISHED, Stage.MISC);
 -        put(Verb.SNAPSHOT, Stage.MISC);
 -
 -        put(Verb.TREE_REQUEST, Stage.ANTI_ENTROPY);
 -        put(Verb.TREE_RESPONSE, Stage.ANTI_ENTROPY);
 -        put(Verb.STREAMING_REPAIR_REQUEST, Stage.ANTI_ENTROPY);
 -        put(Verb.STREAMING_REPAIR_RESPONSE, Stage.ANTI_ENTROPY);
 -        put(Verb.REPAIR_MESSAGE, Stage.ANTI_ENTROPY);
 -        put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP);
 -        put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP);
 -        put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP);
 -        put(Verb.GOSSIP_SHUTDOWN, Stage.GOSSIP);
 -
 -        put(Verb.DEFINITIONS_UPDATE, Stage.MIGRATION);
 -        put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
 -        put(Verb.MIGRATION_REQUEST, Stage.MIGRATION);
 -        put(Verb.INDEX_SCAN, Stage.READ);
 -        put(Verb.REPLICATION_FINISHED, Stage.MISC);
 -        put(Verb.SNAPSHOT, Stage.MISC);
 -        put(Verb.ECHO, Stage.GOSSIP);
 -
 -        put(Verb.UNUSED_2, Stage.INTERNAL_RESPONSE);
 -        put(Verb.UNUSED_3, Stage.INTERNAL_RESPONSE);
 -
 -        put(Verb.PING, Stage.READ);
 -    }};
 -
 -    /**
 -     * Messages we receive in IncomingTcpConnection have a Verb that tells us what kind of message it is.
 -     * Most of the time, this is enough to determine how to deserialize the message payload.
 -     * The exception is the REQUEST_RESPONSE verb, which just means "a reply to something you told me to do."
 -     * Traditionally, this was fine since each VerbHandler knew what type of payload it expected, and
 -     * handled the deserialization itself.  Now that we do that in ITC, to avoid the extra copy to an
 -     * intermediary byte[] (See CASSANDRA-3716), we need to wire that up to the CallbackInfo object
 -     * (see below).
 -     */
 -    public final EnumMap<Verb, IVersionedSerializer<?>> verbSerializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class)
 -    {{
 -        put(Verb.REQUEST_RESPONSE, CallbackDeterminedSerializer.instance);
 -        put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance);
 -
 -        put(Verb.MUTATION, Mutation.serializer);
 -        put(Verb.READ_REPAIR, Mutation.serializer);
 -        put(Verb.READ, ReadCommand.readSerializer);
 -        put(Verb.RANGE_SLICE, ReadCommand.rangeSliceSerializer);
 -        put(Verb.PAGED_RANGE, ReadCommand.pagedRangeSerializer);
 -        put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
 -        put(Verb.REPAIR_MESSAGE, RepairMessage.serializer);
 -        put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer);
 -        put(Verb.GOSSIP_DIGEST_ACK2, GossipDigestAck2.serializer);
 -        put(Verb.GOSSIP_DIGEST_SYN, GossipDigestSyn.serializer);
 -        put(Verb.DEFINITIONS_UPDATE, MigrationManager.MigrationsSerializer.instance);
 -        put(Verb.TRUNCATE, Truncation.serializer);
 -        put(Verb.REPLICATION_FINISHED, null);
 -        put(Verb.COUNTER_MUTATION, CounterMutation.serializer);
 -        put(Verb.SNAPSHOT, SnapshotCommand.serializer);
 -        put(Verb.ECHO, EchoMessage.serializer);
 -        put(Verb.PAXOS_PREPARE, Commit.serializer);
 -        put(Verb.PAXOS_PROPOSE, Commit.serializer);
 -        put(Verb.PAXOS_COMMIT, Commit.serializer);
 -        put(Verb.HINT, HintMessage.serializer);
 -        put(Verb.BATCH_STORE, Batch.serializer);
 -        put(Verb.BATCH_REMOVE, UUIDSerializer.serializer);
 -        put(Verb.PING, PingMessage.serializer);
 -    }};
 -
 -    /**
 -     * A Map of what kind of serializer to wire up to a REQUEST_RESPONSE callback, based on outbound Verb.
 -     */
 -    public static final EnumMap<Verb, IVersionedSerializer<?>> callbackDeserializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class)
 -    {{
 -        put(Verb.MUTATION, WriteResponse.serializer);
 -        put(Verb.HINT, HintResponse.serializer);
 -        put(Verb.READ_REPAIR, WriteResponse.serializer);
 -        put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
 -        put(Verb.RANGE_SLICE, ReadResponse.rangeSliceSerializer);
 -        put(Verb.PAGED_RANGE, ReadResponse.rangeSliceSerializer);
 -        put(Verb.READ, ReadResponse.serializer);
 -        put(Verb.TRUNCATE, TruncateResponse.serializer);
 -        put(Verb.SNAPSHOT, null);
 -
 -        put(Verb.MIGRATION_REQUEST, MigrationManager.MigrationsSerializer.instance);
 -        put(Verb.SCHEMA_CHECK, UUIDSerializer.serializer);
 -        put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
 -        put(Verb.REPLICATION_FINISHED, null);
 -
 -        put(Verb.PAXOS_PREPARE, PrepareResponse.serializer);
 -        put(Verb.PAXOS_PROPOSE, BooleanSerializer.serializer);
 -
 -        put(Verb.BATCH_STORE, WriteResponse.serializer);
 -        put(Verb.BATCH_REMOVE, WriteResponse.serializer);
 -    }};
 -
 -    /* This records all the results mapped by message Id */
 -    private final ExpiringMap<Integer, CallbackInfo> callbacks;
 -
 -    /**
 -     * a placeholder class that means "deserialize using the callback." We can't implement this without
 -     * special-case code in InboundTcpConnection because there is no way to pass the message id to IVersionedSerializer.
 -     */
 -    static class CallbackDeterminedSerializer implements IVersionedSerializer<Object>
 -    {
 -        public static final CallbackDeterminedSerializer instance = new CallbackDeterminedSerializer();
 -
 -        public Object deserialize(DataInputPlus in, int version) throws IOException
 -        {
 -            throw new UnsupportedOperationException();
 -        }
 -
 -        public void serialize(Object o, DataOutputPlus out, int version) throws IOException
 -        {
 -            throw new UnsupportedOperationException();
 -        }
 -
 -        public long serializedSize(Object o, int version)
 -        {
 -            throw new UnsupportedOperationException();
 -        }
 -    }
 -
 -    /* Lookup table for registering message handlers based on the verb. */
 -    private final Map<Verb, IVerbHandler> verbHandlers;
 -
 -    private final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<>();
 -
 -    private static final Logger logger = LoggerFactory.getLogger(MessagingService.class);
 -    private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
 -
 -    private final List<SocketThread> socketThreads = Lists.newArrayList();
 -    private final SimpleCondition listenGate;
 -
 -    /**
 -     * Verbs it's okay to drop if the request has been queued longer than the request timeout.  These
 -     * all correspond to client requests or something triggered by them; we don't want to
 -     * drop internal messages like bootstrap or repair notifications.
 -     */
 -    public static final EnumSet<Verb> DROPPABLE_VERBS = EnumSet.of(Verb._TRACE,
 -                                                                   Verb.MUTATION,
 -                                                                   Verb.COUNTER_MUTATION,
 -                                                                   Verb.HINT,
 -                                                                   Verb.READ_REPAIR,
 -                                                                   Verb.READ,
 -                                                                   Verb.RANGE_SLICE,
 -                                                                   Verb.PAGED_RANGE,
 -                                                                   Verb.REQUEST_RESPONSE,
 -                                                                   Verb.BATCH_STORE,
 -                                                                   Verb.BATCH_REMOVE);
 -
 -    private static final class DroppedMessages
 -    {
 -        final DroppedMessageMetrics metrics;
 -        final AtomicInteger droppedInternal;
 -        final AtomicInteger droppedCrossNode;
 -
 -        DroppedMessages(Verb verb)
 -        {
 -            this(new DroppedMessageMetrics(verb));
 -        }
 -
 -        DroppedMessages(DroppedMessageMetrics metrics)
 -        {
 -            this.metrics = metrics;
 -            this.droppedInternal = new AtomicInteger(0);
 -            this.droppedCrossNode = new AtomicInteger(0);
 -        }
 -    }
 -
 -    @VisibleForTesting
 -    public void resetDroppedMessagesMap(String scope)
 -    {
 -        for (Verb verb : droppedMessagesMap.keySet())
 -            droppedMessagesMap.put(verb, new DroppedMessages(new DroppedMessageMetrics(metricName -> {
 -                return new CassandraMetricsRegistry.MetricName("DroppedMessages", metricName, scope);
 -            })));
 -    }
 -
 -    // total dropped message counts for server lifetime
 -    private final Map<Verb, DroppedMessages> droppedMessagesMap = new EnumMap<>(Verb.class);
 -
 -    private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
 -
 -    // protocol versions of the other nodes in the cluster
 -    private final ConcurrentMap<InetAddress, Integer> versions = new NonBlockingHashMap<InetAddress, Integer>();
 -
 -    // message sinks are a testing hook
 -    private final Set<IMessageSink> messageSinks = new CopyOnWriteArraySet<>();
 -
 -    // back-pressure implementation
 -    private final BackPressureStrategy backPressure = DatabaseDescriptor.getBackPressureStrategy();
 -
 -    private static class MSHandle
 -    {
 -        public static final MessagingService instance = new MessagingService(false);
 -    }
 -
 -    public static MessagingService instance()
 -    {
 -        return MSHandle.instance;
 -    }
 -
 -    private static class MSTestHandle
 -    {
 -        public static final MessagingService instance = new MessagingService(true);
 -    }
 -
 -    static MessagingService test()
 -    {
 -        return MSTestHandle.instance;
 -    }
 -
 -    private MessagingService(boolean testOnly)
 -    {
 -        for (Verb verb : DROPPABLE_VERBS)
 -            droppedMessagesMap.put(verb, new DroppedMessages(verb));
 -
 -        listenGate = new SimpleCondition();
 -        verbHandlers = new EnumMap<>(Verb.class);
 -        if (!testOnly)
 -        {
 -            Runnable logDropped = new Runnable()
 -            {
 -                public void run()
 -                {
 -                    logDroppedMessages();
 -                }
 -            };
 -            ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
 -        }
 -
 -        Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, ?> timeoutReporter = new Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, Object>()
 -        {
 -            public Object apply(Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>> pair)
 -            {
 -                final CallbackInfo expiredCallbackInfo = pair.right.value;
 -
 -                maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout);
 -
 -                ConnectionMetrics.totalTimeouts.mark();
 -                getConnectionPool(expiredCallbackInfo.target).incrementTimeout();
 -
 -                if (expiredCallbackInfo.callback.supportsBackPressure())
 -                {
 -                    updateBackPressureOnReceive(expiredCallbackInfo.target, expiredCallbackInfo.callback, true);
 -                }
 -
 -                if (expiredCallbackInfo.isFailureCallback())
 -                {
 -                    StageManager.getStage(Stage.INTERNAL_RESPONSE).submit(new Runnable()
 -                    {
 -                        @Override
 -                        public void run()
 -                        {
 -                            ((IAsyncCallbackWithFailure)expiredCallbackInfo.callback).onFailure(expiredCallbackInfo.target, RequestFailureReason.UNKNOWN);
 -                        }
 -                    });
 -                }
 -
 -                if (expiredCallbackInfo.shouldHint())
 -                {
 -                    Mutation mutation = ((WriteCallbackInfo) expiredCallbackInfo).mutation();
 -                    return StorageProxy.submitHint(mutation, expiredCallbackInfo.target, null);
 -                }
 -
 -                return null;
 -            }
 -        };
 -
 -        callbacks = new ExpiringMap<>(DatabaseDescriptor.getMinRpcTimeout(), timeoutReporter);
 -
 -        if (!testOnly)
 -        {
 -            MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
 -        }
 -    }
 -
 -    public void addMessageSink(IMessageSink sink)
 -    {
 -        messageSinks.add(sink);
 -    }
 -
 -    public void removeMessageSink(IMessageSink sink)
 -    {
 -        messageSinks.remove(sink);
 -    }
 -
 -    public void clearMessageSinks()
 -    {
 -        messageSinks.clear();
 -    }
 -
 -    /**
 -     * Updates the back-pressure state on sending to the given host if enabled and the given message callback supports it.
 -     *
 -     * @param host The replica host the back-pressure state refers to.
 -     * @param callback The message callback.
 -     * @param message The actual message.
 -     */
 -    public void updateBackPressureOnSend(InetAddress host, IAsyncCallback callback, MessageOut<?> message)
 -    {
 -        if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure())
 -        {
 -            BackPressureState backPressureState = getConnectionPool(host).getBackPressureState();
 -            backPressureState.onMessageSent(message);
 -        }
 -    }
 -
 -    /**
 -     * Updates the back-pressure state on reception from the given host if enabled and the given message callback supports it.
 -     *
 -     * @param host The replica host the back-pressure state refers to.
 -     * @param callback The message callback.
 -     * @param timeout True if updated following a timeout, false otherwise.
 -     */
 -    public void updateBackPressureOnReceive(InetAddress host, IAsyncCallback callback, boolean timeout)
 -    {
 -        if (DatabaseDescriptor.backPressureEnabled() && callback.supportsBackPressure())
 -        {
 -            BackPressureState backPressureState = getConnectionPool(host).getBackPressureState();
 -            if (!timeout)
 -                backPressureState.onResponseReceived();
 -            else
 -                backPressureState.onResponseTimeout();
 -        }
 -    }
 -
 -    /**
 -     * Applies back-pressure for the given hosts, according to the configured strategy.
 -     *
 -     * If the local host is present, it is removed from the pool, as back-pressure is only applied
 -     * to remote hosts.
 -     *
 -     * @param hosts The hosts to apply back-pressure to.
 -     * @param timeoutInNanos The max back-pressure timeout.
 -     */
 -    public void applyBackPressure(Iterable<InetAddress> hosts, long timeoutInNanos)
 -    {
 -        if (DatabaseDescriptor.backPressureEnabled())
 -        {
 -            backPressure.apply(StreamSupport.stream(hosts.spliterator(), false)
 -                    .filter(h -> !h.equals(FBUtilities.getBroadcastAddress()))
 -                    .map(h -> getConnectionPool(h).getBackPressureState())
 -                    .collect(Collectors.toSet()), timeoutInNanos, TimeUnit.NANOSECONDS);
 -        }
 -    }
 -
 -    /**
 -     * Track latency information for the dynamic snitch
 -     *
 -     * @param cb      the callback associated with this message -- this lets us know if it's a message type we're interested in
 -     * @param address the host that replied to the message
 -     * @param latency
 -     */
 -    public void maybeAddLatency(IAsyncCallback cb, InetAddress address, long latency)
 -    {
 -        if (cb.isLatencyForSnitch())
 -            addLatency(address, latency);
 -    }
 -
 -    public void addLatency(InetAddress address, long latency)
 -    {
 -        for (ILatencySubscriber subscriber : subscribers)
 -            subscriber.receiveTiming(address, latency);
 -    }
 -
 -    /**
 -     * called from gossiper when it notices a node is not responding.
 -     */
 -    public void convict(InetAddress ep)
 -    {
 -        logger.trace("Resetting pool for {}", ep);
 -        getConnectionPool(ep).reset();
 -    }
 -
 -    public void listen()
 -    {
 -        callbacks.reset(); // hack to allow tests to stop/restart MS
 -        listen(FBUtilities.getLocalAddress());
 -        if (DatabaseDescriptor.shouldListenOnBroadcastAddress()
 -            && !FBUtilities.getLocalAddress().equals(FBUtilities.getBroadcastAddress()))
 -        {
 -            listen(FBUtilities.getBroadcastAddress());
 -        }
 -        listenGate.signalAll();
 -    }
 -
 -    /**
 -     * Listen on the specified port.
 -     *
 -     * @param localEp InetAddress whose port to listen on.
 -     */
 -    private void listen(InetAddress localEp) throws ConfigurationException
 -    {
 -        for (ServerSocket ss : getServerSockets(localEp))
 -        {
 -            SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
 -            th.start();
 -            socketThreads.add(th);
 -        }
 -    }
 -
 -    @SuppressWarnings("resource")
 -    private List<ServerSocket> getServerSockets(InetAddress localEp) throws ConfigurationException
 -    {
 -        final List<ServerSocket> ss = new ArrayList<ServerSocket>(2);
 -        if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.none)
 -        {
 -            try
 -            {
 -                ss.add(SSLFactory.getServerSocket(DatabaseDescriptor.getServerEncryptionOptions(), localEp, DatabaseDescriptor.getSSLStoragePort()));
 -            }
 -            catch (IOException e)
 -            {
 -                throw new ConfigurationException("Unable to create ssl socket", e);
 -            }
 -            // setReuseAddress happens in the factory.
 -            logger.info("Starting Encrypted Messaging Service on SSL port {}", DatabaseDescriptor.getSSLStoragePort());
 -        }
 -
 -        if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.all)
 -        {
 -            ServerSocketChannel serverChannel = null;
 -            try
 -            {
 -                serverChannel = ServerSocketChannel.open();
 -            }
 -            catch (IOException e)
 -            {
 -                throw new RuntimeException(e);
 -            }
 -            ServerSocket socket = serverChannel.socket();
 -            try
 -            {
 -                socket.setReuseAddress(true);
 -            }
 -            catch (SocketException e)
 -            {
 -                FileUtils.closeQuietly(socket);
 -                throw new ConfigurationException("Insufficient permissions to setReuseAddress", e);
 -            }
 -            InetSocketAddress address = new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort());
 -            try
 -            {
 -                socket.bind(address,500);
 -            }
 -            catch (BindException e)
 -            {
 -                FileUtils.closeQuietly(socket);
 -                if (e.getMessage().contains("in use"))
 -                    throw new ConfigurationException(address + " is in use by another process.  Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services");
 -                else if (e.getMessage().contains("Cannot assign requested address"))
 -                    throw new ConfigurationException("Unable to bind to address " + address
 -                                                     + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
 -                else
 -                    throw new RuntimeException(e);
 -            }
 -            catch (IOException e)
 -            {
 -                FileUtils.closeQuietly(socket);
 -                throw new RuntimeException(e);
 -            }
 -            String nic = FBUtilities.getNetworkInterface(localEp);
 -            logger.info("Starting Messaging Service on {}:{}{}", localEp, DatabaseDescriptor.getStoragePort(),
 -                        nic == null? "" : String.format(" (%s)", nic));
 -            ss.add(socket);
 -        }
 -        return ss;
 -    }
 +    public static final int VERSION_40 = 12;
 +    public static final int minimum_version = VERSION_30;
 +    public static final int current_version = VERSION_40;
 +    static AcceptVersions accept_messaging = new AcceptVersions(minimum_version, current_version);
 +    static AcceptVersions accept_streaming = new AcceptVersions(current_version, current_version);
  
 -    public void waitUntilListening()
 -    {
 -        try
 -        {
 -            listenGate.await();
 -        }
 -        catch (InterruptedException ie)
 -        {
 -            logger.trace("await interrupted");
 -        }
 -    }
 -
 -    public boolean isListening()
 -    {
 -        return listenGate.isSignaled();
 -    }
 -
 -    public void destroyConnectionPool(InetAddress to)
 -    {
 -        OutboundTcpConnectionPool cp = connectionManagers.get(to);
 -        if (cp == null)
 -            return;
 -        cp.close();
 -        connectionManagers.remove(to);
 -    }
 -
 -    public OutboundTcpConnectionPool getConnectionPool(InetAddress to)
 -    {
 -        OutboundTcpConnectionPool cp = connectionManagers.get(to);
 -        if (cp == null)
 -        {
 -            cp = new OutboundTcpConnectionPool(to, backPressure.newState(to));
 -            OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp);
 -            if (existingPool != null)
 -                cp = existingPool;
 -            else
 -                cp.start();
 -        }
 -        cp.waitForStarted();
 -        return cp;
 -    }
 -
 -
 -    public OutboundTcpConnection getConnection(InetAddress to, MessageOut msg)
 -    {
 -        return getConnectionPool(to).getConnection(msg);
 -    }
 -
 -    /**
 -     * Register a verb and the corresponding verb handler with the
 -     * Messaging Service.
 -     *
 -     * @param verb
 -     * @param verbHandler handler for the specified verb
 -     */
 -    public void registerVerbHandlers(Verb verb, IVerbHandler verbHandler)
 -    {
 -        assert !verbHandlers.containsKey(verb);
 -        verbHandlers.put(verb, verbHandler);
 -    }
 -
 -    /**
 -     * This method returns the verb handler associated with the registered
 -     * verb. If no handler has been registered then null is returned.
 -     *
 -     * @param type for which the verb handler is sought
 -     * @return a reference to IVerbHandler which is the handler for the specified verb
 -     */
 -    public IVerbHandler getVerbHandler(Verb type)
 +    private static class MSHandle
      {
 -        return verbHandlers.get(type);
 +        public static final MessagingService instance = new MessagingService(false);
      }
  
 -    public int addCallback(IAsyncCallback cb, MessageOut message, InetAddress to, long timeout, boolean failureCallback)
 +    public static MessagingService instance()
      {
 -        assert message.verb != Verb.MUTATION; // mutations need to call the overload with a ConsistencyLevel
 -        int messageId = nextId();
 -        CallbackInfo previous = callbacks.put(messageId, new CallbackInfo(to, cb, callbackDeserializers.get(message.verb), failureCallback), timeout);
 -        assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
 -        return messageId;
 +        return MSHandle.instance;
      }
  
 -    public int addCallback(IAsyncCallback cb,
 -                           MessageOut<?> message,
 -                           InetAddress to,
 -                           long timeout,
 -                           ConsistencyLevel consistencyLevel,
 -                           boolean allowHints)
 -    {
 -        assert message.verb == Verb.MUTATION
 -            || message.verb == Verb.COUNTER_MUTATION
 -            || message.verb == Verb.PAXOS_COMMIT;
 -        int messageId = nextId();
 +    public final SocketFactory socketFactory = new SocketFactory();
 +    public final LatencySubscribers latencySubscribers = new LatencySubscribers();
 +    public final RequestCallbacks callbacks = new RequestCallbacks(this);
  
 -        CallbackInfo previous = callbacks.put(messageId,
 -                                              new WriteCallbackInfo(to,
 -                                                                    cb,
 -                                                                    message,
 -                                                                    callbackDeserializers.get(message.verb),
 -                                                                    consistencyLevel,
 -                                                                    allowHints),
 -                                                                    timeout);
 -        assert previous == null : String.format("Callback already exists for id %d! (%s)", messageId, previous);
 -        return messageId;
 -    }
 +    // a public hook for filtering messages intended for delivery to this node
 +    public final InboundSink inboundSink = new InboundSink(this);
  
 -    private static final AtomicInteger idGen = new AtomicInteger(0);
 +    // the inbound global reserve limits and associated wait queue
 +    private final InboundMessageHandlers.GlobalResourceLimits inboundGlobalReserveLimits = new InboundMessageHandlers.GlobalResourceLimits(
 +        new ResourceLimits.Concurrent(DatabaseDescriptor.getInternodeApplicationReceiveQueueReserveGlobalCapacityInBytes()));
  
 -    private static int nextId()
 -    {
 -        return idGen.incrementAndGet();
 -    }
 +    // the socket bindings we accept incoming connections on
 +    private final InboundSockets inboundSockets = new InboundSockets(new InboundConnectionSettings()
 +                                                                     .withHandlers(this::getInbound)
 +                                                                     .withSocketFactory(socketFactory));
  
 -    public int sendRR(MessageOut message, InetAddress to, IAsyncCallback cb)
 -    {
 -        return sendRR(message, to, cb, message.getTimeout(), false);
 -    }
 +    // a public hook for filtering messages intended for delivery to another node
 +    public final OutboundSink outboundSink = new OutboundSink(this::doSend);
 +
 +    final ResourceLimits.Limit outboundGlobalReserveLimit =
 +        new ResourceLimits.Concurrent(DatabaseDescriptor.getInternodeApplicationSendQueueReserveGlobalCapacityInBytes());
 +
 +    // back-pressure implementation
 +    private final BackPressureStrategy backPressure = DatabaseDescriptor.getBackPressureStrategy();
  
 -    public int sendRRWithFailure(MessageOut message, InetAddress to, IAsyncCallbackWithFailure cb)
 +    private volatile boolean isShuttingDown;
 +
 +    @VisibleForTesting
 +    MessagingService(boolean testOnly)
      {
 -        return sendRR(message, to, cb, message.getTimeout(), true);
 +        super(testOnly);
 +        OutboundConnections.scheduleUnusedConnectionMonitoring(this, ScheduledExecutors.scheduledTasks, 1L, TimeUnit.HOURS);
      }
  
      /**
@@@ -379,206 -1372,173 +384,212 @@@
          }
      }
  
 -    private static void handleIOExceptionOnClose(IOException e) throws IOException
 +    /**
 +     * Applies back-pressure for the given hosts, according to the configured strategy.
 +     *
 +     * If the local host is present, it is removed from the pool, as back-pressure is only applied
 +     * to remote hosts.
 +     *
 +     * @param hosts The hosts to apply back-pressure to.
 +     * @param timeoutInNanos The max back-pressure timeout.
 +     */
 +    public void applyBackPressure(Iterable<InetAddressAndPort> hosts, long timeoutInNanos)
      {
 -        // dirty hack for clean shutdown on OSX w/ Java >= 1.8.0_20
 -        // see https://bugs.openjdk.java.net/browse/JDK-8050499;
 -        // also CASSANDRA-12513
 -        if ("Mac OS X".equals(System.getProperty("os.name")))
 +        if (DatabaseDescriptor.backPressureEnabled())
          {
 -            switch (e.getMessage())
 +            Set<BackPressureState> states = new HashSet<>();
 +            for (InetAddressAndPort host : hosts)
              {
 -                case "Unknown error: 316":
 -                case "No such file or directory":
 -                case "Bad file descriptor":
 -                case "Thread signal failed":
 -                    return;
 +                if (host.equals(FBUtilities.getBroadcastAddressAndPort()))
 +                    continue;
 +                states.add(getOutbound(host).getBackPressureState());
              }
 +            //noinspection unchecked
 +            backPressure.apply(states, timeoutInNanos, NANOSECONDS);
          }
 -
 -        throw e;
 -    }
 -
 -    public Map<String, Integer> getLargeMessagePendingTasks()
 -    {
 -        Map<String, Integer> pendingTasks = new HashMap<String, Integer>(connectionManagers.size());
 -        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
 -            pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessages.getPendingMessages());
 -        return pendingTasks;
 -    }
 -
 -    public int getLargeMessagePendingTasks(InetAddress address)
 -    {
 -        OutboundTcpConnectionPool connection = connectionManagers.get(address);
 -        return connection == null ? 0 : connection.largeMessages.getPendingMessages();
      }
  
 -    public Map<String, Long> getLargeMessageCompletedTasks()
 +    BackPressureState getBackPressureState(InetAddressAndPort host)
      {
 -        Map<String, Long> completedTasks = new HashMap<String, Long>(connectionManagers.size());
 -        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
 -            completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessages.getCompletedMesssages());
 -        return completedTasks;
 +        return getOutbound(host).getBackPressureState();
      }
  
 -    public Map<String, Long> getLargeMessageDroppedTasks()
 +    void markExpiredCallback(InetAddressAndPort addr)
      {
 -        Map<String, Long> droppedTasks = new HashMap<String, Long>(connectionManagers.size());
 -        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
 -            droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().largeMessages.getDroppedMessages());
 -        return droppedTasks;
 +        OutboundConnections conn = channelManagers.get(addr);
 +        if (conn != null)
 +            conn.incrementExpiredCallbackCount();
      }
  
 -    public Map<String, Integer> getSmallMessagePendingTasks()
 +    /**
 +     * Only to be invoked once we believe the endpoint will never be contacted again.
 +     *
 +     * We close the connection after a five minute delay, to give asynchronous operations a chance to terminate
 +     */
 +    public void closeOutbound(InetAddressAndPort to)
      {
 -        Map<String, Integer> pendingTasks = new HashMap<String, Integer>(connectionManagers.size());
 -        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
 -            pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessages.getPendingMessages());
 -        return pendingTasks;
 +        OutboundConnections pool = channelManagers.get(to);
 +        if (pool != null)
 +            pool.scheduleClose(5L, MINUTES, true)
 +                .addListener(future -> channelManagers.remove(to, pool));
      }
  
 -    public Map<String, Long> getSmallMessageCompletedTasks()
 +    /**
 +     * Only to be invoked once we believe the connections will never be used again.
 +     */
 +    void closeOutboundNow(OutboundConnections connections)
      {
 -        Map<String, Long> completedTasks = new HashMap<String, Long>(connectionManagers.size());
 -        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
 -            completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessages.getCompletedMesssages());
 -        return completedTasks;
 +        connections.close(true).addListener(
 +            future -> channelManagers.remove(connections.template().to, connections)
 +        );
      }
  
 -    public Map<String, Long> getSmallMessageDroppedTasks()
 +    /**
 +     * Only to be invoked once we believe the connections will never be used again.
 +     */
 +    public void removeInbound(InetAddressAndPort from)
      {
 -        Map<String, Long> droppedTasks = new HashMap<String, Long>(connectionManagers.size());
 -        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
 -            droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().smallMessages.getDroppedMessages());
 -        return droppedTasks;
 +        InboundMessageHandlers handlers = messageHandlers.remove(from);
 +        if (null != handlers)
 +            handlers.releaseMetrics();
      }
  
 -    public Map<String, Integer> getGossipMessagePendingTasks()
 +    /**
 +     * Closes any current open channel/connection to the endpoint, but does not cause any message loss, and we will
 +     * try to re-establish connections immediately
 +     */
 +    public void interruptOutbound(InetAddressAndPort to)
      {
 -        Map<String, Integer> pendingTasks = new HashMap<String, Integer>(connectionManagers.size());
 -        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
 -            pendingTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipMessages.getPendingMessages());
 -        return pendingTasks;
 +        OutboundConnections pool = channelManagers.get(to);
 +        if (pool != null)
 +            pool.interrupt();
      }
  
 -    public Map<String, Long> getGossipMessageCompletedTasks()
 +    /**
 +     * Reconnect to the peer using the given {@code addr}. Outstanding messages in each channel will be sent on the
 +     * current channel. Typically this function is used for something like EC2 public IP addresses which need to be used
 +     * for communication between EC2 regions.
 +     *
 +     * @param address IP Address to identify the peer
 +     * @param preferredAddress IP Address to use (and prefer) going forward for connecting to the peer
 +     */
 +    @SuppressWarnings("UnusedReturnValue")
 +    public Future<Void> maybeReconnectWithNewIp(InetAddressAndPort address, InetAddressAndPort preferredAddress)
      {
 -        Map<String, Long> completedTasks = new HashMap<String, Long>(connectionManagers.size());
 -        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
 -            completedTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipMessages.getCompletedMesssages());
 -        return completedTasks;
 -    }
 +        if (!SystemKeyspace.updatePreferredIP(address, preferredAddress))
 +            return null;
  
 -    public Map<String, Long> getGossipMessageDroppedTasks()
 -    {
 -        Map<String, Long> droppedTasks = new HashMap<String, Long>(connectionManagers.size());
 -        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
 -            droppedTasks.put(entry.getKey().getHostAddress(), entry.getValue().gossipMessages.getDroppedMessages());
 -        return droppedTasks;
 -    }
 +        OutboundConnections messagingPool = channelManagers.get(address);
 +        if (messagingPool != null)
 +            return messagingPool.reconnectWithNewIp(preferredAddress);
  
 -    public Map<String, Integer> getDroppedMessages()
 -    {
 -        Map<String, Integer> map = new HashMap<>(droppedMessagesMap.size());
 -        for (Map.Entry<Verb, DroppedMessages> entry : droppedMessagesMap.entrySet())
 -            map.put(entry.getKey().toString(), (int) entry.getValue().metrics.dropped.getCount());
 -        return map;
 +        return null;
      }
  
 -
 -    public long getTotalTimeouts()
 +    /**
 +     * Wait for callbacks and don't allow any more to be created (since they could require writing hints)
 +     */
 +    public void shutdown()
      {
 -        return ConnectionMetrics.totalTimeouts.getCount();
 +        shutdown(1L, MINUTES, true, true);
      }
  
 -    public Map<String, Long> getTimeoutsPerHost()
 +    public void shutdown(long timeout, TimeUnit units, boolean shutdownGracefully, boolean shutdownExecutors)
      {
 -        Map<String, Long> result = new HashMap<String, Long>(connectionManagers.size());
 -        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry: connectionManagers.entrySet())
 +        isShuttingDown = true;
 +        logger.info("Waiting for messaging service to quiesce");
 +        // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
 +        assert !StageManager.getStage(MUTATION).isShutdown();
 +
 +        if (shutdownGracefully)
 +        {
 +            callbacks.shutdownGracefully();
 +            List<Future<Void>> closing = new ArrayList<>();
 +            for (OutboundConnections pool : channelManagers.values())
 +                closing.add(pool.close(true));
 +
 +            long deadline = System.nanoTime() + units.toNanos(timeout);
 +            maybeFail(() -> new FutureCombiner(closing).get(timeout, units),
-                       () -> inboundSockets.close().get(),
++                      () -> {
++                          List<ExecutorService> inboundExecutors = new ArrayList<>();
++                          inboundSockets.close(synchronizedList(inboundExecutors)::add).get();
++                          ExecutorUtils.awaitTermination(1L, TimeUnit.MINUTES, inboundExecutors);
++                      },
 +                      () -> {
 +                          if (shutdownExecutors)
 +                              shutdownExecutors(deadline);
 +                      },
 +                      () -> callbacks.awaitTerminationUntil(deadline),
 +                      inboundSink::clear,
 +                      outboundSink::clear);
 +        }
 +        else
          {
 -            String ip = entry.getKey().getHostAddress();
 -            long recent = entry.getValue().getTimeouts();
 -            result.put(ip, recent);
 +            callbacks.shutdownNow(false);
 +            List<Future<Void>> closing = new ArrayList<>();
-             closing.add(inboundSockets.close());
++            List<ExecutorService> inboundExecutors = synchronizedList(new ArrayList<ExecutorService>());
++            closing.add(inboundSockets.close(inboundExecutors::add));
 +            for (OutboundConnections pool : channelManagers.values())
 +                closing.add(pool.close(false));
 +
 +            long deadline = System.nanoTime() + units.toNanos(timeout);
 +            maybeFail(() -> new FutureCombiner(closing).get(timeout, units),
 +                      () -> {
 +                          if (shutdownExecutors)
 +                              shutdownExecutors(deadline);
 +                      },
++                      () -> ExecutorUtils.awaitTermination(timeout, units, inboundExecutors),
 +                      () -> callbacks.awaitTerminationUntil(deadline),
 +                      inboundSink::clear,
 +                      outboundSink::clear);
          }
 -        return result;
      }
  
 -    public Map<String, Double> getBackPressurePerHost()
 +    private void shutdownExecutors(long deadlineNanos) throws TimeoutException, InterruptedException
      {
 -        Map<String, Double> map = new HashMap<>(connectionManagers.size());
 -        for (Map.Entry<InetAddress, OutboundTcpConnectionPool> entry : connectionManagers.entrySet())
 -            map.put(entry.getKey().getHostAddress(), entry.getValue().getBackPressureState().getBackPressureRateLimit());
 -
 -        return map;
 +        socketFactory.shutdownNow();
 +        socketFactory.awaitTerminationUntil(deadlineNanos);
      }
  
 -    @Override
 -    public void setBackPressureEnabled(boolean enabled)
 +    private OutboundConnections getOutbound(InetAddressAndPort to)
      {
 -        DatabaseDescriptor.setBackPressureEnabled(enabled);
 +        OutboundConnections connections = channelManagers.get(to);
 +        if (connections == null)
 +            connections = OutboundConnections.tryRegister(channelManagers, to, new OutboundConnectionSettings(to).withDefaults(ConnectionCategory.MESSAGING), backPressure.newState(to));
 +        return connections;
      }
  
 -    @Override
 -    public boolean isBackPressureEnabled()
 +    InboundMessageHandlers getInbound(InetAddressAndPort from)
      {
 -        return DatabaseDescriptor.backPressureEnabled();
 -    }
 +        InboundMessageHandlers handlers = messageHandlers.get(from);
 +        if (null != handlers)
 +            return handlers;
  
 -    public static IPartitioner globalPartitioner()
 -    {
 -        return StorageService.instance.getTokenMetadata().partitioner;
 +        return messageHandlers.computeIfAbsent(from, addr ->
 +            new InboundMessageHandlers(FBUtilities.getLocalAddressAndPort(),
 +                                       addr,
 +                                       DatabaseDescriptor.getInternodeApplicationReceiveQueueCapacityInBytes(),
 +                                       DatabaseDescriptor.getInternodeApplicationReceiveQueueReserveEndpointCapacityInBytes(),
 +                                       inboundGlobalReserveLimits, metrics, inboundSink)
 +        );
      }
  
 -    public static void validatePartitioner(Collection<? extends AbstractBounds<?>> allBounds)
 +    @VisibleForTesting
 +    boolean isConnected(InetAddressAndPort address, Message<?> messageOut)
      {
 -        for (AbstractBounds<?> bounds : allBounds)
 -            validatePartitioner(bounds);
 +        OutboundConnections pool = channelManagers.get(address);
 +        if (pool == null)
 +            return false;
 +        return pool.connectionFor(messageOut).isConnected();
      }
  
 -    public static void validatePartitioner(AbstractBounds<?> bounds)
 +    public void listen()
      {
 -        if (globalPartitioner() != bounds.left.getPartitioner())
 -            throw new AssertionError(String.format("Partitioner in bounds serialization. Expected %s, was %s.",
 -                                                   globalPartitioner().getClass().getName(),
 -                                                   bounds.left.getPartitioner().getClass().getName()));
 +        inboundSockets.open();
      }
  
 -    @VisibleForTesting
 -    public List<SocketThread> getSocketThreads()
 +    public void waitUntilListening() throws InterruptedException
      {
 -        return socketThreads;
 +        inboundSockets.open().await();
      }
  }
diff --cc src/java/org/apache/cassandra/net/SocketFactory.java
index 062c44b,0000000..da2d461
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/net/SocketFactory.java
+++ b/src/java/org/apache/cassandra/net/SocketFactory.java
@@@ -1,299 -1,0 +1,300 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.net;
 +
 +import java.io.IOException;
 +import java.net.ConnectException;
 +import java.net.InetSocketAddress;
 +import java.nio.channels.ClosedChannelException;
 +import java.nio.channels.spi.SelectorProvider;
 +import java.util.List;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.ThreadFactory;
 +import java.util.concurrent.TimeoutException;
 +import javax.annotation.Nullable;
 +import javax.net.ssl.SSLEngine;
 +import javax.net.ssl.SSLParameters;
 +
 +import com.google.common.collect.ImmutableList;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import io.netty.bootstrap.Bootstrap;
 +import io.netty.bootstrap.ServerBootstrap;
 +import io.netty.channel.Channel;
 +import io.netty.channel.ChannelFactory;
 +import io.netty.channel.DefaultSelectStrategyFactory;
 +import io.netty.channel.EventLoop;
 +import io.netty.channel.EventLoopGroup;
 +import io.netty.channel.ServerChannel;
 +import io.netty.channel.epoll.EpollChannelOption;
 +import io.netty.channel.epoll.EpollEventLoopGroup;
 +import io.netty.channel.epoll.EpollServerSocketChannel;
 +import io.netty.channel.epoll.EpollSocketChannel;
 +import io.netty.channel.nio.NioEventLoopGroup;
 +import io.netty.channel.socket.nio.NioServerSocketChannel;
 +import io.netty.channel.socket.nio.NioSocketChannel;
 +import io.netty.channel.unix.Errors;
 +import io.netty.handler.ssl.OpenSsl;
 +import io.netty.handler.ssl.SslContext;
 +import io.netty.handler.ssl.SslHandler;
 +import io.netty.util.concurrent.DefaultEventExecutorChooserFactory;
 +import io.netty.util.concurrent.DefaultThreadFactory;
 +import io.netty.util.concurrent.RejectedExecutionHandlers;
 +import io.netty.util.concurrent.ThreadPerTaskExecutor;
 +import io.netty.util.internal.logging.InternalLoggerFactory;
 +import io.netty.util.internal.logging.Slf4JLoggerFactory;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.config.Config;
 +import org.apache.cassandra.config.EncryptionOptions;
 +import org.apache.cassandra.locator.InetAddressAndPort;
++import org.apache.cassandra.security.SSLFactory;
 +import org.apache.cassandra.service.NativeTransportService;
 +import org.apache.cassandra.utils.ExecutorUtils;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static io.netty.channel.unix.Errors.ERRNO_ECONNRESET_NEGATIVE;
 +import static io.netty.channel.unix.Errors.ERROR_ECONNREFUSED_NEGATIVE;
 +import static java.util.concurrent.TimeUnit.SECONDS;
 +import static org.apache.cassandra.utils.Throwables.isCausedBy;
 +
 +/**
 + * A factory for building Netty {@link Channel}s. Channels here are setup with a pipeline to participate
 + * in the internode protocol handshake, either the inbound or outbound side as per the method invoked.
 + */
 +public final class SocketFactory
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SocketFactory.class);
 +
 +    private static final int EVENT_THREADS = Integer.getInteger(Config.PROPERTY_PREFIX + "internode-event-threads", FBUtilities.getAvailableProcessors());
 +
 +    /**
 +     * The default task queue used by {@code NioEventLoop} and {@code EpollEventLoop} is {@code MpscUnboundedArrayQueue},
 +     * provided by JCTools. While efficient, it has an undesirable quality for a queue backing an event loop: it is
 +     * not non-blocking, and can cause the event loop to busy-spin while waiting for a partially completed task
 +     * offer, if the producer thread has been suspended mid-offer.
 +     *
 +     * As it happens, however, we have an MPSC queue implementation that is perfectly fit for this purpose -
 +     * {@link ManyToOneConcurrentLinkedQueue}, that is non-blocking, and already used throughout the codebase,
 +     * that we can and do use here as well.
 +     */
 +    enum Provider
 +    {
 +        NIO
 +        {
 +            @Override
 +            NioEventLoopGroup makeEventLoopGroup(int threadCount, ThreadFactory threadFactory)
 +            {
 +                return new NioEventLoopGroup(threadCount,
 +                                             new ThreadPerTaskExecutor(threadFactory),
 +                                             DefaultEventExecutorChooserFactory.INSTANCE,
 +                                             SelectorProvider.provider(),
 +                                             DefaultSelectStrategyFactory.INSTANCE,
 +                                             RejectedExecutionHandlers.reject(),
 +                                             capacity -> new ManyToOneConcurrentLinkedQueue<>());
 +            }
 +
 +            @Override
 +            ChannelFactory<NioSocketChannel> clientChannelFactory()
 +            {
 +                return NioSocketChannel::new;
 +            }
 +
 +            @Override
 +            ChannelFactory<NioServerSocketChannel> serverChannelFactory()
 +            {
 +                return NioServerSocketChannel::new;
 +            }
 +        },
 +        EPOLL
 +        {
 +            @Override
 +            EpollEventLoopGroup makeEventLoopGroup(int threadCount, ThreadFactory threadFactory)
 +            {
 +                return new EpollEventLoopGroup(threadCount,
 +                                               new ThreadPerTaskExecutor(threadFactory),
 +                                               DefaultEventExecutorChooserFactory.INSTANCE,
 +                                               DefaultSelectStrategyFactory.INSTANCE,
 +                                               RejectedExecutionHandlers.reject(),
 +                                               capacity -> new ManyToOneConcurrentLinkedQueue<>());
 +            }
 +
 +            @Override
 +            ChannelFactory<EpollSocketChannel> clientChannelFactory()
 +            {
 +                return EpollSocketChannel::new;
 +            }
 +
 +            @Override
 +            ChannelFactory<EpollServerSocketChannel> serverChannelFactory()
 +            {
 +                return EpollServerSocketChannel::new;
 +            }
 +        };
 +
 +        EventLoopGroup makeEventLoopGroup(int threadCount, String threadNamePrefix)
 +        {
 +            logger.debug("using netty {} event loop for pool prefix {}", name(), threadNamePrefix);
 +            return makeEventLoopGroup(threadCount, new DefaultThreadFactory(threadNamePrefix, true));
 +        }
 +
 +        abstract EventLoopGroup makeEventLoopGroup(int threadCount, ThreadFactory threadFactory);
 +        abstract ChannelFactory<? extends Channel> clientChannelFactory();
 +        abstract ChannelFactory<? extends ServerChannel> serverChannelFactory();
 +
 +        static Provider optimalProvider()
 +        {
 +            return NativeTransportService.useEpoll() ? EPOLL : NIO;
 +        }
 +    }
 +
 +    /** a useful addition for debugging; simply set to true to get more data in your logs */
 +    static final boolean WIRETRACE = false;
 +    static
 +    {
 +        if (WIRETRACE)
 +            InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
 +    }
 +
 +    private final Provider provider;
 +    private final EventLoopGroup acceptGroup;
 +    private final EventLoopGroup defaultGroup;
 +    // we need a separate EventLoopGroup for outbound streaming because sendFile is blocking
 +    private final EventLoopGroup outboundStreamingGroup;
 +    final ExecutorService synchronousWorkExecutor = Executors.newCachedThreadPool(new NamedThreadFactory("Messaging-SynchronousWork"));
 +
 +    SocketFactory()
 +    {
 +        this(Provider.optimalProvider());
 +    }
 +
 +    SocketFactory(Provider provider)
 +    {
 +        this.provider = provider;
 +        this.acceptGroup = provider.makeEventLoopGroup(1, "Messaging-AcceptLoop");
 +        this.defaultGroup = provider.makeEventLoopGroup(EVENT_THREADS, NamedThreadFactory.globalPrefix() + "Messaging-EventLoop");
 +        this.outboundStreamingGroup = provider.makeEventLoopGroup(EVENT_THREADS, "Streaming-EventLoop");
 +    }
 +
 +    Bootstrap newClientBootstrap(EventLoop eventLoop, int tcpUserTimeoutInMS)
 +    {
 +        if (eventLoop == null)
 +            throw new IllegalArgumentException("must provide eventLoop");
 +
 +        Bootstrap bootstrap = new Bootstrap().group(eventLoop).channelFactory(provider.clientChannelFactory());
 +
 +        if (provider == Provider.EPOLL)
 +            bootstrap.option(EpollChannelOption.TCP_USER_TIMEOUT, tcpUserTimeoutInMS);
 +
 +        return bootstrap;
 +    }
 +
 +    ServerBootstrap newServerBootstrap()
 +    {
 +        return new ServerBootstrap().group(acceptGroup, defaultGroup).channelFactory(provider.serverChannelFactory());
 +    }
 +
 +    /**
 +     * Creates a new {@link SslHandler} from provided SslContext.
 +     * @param peer enables endpoint verification for remote address when not null
 +     */
 +    static SslHandler newSslHandler(Channel channel, SslContext sslContext, @Nullable InetSocketAddress peer)
 +    {
 +        if (peer == null)
 +            return sslContext.newHandler(channel.alloc());
 +
 +        logger.debug("Creating SSL handler for {}:{}", peer.getHostString(), peer.getPort());
 +        SslHandler sslHandler = sslContext.newHandler(channel.alloc(), peer.getHostString(), peer.getPort());
 +        SSLEngine engine = sslHandler.engine();
 +        SSLParameters sslParameters = engine.getSSLParameters();
 +        sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
 +        engine.setSSLParameters(sslParameters);
 +        return sslHandler;
 +    }
 +
 +    static String encryptionLogStatement(EncryptionOptions options)
 +    {
 +        if (options == null)
 +            return "disabled";
 +
-         String encryptionType = OpenSsl.isAvailable() ? "openssl" : "jdk";
++        String encryptionType = SSLFactory.openSslIsAvailable() ? "openssl" : "jdk";
 +        return "enabled (" + encryptionType + ')';
 +    }
 +
 +    EventLoopGroup defaultGroup()
 +    {
 +        return defaultGroup;
 +    }
 +
 +    public EventLoopGroup outboundStreamingGroup()
 +    {
 +        return outboundStreamingGroup;
 +    }
 +
 +    public void shutdownNow()
 +    {
 +        acceptGroup.shutdownGracefully(0, 2, SECONDS);
 +        defaultGroup.shutdownGracefully(0, 2, SECONDS);
 +        outboundStreamingGroup.shutdownGracefully(0, 2, SECONDS);
 +        synchronousWorkExecutor.shutdownNow();
 +    }
 +
 +    void awaitTerminationUntil(long deadlineNanos) throws InterruptedException, TimeoutException
 +    {
 +        List<ExecutorService> groups = ImmutableList.of(acceptGroup, defaultGroup, outboundStreamingGroup, synchronousWorkExecutor);
 +        ExecutorUtils.awaitTerminationUntil(deadlineNanos, groups);
 +    }
 +
 +    static boolean isConnectionReset(Throwable t)
 +    {
 +        if (t instanceof ClosedChannelException)
 +            return true;
 +        if (t instanceof ConnectException)
 +            return true;
 +        if (t instanceof Errors.NativeIoException)
 +        {
 +            int errorCode = ((Errors.NativeIoException) t).expectedErr();
 +            return errorCode == ERRNO_ECONNRESET_NEGATIVE || errorCode != ERROR_ECONNREFUSED_NEGATIVE;
 +        }
 +        return IOException.class == t.getClass() && ("Broken pipe".equals(t.getMessage()) || "Connection reset by peer".equals(t.getMessage()));
 +    }
 +
 +    static boolean isCausedByConnectionReset(Throwable t)
 +    {
 +        return isCausedBy(t, SocketFactory::isConnectionReset);
 +    }
 +
 +    static String channelId(InetAddressAndPort from, InetSocketAddress realFrom, InetAddressAndPort to, InetSocketAddress realTo, ConnectionType type, String id)
 +    {
 +        return addressId(from, realFrom) + "->" + addressId(to, realTo) + '-' + type + '-' + id;
 +    }
 +
 +    static String addressId(InetAddressAndPort address, InetSocketAddress realAddress)
 +    {
 +        String str = address.toString();
 +        if (!address.address.equals(realAddress.getAddress()) || address.port != realAddress.getPort())
 +            str += '(' + InetAddressAndPort.toString(realAddress.getAddress(), realAddress.getPort()) + ')';
 +        return str;
 +    }
 +
 +    static String channelId(InetAddressAndPort from, InetAddressAndPort to, ConnectionType type, String id)
 +    {
 +        return from + "->" + to + '-' + type + '-' + id;
 +    }
 +}
diff --cc src/java/org/apache/cassandra/security/SSLFactory.java
index 2ccb126,7216e2c..f6bbcd0
--- a/src/java/org/apache/cassandra/security/SSLFactory.java
+++ b/src/java/org/apache/cassandra/security/SSLFactory.java
@@@ -45,98 -46,65 +45,119 @@@ import com.google.common.collect.Immuta
  import com.google.common.collect.ImmutableSet;
  import com.google.common.collect.Iterables;
  import com.google.common.collect.Sets;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import io.netty.handler.ssl.ClientAuth;
 +import io.netty.handler.ssl.OpenSsl;
 +import io.netty.handler.ssl.SslContext;
 +import io.netty.handler.ssl.SslContextBuilder;
 +import io.netty.handler.ssl.SslProvider;
 +import io.netty.handler.ssl.SupportedCipherSuiteFilter;
 +import io.netty.util.ReferenceCountUtil;
 +import org.apache.cassandra.concurrent.ScheduledExecutors;
++import org.apache.cassandra.config.Config;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.EncryptionOptions;
  
  /**
 - * A Factory for providing and setting up Client and Server SSL wrapped
 - * Socket and ServerSocket
 + * A Factory for providing and setting up client {@link SSLSocket}s. Also provides
 + * methods for creating both JSSE {@link SSLContext} instances as well as netty {@link SslContext} instances.
 + * <p>
 + * Netty {@link SslContext} instances are expensive to create (as well as to destroy) and consume a lot of resources
 + * (especially direct memory), but instances can be reused across connections (assuming the SSL params are the same).
 + * Hence we cache created instances in {@link #cachedSslContexts}.
   */
  public final class SSLFactory
  {
      private static final Logger logger = LoggerFactory.getLogger(SSLFactory.class);
 -    private static boolean checkedExpiry = false;
  
 -    public static SSLServerSocket getServerSocket(EncryptionOptions options, InetAddress address, int port) throws IOException
 +    /**
 +     * Indicates if the process holds the inbound/listening end of the socket ({@link SocketType#SERVER}), or the
 +     * outbound side ({@link SocketType#CLIENT}).
 +     */
 +    public enum SocketType
      {
 -        SSLContext ctx = createSSLContext(options, true);
 -        SSLServerSocket serverSocket = (SSLServerSocket)ctx.getServerSocketFactory().createServerSocket();
 -        try
 -        {
 -            serverSocket.setReuseAddress(true);
 -            prepareSocket(serverSocket, options);
 -            serverSocket.bind(new InetSocketAddress(address, port), 500);
 -            return serverSocket;
 -        }
 -        catch (IllegalArgumentException | SecurityException | IOException e)
 -        {
 -            serverSocket.close();
 -            throw e;
 -        }
 +        SERVER, CLIENT
      }
  
 -    /** Create a socket and connect */
 -    public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException
 +    @VisibleForTesting
 +    static volatile boolean checkedExpiry = false;
 +
++    // Isolate calls to OpenSsl.isAvailable to allow in-jvm dtests to disable tcnative openssl
++    // support, which otherwise creates a circular reference that prevents the instance class
++    // loader from being garbage collected.
++    static private final boolean openSslIsAvailable;
++    static
+     {
 -        SSLContext ctx = createSSLContext(options, true);
 -        SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port, localAddress, localPort);
 -        try
++        if (Boolean.getBoolean(Config.PROPERTY_PREFIX + "disable_tcactive_openssl"))
+         {
 -            prepareSocket(socket, options);
 -            return socket;
++            openSslIsAvailable = false;
+         }
 -        catch (IllegalArgumentException e)
++        else
+         {
 -            socket.close();
 -            throw e;
++            openSslIsAvailable = OpenSsl.isAvailable();
+         }
+     }
++    public static boolean openSslIsAvailable()
++    {
++        return openSslIsAvailable;
++    }
++
 +    /**
 +     * Cached references of SSL Contexts
 +     */
 +    private static final ConcurrentHashMap<CacheKey, SslContext> cachedSslContexts = new ConcurrentHashMap<>();
 +
 +    /**
 +     * List of files that trigger hot reloading of SSL certificates
 +     */
 +    private static volatile List<HotReloadableFile> hotReloadableFiles = ImmutableList.of();
 +
 +    /**
 +     * Default initial delay for hot reloading
 +     */
 +    public static final int DEFAULT_HOT_RELOAD_INITIAL_DELAY_SEC = 600;
 +
 +    /**
 +     * Default periodic check delay for hot reloading
 +     */
 +    public static final int DEFAULT_HOT_RELOAD_PERIOD_SEC = 600;
  
 -    /** Create a socket and connect, using any local address */
 -    public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port) throws IOException
 +    /**
 +     * State variable to maintain initialization invariant
 +     */
 +    private static boolean isHotReloadingInitialized = false;
 +
 +    /**
 +     * Helper class for hot reloading SSL Contexts
 +     */
 +    private static class HotReloadableFile
      {
 -        SSLContext ctx = createSSLContext(options, true);
 -        SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port);
 -        try
 +        private final File file;
 +        private volatile long lastModTime;
 +
 +        HotReloadableFile(String path)
          {
 -            prepareSocket(socket, options);
 -            return socket;
 +            file = new File(path);
 +            lastModTime = file.lastModified();
          }
 -        catch (IllegalArgumentException e)
 +
 +        boolean shouldReload()
          {
 -            socket.close();
 -            throw e;
 +            long curModTime = file.lastModified();
 +            boolean result = curModTime != lastModTime;
 +            lastModTime = curModTime;
 +            return result;
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return "HotReloadableFile{" +
 +                       "file=" + file +
 +                       ", lastModTime=" + lastModTime +
 +                       '}';
          }
      }
  
@@@ -226,221 -222,4 +247,221 @@@
          }
          return ret;
      }
 +
 +    /**
 +     * get a netty {@link SslContext} instance
 +     */
 +    public static SslContext getOrCreateSslContext(EncryptionOptions options, boolean buildTruststore,
 +                                                   SocketType socketType) throws IOException
 +    {
-         return getOrCreateSslContext(options, buildTruststore, socketType, OpenSsl.isAvailable());
++        return getOrCreateSslContext(options, buildTruststore, socketType, openSslIsAvailable());
 +    }
 +
 +    /**
 +     * Get a netty {@link SslContext} instance.
 +     */
 +    @VisibleForTesting
 +    static SslContext getOrCreateSslContext(EncryptionOptions options,
 +                                            boolean buildTruststore,
 +                                            SocketType socketType,
 +                                            boolean useOpenSsl) throws IOException
 +    {
 +        CacheKey key = new CacheKey(options, socketType, useOpenSsl);
 +        SslContext sslContext;
 +
 +        sslContext = cachedSslContexts.get(key);
 +        if (sslContext != null)
 +            return sslContext;
 +
 +        sslContext = createNettySslContext(options, buildTruststore, socketType, useOpenSsl);
 +
 +        SslContext previous = cachedSslContexts.putIfAbsent(key, sslContext);
 +        if (previous == null)
 +            return sslContext;
 +
 +        ReferenceCountUtil.release(sslContext);
 +        return previous;
 +    }
 +
 +    /**
 +     * Create a Netty {@link SslContext}
 +     */
 +    static SslContext createNettySslContext(EncryptionOptions options, boolean buildTruststore,
 +                                            SocketType socketType, boolean useOpenSsl) throws IOException
 +    {
 +        /*
 +            There is a case where the netty/openssl combo might not support using KeyManagerFactory. specifically,
 +            I've seen this with the netty-tcnative dynamic openssl implementation. using the netty-tcnative static-boringssl
 +            works fine with KeyManagerFactory. If we want to support all of the netty-tcnative options, we would need
 +            to fall back to passing in a file reference for both a x509 and PKCS#8 private key file in PEM format (see
 +            {@link SslContextBuilder#forServer(File, File, String)}). However, we are not supporting that now to keep
 +            the config/yaml API simple.
 +         */
 +        KeyManagerFactory kmf = buildKeyManagerFactory(options);
 +        SslContextBuilder builder;
 +        if (socketType == SocketType.SERVER)
 +        {
 +            builder = SslContextBuilder.forServer(kmf);
 +            builder.clientAuth(options.require_client_auth ? ClientAuth.REQUIRE : ClientAuth.NONE);
 +        }
 +        else
 +        {
 +            builder = SslContextBuilder.forClient().keyManager(kmf);
 +        }
 +
 +        builder.sslProvider(useOpenSsl ? SslProvider.OPENSSL : SslProvider.JDK);
 +
 +        // only set the cipher suites if the operator has explicitly configured values for it; else, use the default
 +        // for each ssl implementation (jdk or openssl)
 +        if (options.cipher_suites != null && !options.cipher_suites.isEmpty())
 +            builder.ciphers(options.cipher_suites, SupportedCipherSuiteFilter.INSTANCE);
 +
 +        if (buildTruststore)
 +            builder.trustManager(buildTrustManagerFactory(options));
 +
 +        return builder.build();
 +    }
 +
 +    /**
 +     * Performs a lightweight check whether the certificate files have been refreshed.
 +     *
 +     * @throws IllegalStateException if {@link #initHotReloading(EncryptionOptions.ServerEncryptionOptions, EncryptionOptions, boolean)}
 +     *                               is not called first
 +     */
 +    public static void checkCertFilesForHotReloading(EncryptionOptions.ServerEncryptionOptions serverOpts,
 +                                                     EncryptionOptions clientOpts)
 +    {
 +        if (!isHotReloadingInitialized)
 +            throw new IllegalStateException("Hot reloading functionality has not been initialized.");
 +
 +        logger.debug("Checking whether certificates have been updated {}", hotReloadableFiles);
 +
 +        if (hotReloadableFiles.stream().anyMatch(HotReloadableFile::shouldReload))
 +        {
 +            logger.info("SSL certificates have been updated. Reseting the ssl contexts for new connections.");
 +            try
 +            {
 +                validateSslCerts(serverOpts, clientOpts);
 +                cachedSslContexts.clear();
 +            }
 +            catch(Exception e)
 +            {
 +                logger.error("Failed to hot reload the SSL Certificates! Please check the certificate files.", e);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Determines whether to hot reload certificates and schedules a periodic task for it.
 +     *
 +     * @param serverOpts Server encryption options (Internode)
 +     * @param clientOpts Client encryption options (Native Protocol)
 +     */
 +    public static synchronized void initHotReloading(EncryptionOptions.ServerEncryptionOptions serverOpts,
 +                                                     EncryptionOptions clientOpts,
 +                                                     boolean force) throws IOException
 +    {
 +        if (isHotReloadingInitialized && !force)
 +            return;
 +
 +        logger.debug("Initializing hot reloading SSLContext");
 +
 +        validateSslCerts(serverOpts, clientOpts);
 +
 +        List<HotReloadableFile> fileList = new ArrayList<>();
 +
 +        if (serverOpts != null && serverOpts.enabled)
 +        {
 +            fileList.add(new HotReloadableFile(serverOpts.keystore));
 +            fileList.add(new HotReloadableFile(serverOpts.truststore));
 +        }
 +
 +        if (clientOpts != null && clientOpts.enabled)
 +        {
 +            fileList.add(new HotReloadableFile(clientOpts.keystore));
 +            fileList.add(new HotReloadableFile(clientOpts.truststore));
 +        }
 +
 +        hotReloadableFiles = ImmutableList.copyOf(fileList);
 +
 +        if (!isHotReloadingInitialized)
 +        {
 +            ScheduledExecutors.scheduledTasks
 +                .scheduleWithFixedDelay(() -> checkCertFilesForHotReloading(
 +                                                DatabaseDescriptor.getInternodeMessagingEncyptionOptions(),
 +                                                DatabaseDescriptor.getNativeProtocolEncryptionOptions()),
 +                                        DEFAULT_HOT_RELOAD_INITIAL_DELAY_SEC,
 +                                        DEFAULT_HOT_RELOAD_PERIOD_SEC, TimeUnit.SECONDS);
 +        }
 +
 +        isHotReloadingInitialized = true;
 +    }
 +
 +
 +    /**
 +     * Sanity checks all certificates to ensure we can actually load them
 +     */
 +    public static void validateSslCerts(EncryptionOptions.ServerEncryptionOptions serverOpts, EncryptionOptions clientOpts) throws IOException
 +    {
 +        try
 +        {
 +            // Ensure we're able to create both server & client SslContexts
 +            if (serverOpts != null && serverOpts.enabled)
 +            {
-                 createNettySslContext(serverOpts, true, SocketType.SERVER, OpenSsl.isAvailable());
-                 createNettySslContext(serverOpts, true, SocketType.CLIENT, OpenSsl.isAvailable());
++                createNettySslContext(serverOpts, true, SocketType.SERVER, openSslIsAvailable());
++                createNettySslContext(serverOpts, true, SocketType.CLIENT, openSslIsAvailable());
 +            }
 +        }
 +        catch (Exception e)
 +        {
 +            throw new IOException("Failed to create SSL context using server_encryption_options!", e);
 +        }
 +
 +        try
 +        {
 +            // Ensure we're able to create both server & client SslContexts
 +            if (clientOpts != null && clientOpts.enabled)
 +            {
-                 createNettySslContext(clientOpts, clientOpts.require_client_auth, SocketType.SERVER, OpenSsl.isAvailable());
-                 createNettySslContext(clientOpts, clientOpts.require_client_auth, SocketType.CLIENT, OpenSsl.isAvailable());
++                createNettySslContext(clientOpts, clientOpts.require_client_auth, SocketType.SERVER, openSslIsAvailable());
++                createNettySslContext(clientOpts, clientOpts.require_client_auth, SocketType.CLIENT, openSslIsAvailable());
 +            }
 +        }
 +        catch (Exception e)
 +        {
 +            throw new IOException("Failed to create SSL context using client_encryption_options!", e);
 +        }
 +    }
 +
 +    static class CacheKey
 +    {
 +        private final EncryptionOptions encryptionOptions;
 +        private final SocketType socketType;
 +        private final boolean useOpenSSL;
 +
 +        public CacheKey(EncryptionOptions encryptionOptions, SocketType socketType, boolean useOpenSSL)
 +        {
 +            this.encryptionOptions = encryptionOptions;
 +            this.socketType = socketType;
 +            this.useOpenSSL = useOpenSSL;
 +        }
 +
 +        public boolean equals(Object o)
 +        {
 +            if (this == o) return true;
 +            if (o == null || getClass() != o.getClass()) return false;
 +            CacheKey cacheKey = (CacheKey) o;
 +            return (socketType == cacheKey.socketType &&
 +                    useOpenSSL == cacheKey.useOpenSSL &&
 +                    Objects.equals(encryptionOptions, cacheKey.encryptionOptions));
 +        }
 +
 +        public int hashCode()
 +        {
 +            int result = 0;
 +            result += 31 * socketType.hashCode();
 +            result += 31 * encryptionOptions.hashCode();
 +            result += 31 * Boolean.hashCode(useOpenSSL);
 +            return result;
 +        }
 +    }
  }
diff --cc src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
index 6901968,1334611..1c6b183
--- a/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
+++ b/src/java/org/apache/cassandra/service/PendingRangeCalculatorService.java
@@@ -123,9 -118,8 +123,8 @@@ public class PendingRangeCalculatorServ
      }
  
      @VisibleForTesting
-     public void shutdownAndWait(long timeout, TimeUnit units) throws InterruptedException, TimeoutException
 -    public void shutdownExecutor(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
++    public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      {
-         executor.shutdown();
-         ExecutorUtils.awaitTermination(timeout, units, executor);
+         ExecutorUtils.shutdownNowAndWait(timeout, unit, executor);
      }
  }
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index e5ae30b,000c2fb..ae83e92
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -100,15 -93,11 +100,16 @@@ import org.apache.cassandra.utils.*
  import org.apache.cassandra.utils.logging.LoggingSupportFactory;
  import org.apache.cassandra.utils.progress.ProgressEvent;
  import org.apache.cassandra.utils.progress.ProgressEventType;
 +import org.apache.cassandra.utils.progress.ProgressListener;
 +import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor;
  import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
 -import org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport;
  
 +import static com.google.common.collect.Iterables.transform;
 +import static com.google.common.collect.Iterables.tryFind;
  import static java.util.Arrays.asList;
+ import static java.util.concurrent.TimeUnit.MINUTES;
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
 +import static java.util.concurrent.TimeUnit.NANOSECONDS;
  import static java.util.stream.Collectors.toList;
  import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName;
  import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily;
@@@ -5394,59 -5296,12 +5404,68 @@@ public class StorageService extends Not
          logger.info("Updated hinted_handoff_throttle_in_kb to {}", throttleInKB);
      }
  
 +    @Override
 +    public void clearConnectionHistory()
 +    {
 +        daemon.clearConnectionHistory();
 +        logger.info("Cleared connection history");
 +    }
 +    public void disableAuditLog()
 +    {
 +        AuditLogManager.getInstance().disableAuditLog();
 +        logger.info("Auditlog is disabled");
 +    }
 +
 +    public void enableAuditLog(String loggerName, String includedKeyspaces, String excludedKeyspaces, String includedCategories, String excludedCategories,
 +                               String includedUsers, String excludedUsers) throws ConfigurationException, IllegalStateException
 +    {
 +        loggerName = loggerName != null ? loggerName : DatabaseDescriptor.getAuditLoggingOptions().logger;
 +
 +        Preconditions.checkNotNull(loggerName, "cassandra.yaml did not have logger in audit_logging_option and not set as parameter");
 +        Preconditions.checkState(FBUtilities.isAuditLoggerClassExists(loggerName), "Unable to find AuditLogger class: "+loggerName);
 +
 +        AuditLogOptions auditLogOptions = new AuditLogOptions();
 +        auditLogOptions.enabled = true;
 +        auditLogOptions.logger = loggerName;
 +        auditLogOptions.included_keyspaces = includedKeyspaces != null ? includedKeyspaces : DatabaseDescriptor.getAuditLoggingOptions().included_keyspaces;
 +        auditLogOptions.excluded_keyspaces = excludedKeyspaces != null ? excludedKeyspaces : DatabaseDescriptor.getAuditLoggingOptions().excluded_keyspaces;
 +        auditLogOptions.included_categories = includedCategories != null ? includedCategories : DatabaseDescriptor.getAuditLoggingOptions().included_categories;
 +        auditLogOptions.excluded_categories = excludedCategories != null ? excludedCategories : DatabaseDescriptor.getAuditLoggingOptions().excluded_categories;
 +        auditLogOptions.included_users = includedUsers != null ? includedUsers : DatabaseDescriptor.getAuditLoggingOptions().included_users;
 +        auditLogOptions.excluded_users = excludedUsers != null ? excludedUsers : DatabaseDescriptor.getAuditLoggingOptions().excluded_users;
 +
 +        AuditLogManager.getInstance().enableAuditLog(auditLogOptions);
 +
 +        logger.info("AuditLog is enabled with logger: [{}], included_keyspaces: [{}], excluded_keyspaces: [{}], " +
 +                    "included_categories: [{}], excluded_categories: [{}], included_users: [{}], "
 +                    + "excluded_users: [{}], archive_command: [{}]", loggerName, auditLogOptions.included_keyspaces, auditLogOptions.excluded_keyspaces,
 +                    auditLogOptions.included_categories, auditLogOptions.excluded_categories, auditLogOptions.included_users, auditLogOptions.excluded_users,
 +                    auditLogOptions.archive_command);
 +
 +    }
 +
 +    public boolean isAuditLogEnabled()
 +    {
 +        return AuditLogManager.getInstance().isAuditingEnabled();
 +    }
 +
 +    public String getCorruptedTombstoneStrategy()
 +    {
 +        return DatabaseDescriptor.getCorruptedTombstoneStrategy().toString();
 +    }
 +
 +    public void setCorruptedTombstoneStrategy(String strategy)
 +    {
 +        DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.valueOf(strategy));
 +        logger.info("Setting corrupted tombstone strategy to {}", strategy);
 +    }
++
+     @VisibleForTesting
+     public void shutdownServer()
+     {
+         if (drainOnShutdown != null)
+         {
+             Runtime.getRuntime().removeShutdownHook(drainOnShutdown);
+         }
+     }
  }
diff --cc src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 7eada28,9a8f4c2..6d757b6
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@@ -17,13 -17,21 +17,15 @@@
   */
  package org.apache.cassandra.streaming;
  
 -import java.net.InetAddress;
  import java.util.*;
 -import java.util.concurrent.ExecutorService;
 -import java.util.concurrent.TimeUnit;
 -import java.util.concurrent.TimeoutException;
  
+ import com.google.common.annotations.VisibleForTesting;
 -import com.google.common.collect.ImmutableList;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 -import org.apache.cassandra.utils.ExecutorUtils;
 -import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +
+ 
  /**
   * {@link StreamCoordinator} is a helper class that abstracts away maintaining multiple
   * StreamSession and ProgressInfo instances per peer.
@@@ -340,5 -346,12 +342,11 @@@ public class StreamCoordinato
          {
              return sessionInfos.values();
          }
 -    }
+ 
 -    @VisibleForTesting
 -    public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
 -    {
 -        ExecutorUtils.shutdownAndWait(timeout, unit, streamExecutor);
++        @VisibleForTesting
++        public void shutdown()
++        {
++            streamSessions.values().forEach(ss -> ss.sessionFailed());
++        }
      }
 -
  }
diff --cc src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 31c60be,5388dd6..87d6ce0
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@@ -17,17 -17,42 +17,23 @@@
   */
  package org.apache.cassandra.streaming;
  
 -import java.util.ArrayList;
 -import java.util.Collection;
 -import java.util.List;
 -import java.util.Set;
 -import java.util.UUID;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
++import java.util.concurrent.TimeUnit;
++import java.util.concurrent.TimeoutException;
  
 -import com.google.common.collect.Iterables;
 -
 -import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
++import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
 -
  import org.apache.cassandra.concurrent.NamedThreadFactory;
 -import org.apache.cassandra.config.Schema;
  import org.apache.cassandra.db.ColumnFamilyStore;
 -import org.apache.cassandra.db.Keyspace;
 -import org.apache.cassandra.db.Mutation;
 -import org.apache.cassandra.db.compaction.OperationType;
 -import org.apache.cassandra.db.filter.ColumnFilter;
 -import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 -import org.apache.cassandra.db.partitions.PartitionUpdate;
 -import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 -import org.apache.cassandra.db.view.View;
 -import org.apache.cassandra.dht.Bounds;
 -import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.io.sstable.ISSTableScanner;
 -import org.apache.cassandra.io.sstable.SSTable;
 -import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.TableId;
  import org.apache.cassandra.utils.JVMStabilityInspector;
 -import org.apache.cassandra.utils.Pair;
 -import org.apache.cassandra.utils.Throwables;
 -import org.apache.cassandra.utils.concurrent.Refs;
 +
++import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
++import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
+ 
  /**
   * Task that manages receiving files for the session for certain ColumnFamily.
   */
@@@ -156,6 -306,17 +162,13 @@@ public class StreamReceiveTask extends 
              return;
  
          done = true;
 -        abortTransaction();
 -        sstables.clear();
 -    }
 -
 -    private synchronized void abortTransaction()
 -    {
 -        txn.abort();
 +        receiver.abort();
      }
+ 
 -    private synchronized void finishTransaction()
++    @VisibleForTesting
++    public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+     {
 -        txn.finish();
++        shutdown(executor);
++        awaitTermination(timeout, unit, executor);
+     }
  }
diff --cc src/java/org/apache/cassandra/streaming/StreamTransferTask.java
index 802188a,4f313c3..ba05acd
--- a/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamTransferTask.java
@@@ -24,18 -23,16 +24,21 @@@ import java.util.concurrent.TimeUnit
  import java.util.concurrent.atomic.AtomicInteger;
  
  import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
  import com.google.common.base.Throwables;
  
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
  import org.apache.cassandra.concurrent.NamedThreadFactory;
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
 -import org.apache.cassandra.utils.Pair;
 -import org.apache.cassandra.utils.concurrent.Ref;
 +import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
 +
++import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
++import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
+ 
  /**
 - * StreamTransferTask sends sections of SSTable files in certain ColumnFamily.
 + * StreamTransferTask sends streams for a given table
   */
  public class StreamTransferTask extends StreamTask
  {
@@@ -178,4 -173,4 +181,11 @@@
          assert prev == null;
          return future;
      }
++
++    @VisibleForTesting
++    public static void shutdownAndWait(long timeout, TimeUnit units) throws InterruptedException, TimeoutException
++    {
++        shutdown(timeoutExecutor);
++        awaitTermination(timeout, units, timeoutExecutor);
++    }
  }
diff --cc src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
index 79f30f3,0000000..2114c72
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
+++ b/src/java/org/apache/cassandra/streaming/async/StreamingInboundHandler.java
@@@ -1,269 -1,0 +1,303 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.streaming.async;
 +
 +import java.io.EOFException;
 +import java.io.IOException;
++import java.util.ArrayList;
++import java.util.Collections;
++import java.util.List;
 +import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +import java.util.function.Function;
 +import javax.annotation.Nullable;
 +
 +import com.google.common.annotations.VisibleForTesting;
++import com.google.common.collect.Lists;
 +import com.google.common.util.concurrent.Uninterruptibles;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import io.netty.buffer.ByteBuf;
 +import io.netty.channel.Channel;
 +import io.netty.channel.ChannelHandlerContext;
 +import io.netty.channel.ChannelInboundHandlerAdapter;
 +import io.netty.util.ReferenceCountUtil;
 +import io.netty.util.concurrent.FastThreadLocalThread;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.AsyncStreamingInputPlus;
 +import org.apache.cassandra.net.AsyncStreamingInputPlus.InputTimeoutException;
 +import org.apache.cassandra.streaming.StreamManager;
 +import org.apache.cassandra.streaming.StreamReceiveException;
 +import org.apache.cassandra.streaming.StreamResultFuture;
 +import org.apache.cassandra.streaming.StreamSession;
 +import org.apache.cassandra.streaming.messages.IncomingStreamMessage;
 +import org.apache.cassandra.streaming.messages.KeepAliveMessage;
 +import org.apache.cassandra.streaming.messages.StreamInitMessage;
 +import org.apache.cassandra.streaming.messages.StreamMessage;
 +import org.apache.cassandra.streaming.messages.StreamMessageHeader;
 +import org.apache.cassandra.utils.JVMStabilityInspector;
 +
 +import static org.apache.cassandra.streaming.async.NettyStreamingMessageSender.createLogTag;
 +
 +/**
 + * Handles the inbound side of streaming messages and stream data. From the incoming data, we deserialize the message
 + * including the actual stream data itself. Because the reading and deserialization of streams is a blocking affair,
 + * we can't block the netty event loop. Thus we have a background thread perform all the blocking deserialization.
 + */
 +public class StreamingInboundHandler extends ChannelInboundHandlerAdapter
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(StreamingInboundHandler.class);
 +    private static final Function<SessionIdentifier, StreamSession> DEFAULT_SESSION_PROVIDER = sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex);
- 
++    private static volatile boolean trackInboundHandlers = false;
++    private static final List<StreamingInboundHandler> inboundHandlers = Collections.synchronizedList(new ArrayList<>());
 +    private final InetAddressAndPort remoteAddress;
 +    private final int protocolVersion;
 +
 +    private final StreamSession session;
 +
 +    /**
 +     * A collection of {@link ByteBuf}s that are yet to be processed. Incoming buffers are first dropped into this
 +     * structure, and then consumed.
 +     * <p>
 +     * For thread safety, this structure's resources are released on the consuming thread
 +     * (via {@link AsyncStreamingInputPlus#close()}),
 +     * but the producing side calls {@link AsyncStreamingInputPlus#requestClosure()} to notify the input that it should close.
 +     */
 +    private AsyncStreamingInputPlus buffers;
 +
 +    private volatile boolean closed;
 +
 +    public StreamingInboundHandler(InetAddressAndPort remoteAddress, int protocolVersion, @Nullable StreamSession session)
 +    {
 +        this.remoteAddress = remoteAddress;
 +        this.protocolVersion = protocolVersion;
 +        this.session = session;
++        if (trackInboundHandlers)
++            inboundHandlers.add(this);
 +    }
 +
 +    @Override
 +    @SuppressWarnings("resource")
 +    public void handlerAdded(ChannelHandlerContext ctx)
 +    {
 +        buffers = new AsyncStreamingInputPlus(ctx.channel());
 +        Thread blockingIOThread = new FastThreadLocalThread(new StreamDeserializingTask(DEFAULT_SESSION_PROVIDER, session, ctx.channel()),
 +                                                            String.format("Stream-Deserializer-%s-%s", remoteAddress.toString(), ctx.channel().id()));
 +        blockingIOThread.setDaemon(true);
 +        blockingIOThread.start();
 +    }
 +
 +    @Override
 +    public void channelRead(ChannelHandlerContext ctx, Object message)
 +    {
 +        if (closed || !(message instanceof ByteBuf) || !buffers.append((ByteBuf) message))
 +            ReferenceCountUtil.release(message);
 +    }
 +
 +    @Override
 +    public void channelInactive(ChannelHandlerContext ctx)
 +    {
 +        close();
 +        ctx.fireChannelInactive();
 +    }
 +
 +    void close()
 +    {
 +        closed = true;
 +        buffers.requestClosure();
++        if (trackInboundHandlers)
++            inboundHandlers.remove(this);
 +    }
 +
 +    @Override
 +    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
 +    {
 +        if (cause instanceof IOException)
 +            logger.trace("connection problem while streaming", cause);
 +        else
 +            logger.warn("exception occurred while in processing streaming data", cause);
 +        close();
 +    }
 +
 +    /**
 +     * For testing only!!
 +     */
 +    void setPendingBuffers(AsyncStreamingInputPlus bufChannel)
 +    {
 +        this.buffers = bufChannel;
 +    }
 +
 +    /**
 +     * The task that performs the actual deserialization.
 +     */
 +    class StreamDeserializingTask implements Runnable
 +    {
 +        private final Function<SessionIdentifier, StreamSession> sessionProvider;
 +        private final Channel channel;
 +
 +        @VisibleForTesting
 +        StreamSession session;
 +
 +        StreamDeserializingTask(Function<SessionIdentifier, StreamSession> sessionProvider, StreamSession session, Channel channel)
 +        {
 +            this.sessionProvider = sessionProvider;
 +            this.session = session;
 +            this.channel = channel;
 +        }
 +
 +        @Override
 +        public void run()
 +        {
 +            try
 +            {
 +                while (true)
 +                {
 +                    buffers.maybeIssueRead();
 +
 +                    // do a check of available bytes and possibly sleep some amount of time (then continue).
 +                    // this way we can break out of run() sanely or we end up blocking indefinitely in StreamMessage.deserialize()
 +                    while (buffers.isEmpty())
 +                    {
 +                        if (closed)
 +                            return;
 +
 +                        Uninterruptibles.sleepUninterruptibly(400, TimeUnit.MILLISECONDS);
 +                    }
 +
 +                    StreamMessage message = StreamMessage.deserialize(buffers, protocolVersion, null);
 +
 +                    // keep-alives don't necessarily need to be tied to a session (they could arrive before or after
 +                    // wrt session lifecycle, due to races), just log that we received the message and carry on
 +                    if (message instanceof KeepAliveMessage)
 +                    {
 +                        if (logger.isDebugEnabled())
 +                            logger.debug("{} Received {}", createLogTag(session, channel), message);
 +                        continue;
 +                    }
 +
 +                    if (session == null)
 +                        session = deriveSession(message);
 +
 +                    if (logger.isDebugEnabled())
 +                        logger.debug("{} Received {}", createLogTag(session, channel), message);
 +
 +                    session.messageReceived(message);
 +                }
 +            }
 +            catch (InputTimeoutException | EOFException e)
 +            {
 +                // ignore
 +            }
 +            catch (Throwable t)
 +            {
 +                JVMStabilityInspector.inspectThrowable(t);
 +                if (session != null)
 +                {
 +                    session.onError(t);
 +                }
 +                else if (t instanceof StreamReceiveException)
 +                {
 +                    ((StreamReceiveException)t).session.onError(t);
 +                }
 +                else
 +                {
 +                    logger.error("{} stream operation from {} failed", createLogTag(session, channel), remoteAddress, t);
 +                }
 +            }
 +            finally
 +            {
 +                channel.close();
 +                closed = true;
 +
 +                if (buffers != null)
++                {
++                    // request closure again as the original request could have raced with receiving a
++                    // message and been consumed in the message receive loop above.  Otherwise
++                    // buffers could hang indefinitely on the queue.poll.
++                    buffers.requestClosure();
 +                    buffers.close();
++                }
 +            }
 +        }
 +
 +        StreamSession deriveSession(StreamMessage message)
 +        {
 +            StreamSession streamSession = null;
 +            // StreamInitMessage starts a new channel, and IncomingStreamMessage potentially, as well.
 +            // IncomingStreamMessage needs a session to be established a priori, though
 +            if (message instanceof StreamInitMessage)
 +            {
 +                assert session == null : "initiator of stream session received a StreamInitMessage";
 +                StreamInitMessage init = (StreamInitMessage) message;
 +                StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, channel, init.pendingRepair, init.previewKind);
 +                streamSession = sessionProvider.apply(new SessionIdentifier(init.from, init.planId, init.sessionIndex));
 +            }
 +            else if (message instanceof IncomingStreamMessage)
 +            {
 +                // TODO: it'd be great to check if the session actually exists before slurping in the entire stream,
 +                // but that's a refactoring for another day
 +                StreamMessageHeader header = ((IncomingStreamMessage) message).header;
 +                streamSession = sessionProvider.apply(new SessionIdentifier(header.sender, header.planId, header.sessionIndex));
 +            }
 +
 +            if (streamSession == null)
 +                throw new IllegalStateException(createLogTag(null, channel) + " no session found for message " + message);
 +
 +            streamSession.attach(channel);
 +            return streamSession;
 +        }
 +    }
 +
 +    /**
 +     * A simple struct to wrap the data points required to lookup a {@link StreamSession}
 +     */
 +    static class SessionIdentifier
 +    {
 +        final InetAddressAndPort from;
 +        final UUID planId;
 +        final int sessionIndex;
 +
 +        SessionIdentifier(InetAddressAndPort from, UUID planId, int sessionIndex)
 +        {
 +            this.from = from;
 +            this.planId = planId;
 +            this.sessionIndex = sessionIndex;
 +        }
 +    }
++
++    /** Shutdown for in-JVM tests. For any other usage, tracking of active inbound streaming handlers
++     *  should be revisited first and in-JVM shutdown refactored with it.
++     *  This does not prevent new inbound handlers being added after shutdown, nor is it thread-safe
++     *  around new inbound handlers being opened during shutdown.
++      */
++    @VisibleForTesting
++    public static void shutdown()
++    {
++        assert trackInboundHandlers == true : "in-JVM tests required tracking of inbound streaming handlers";
++
++        inboundHandlers.forEach(StreamingInboundHandler::close);
++        inboundHandlers.clear();
++    }
++
++    public static void trackInboundHandlers()
++    {
++        trackInboundHandlers = true;
++    }
 +}
diff --cc src/java/org/apache/cassandra/utils/concurrent/Ref.java
index 7986a77,5b6c3d6..a373347
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@@ -49,8 -50,8 +50,7 @@@ import org.apache.cassandra.utils.NoSpa
  import org.apache.cassandra.utils.Pair;
  import org.cliffc.high_scale_lib.NonBlockingHashMap;
  
- import static com.google.common.collect.ImmutableList.*;
  import static java.util.Collections.emptyList;
 -import org.apache.cassandra.concurrent.InfiniteLoopExecutor.InterruptibleRunnable;
  
  import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
  import static org.apache.cassandra.utils.ExecutorUtils.shutdownNow;
diff --cc src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index fdcd948,bd17f78..5ef023f
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@@ -67,13 -68,13 +68,12 @@@ public abstract class MemtablePoo
          return cleaner == null ? null : new MemtableCleanerThread<>(this, cleaner);
      }
  
-     public void shutdown(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
+     @VisibleForTesting
+     public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      {
-         cleaner.shutdownNow();
-         if (!cleaner.awaitTermination(timeout, unit))
-             throw new TimeoutException();
+         ExecutorUtils.shutdownNowAndWait(timeout, unit, cleaner);
      }
  
 -
      public abstract MemtableAllocator newAllocator();
  
      /**
diff --cc test/conf/logback-dtest.xml
index 13a729d,4282fee..370e1e5
--- a/test/conf/logback-dtest.xml
+++ b/test/conf/logback-dtest.xml
@@@ -58,8 -58,17 +58,8 @@@
      </filter>
    </appender>
  
-   <appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
+   <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
      <encoder>
 -      <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
 -    </encoder>
 -    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
 -      <level>WARN</level>
 -    </filter>
 -  </appender>
 -
 -  <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
 -    <encoder>
        <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern>
      </encoder>
      <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
diff --cc test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
index 70491e6,f7c8094..426aa5e
--- a/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IMessageFilters.java
@@@ -18,12 -18,8 +18,9 @@@
  
  package org.apache.cassandra.distributed.api;
  
- import org.apache.cassandra.locator.InetAddressAndPort;
  import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.net.Verb;
  
- import java.util.function.BiConsumer;
- 
  public interface IMessageFilters
  {
      public interface Filter
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index d1d4dd4,784eb3f..9294bfe
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -27,13 -31,9 +27,12 @@@ import java.util.UUID
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Future;
 +import java.util.concurrent.TimeoutException;
  import java.util.function.BiConsumer;
 -import java.util.function.Function;
 +import java.util.function.BiPredicate;
 +
- import org.slf4j.LoggerFactory;
++import io.netty.util.concurrent.GlobalEventExecutor;
  
- import ch.qos.logback.classic.LoggerContext;
  import org.apache.cassandra.batchlog.BatchlogManager;
  import org.apache.cassandra.concurrent.ScheduledExecutors;
  import org.apache.cassandra.concurrent.SharedExecutorPool;
@@@ -76,9 -81,11 +76,12 @@@ import org.apache.cassandra.service.Cli
  import org.apache.cassandra.service.PendingRangeCalculatorService;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.service.StorageService;
 -import org.apache.cassandra.streaming.StreamCoordinator;
 -import org.apache.cassandra.streaming.StreamSession;
++import org.apache.cassandra.streaming.async.StreamingInboundHandler;
++import org.apache.cassandra.streaming.StreamReceiveTask;
++import org.apache.cassandra.streaming.StreamTransferTask;
  import org.apache.cassandra.transport.messages.ResultMessage;
 +import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.FBUtilities;
 -import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
  import org.apache.cassandra.utils.Throwables;
  import org.apache.cassandra.utils.concurrent.Ref;
  import org.apache.cassandra.utils.memory.BufferPool;
@@@ -98,10 -105,13 +101,18 @@@ public class Instance extends IsolatedE
          super("node" + config.num(), classLoader);
          this.config = config;
          InstanceIDDefiner.setInstanceId(config.num());
 -        FBUtilities.setBroadcastInetAddress(config.broadcastAddressAndPort().address);
 +        FBUtilities.setBroadcastInetAddressAndPort(config.broadcastAddressAndPort());
+         // Set the config at instance creation, possibly before startup() has run on all other instances.
+         // setMessagingVersions below will call runOnInstance which will instantiate
+         // the MessagingService and dependencies preventing later changes to network parameters.
+         Config.setOverrideLoadConfig(() -> loadConfig(config));
++
++        // Enable streaming inbound handler tracking so they can be closed properly without leaking
++        // the blocking IO thread.
++        StreamingInboundHandler.trackInboundHandlers();
      }
  
 +    @Override
      public IInstanceConfig config()
      {
          return config;
@@@ -245,24 -317,15 +261,23 @@@
          sync(() -> {
              try
              {
 +                if (config.has(GOSSIP))
 +                {
 +                    // TODO: hacky
 +                    System.setProperty("cassandra.ring_delay_ms", "5000");
 +                    System.setProperty("cassandra.consistent.rangemovement", "false");
 +                    System.setProperty("cassandra.consistent.simultaneousmoves.allow", "true");
 +                }
 +
                  mkdirs();
-                 Config.setOverrideLoadConfig(() -> loadConfig(config));
-                 DatabaseDescriptor.daemonInitialization();
  
+                 DatabaseDescriptor.daemonInitialization();
                  DatabaseDescriptor.createAllDirectories();
  
 -                // We need to  persist this as soon as possible after startup checks.
 +                // We need to persist this as soon as possible after startup checks.
                  // This should be the first write to SystemKeyspace (CASSANDRA-11742)
                  SystemKeyspace.persistLocalMetadata();
 -                LegacySchemaMigrator.migrate();
 +                SystemKeyspaceMigrator40.migrate();
  
                  try
                  {
@@@ -393,37 -456,52 +408,53 @@@
          }
      }
  
-     @Override
      public Future<Void> shutdown()
      {
+         return shutdown(true);
+     }
+ 
++    @Override
+     public Future<Void> shutdown(boolean graceful)
+     {
 -        if (!graceful)
 -            MessagingService.instance().shutdown(false);
 -
          Future<?> future = async((ExecutorService executor) -> {
              Throwable error = null;
+ 
 -            if (config.has(GOSSIP) || config.has(NETWORK))
++            if (config.has(GOSSIP))
+             {
+                 StorageService.instance.shutdownServer();
+             }
+ 
              error = parallelRun(error, executor,
-                     Gossiper.instance::stop,
-                     CompactionManager.instance::forceShutdown,
-                     BatchlogManager.instance::shutdown,
-                     HintsService.instance::shutdownBlocking,
-                     () -> SecondaryIndexManager.shutdownAndWait(1L, MINUTES),
-                     () -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
-                     () -> PendingRangeCalculatorService.instance.shutdownAndWait(1L, MINUTES),
-                     () -> BufferPool.shutdownLocalCleaner(1L, MINUTES),
-                     () -> Ref.shutdownReferenceReaper(1L, MINUTES),
-                     () -> Memtable.MEMORY_POOL.shutdown(1L, MINUTES),
-                     () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
-                     () -> SSTableReader.shutdownBlocking(1L, MINUTES),
-                     () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor))
+                                 () -> Gossiper.instance.stopShutdownAndWait(1L, MINUTES),
+                                 CompactionManager.instance::forceShutdown,
+                                 () -> BatchlogManager.instance.shutdownAndWait(1L, MINUTES),
+                                 HintsService.instance::shutdownBlocking,
 -                                () -> StreamCoordinator.shutdownAndWait(1L, MINUTES),
 -                                () -> StreamSession.shutdownAndWait(1L, MINUTES),
++                                StreamingInboundHandler::shutdown,
++                                () -> StreamReceiveTask.shutdownAndWait(1L, MINUTES),
++                                () -> StreamTransferTask.shutdownAndWait(1L, MINUTES),
+                                 () -> SecondaryIndexManager.shutdownAndWait(1L, MINUTES),
+                                 () -> IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES),
+                                 () -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
 -                                () -> PendingRangeCalculatorService.instance.shutdownExecutor(1L, MINUTES),
++                                () -> PendingRangeCalculatorService.instance.shutdownAndWait(1L, MINUTES),
+                                 () -> BufferPool.shutdownLocalCleaner(1L, MINUTES),
+                                 () -> Ref.shutdownReferenceReaper(1L, MINUTES),
+                                 () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
 -                                () -> SSTableReader.shutdownBlocking(1L, MINUTES)
++                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
++                                () -> SSTableReader.shutdownBlocking(1L, MINUTES),
++                                () -> shutdownAndWait(Collections.singletonList(ActiveRepairService.repairCommandExecutor)),
++                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES)
              );
++
              error = parallelRun(error, executor,
 -                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
 -                                MessagingService.instance()::shutdown
 +                                CommitLog.instance::shutdownBlocking,
 +                                () -> MessagingService.instance().shutdown(1L, MINUTES, false, true)
              );
              error = parallelRun(error, executor,
++                                () -> GlobalEventExecutor.INSTANCE.awaitInactivity(1l, MINUTES),
                                  () -> StageManager.shutdownAndWait(1L, MINUTES),
-                                 () -> SharedExecutorPool.SHARED.shutdown(1L, MINUTES)
+                                 () -> SharedExecutorPool.SHARED.shutdownAndWait(1L, MINUTES)
              );
 -            error = parallelRun(error, executor,
 -                                CommitLog.instance::shutdownBlocking
 -            );
  
-             LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
-             loggerContext.stop();
              Throwables.maybeFail(error);
          }).apply(isolatedExecutor);
  
diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
index 5bf3734,0ef5a69..aa45d27
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
@@@ -48,8 -46,9 +48,9 @@@ public class InstanceClassLoader extend
                 name.startsWith("org.apache.cassandra.distributed.api.")
              || name.startsWith("sun.")
              || name.startsWith("oracle.")
+             || name.startsWith("com.intellij.")
              || name.startsWith("com.sun.")
 -            || name.startsWith("com.sun.")
 +            || name.startsWith("com.oracle.")
              || name.startsWith("java.")
              || name.startsWith("javax.")
              || name.startsWith("jdk.")
diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index ad5d4d9,a215062..8c8a774
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@@ -102,15 -101,13 +101,15 @@@ public class InstanceConfig implements 
                  .set("concurrent_compactors", 1)
                  .set("memtable_heap_space_in_mb", 10)
                  .set("commitlog_sync", "batch")
 -                .set("storage_port", 7010)
 +                .set("storage_port", 7012)
                  .set("endpoint_snitch", SimpleSnitch.class.getName())
                  .set("seed_provider", new ParameterizedClass(SimpleSeedProvider.class.getName(),
 -                        Collections.singletonMap("seeds", "127.0.0.1")))
 +                        Collections.singletonMap("seeds", "127.0.0.1:7012")))
 +                // required settings for dtest functionality
 +                .set("diagnostic_events_enabled", true)
                  // legacy parameters
                  .forceSet("commitlog_sync_batch_window_in_ms", 1.0);
-                 //
+         this.featureFlags = EnumSet.noneOf(Feature.class);
      }
  
      private InstanceConfig(InstanceConfig copy)
diff --cc test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
index 780e65e,238e9e7..56fd05c
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
@@@ -26,15 -26,10 +26,15 @@@ import org.apache.cassandra.db.Keyspace
  import org.apache.cassandra.distributed.Cluster;
  import org.apache.cassandra.distributed.impl.IInvokableInstance;
  
++import static org.apache.cassandra.distributed.api.Feature.NETWORK;
  import static org.junit.Assert.assertEquals;
  
- import static org.apache.cassandra.distributed.impl.InstanceConfig.NETWORK;
 +import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
 +import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
 +
  public class DistributedReadWritePathTest extends DistributedTestBase
  {
 +
      @Test
      public void coordinatorReadTest() throws Throwable
      {
diff --cc test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java
index 90aa7ac,0000000..d554262
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipSettlesTest.java
@@@ -1,45 -1,0 +1,45 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
 +
 +import org.junit.Test;
 +
 +import org.apache.cassandra.distributed.Cluster;
 +
- import static org.apache.cassandra.distributed.impl.InstanceConfig.GOSSIP;
- import static org.apache.cassandra.distributed.impl.InstanceConfig.NETWORK;
++import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
++import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +
 +public class GossipSettlesTest extends DistributedTestBase
 +{
 +
 +    @Test
 +    public void test()
 +    {
 +        try (Cluster cluster = Cluster.create(3, config -> config.with(GOSSIP).with(NETWORK)))
 +        {
 +        }
 +        catch (IOException e)
 +        {
 +            e.printStackTrace();
 +        }
 +    }
 +
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
index 24d7a98,0000000..143b5cd
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
@@@ -1,134 -1,0 +1,135 @@@
- ///*
- // * Licensed to the Apache Software Foundation (ASF) under one
- // * or more contributor license agreements.  See the NOTICE file
- // * distributed with this work for additional information
- // * regarding copyright ownership.  The ASF licenses this file
- // * to you under the Apache License, Version 2.0 (the
- // * "License"); you may not use this file except in compliance
- // * with the License.  You may obtain a copy of the License at
- // *
- // *     http://www.apache.org/licenses/LICENSE-2.0
- // *
- // * Unless required by applicable law or agreed to in writing, software
- // * distributed under the License is distributed on an "AS IS" BASIS,
- // * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // * See the License for the specific language governing permissions and
- // * limitations under the License.
- // */
- //
- //package org.apache.cassandra.distributed.test;
- //
- //import java.io.IOException;
- //import java.util.Arrays;
- //import java.util.Map;
- //import java.util.function.Consumer;
- //
- //import com.google.common.collect.ImmutableList;
- //import com.google.common.collect.ImmutableMap;
- //import org.junit.Test;
- //
- //import org.apache.cassandra.distributed.Cluster;
- //import org.apache.cassandra.distributed.impl.InstanceConfig;
- //import org.apache.cassandra.service.StorageService;
- //import org.apache.cassandra.utils.concurrent.SimpleCondition;
- //import org.apache.cassandra.utils.progress.ProgressEventType;
- //
- //import static java.util.concurrent.TimeUnit.MINUTES;
- //import static org.apache.cassandra.distributed.impl.ExecUtil.rethrow;
- //import static org.apache.cassandra.distributed.impl.InstanceConfig.GOSSIP;
- //import static org.apache.cassandra.distributed.impl.InstanceConfig.NETWORK;
- //
- //public class RepairTest extends DistributedTestBase
- //{
- //    private static final String insert = withKeyspace("INSERT INTO %s.test (k, c1, c2) VALUES (?, 'value1', 'value2');");
- //    private static final String query = withKeyspace("SELECT k, c1, c2 FROM %s.test WHERE k = ?;");
- //    private static void insert(Cluster cluster, int start, int end, int ... nodes)
- //    {
- //        for (int i = start ; i < end ; ++i)
- //            for (int node : nodes)
- //                cluster.get(node).executeInternal(insert, Integer.toString(i));
- //    }
- //
- //    private static void verify(Cluster cluster, int start, int end, int ... nodes)
- //    {
- //        for (int i = start ; i < end ; ++i)
- //        {
- //            for (int node = 1 ; node <= cluster.size() ; ++node)
- //            {
- //                Object[][] rows = cluster.get(node).executeInternal(query, Integer.toString(i));
- //                if (Arrays.binarySearch(nodes, node) >= 0)
- //                    assertRows(rows, new Object[] { Integer.toString(i), "value1", "value2" });
- //                else
- //                    assertRows(rows);
- //            }
- //        }
- //    }
- //
- //    private static void flush(Cluster cluster, int ... nodes)
- //    {
- //        for (int node : nodes)
- //            cluster.get(node).runOnInstance(rethrow(() -> StorageService.instance.forceKeyspaceFlush(KEYSPACE)));
- //    }
- //
- //    private Cluster create(Consumer<InstanceConfig> configModifier) throws IOException
- //    {
- //        configModifier = configModifier.andThen(
- //            config -> config.set("hinted_handoff_enabled", false)
- //                            .set("commitlog_sync_batch_window_in_ms", 5)
- //                            .with(NETWORK)
- //                            .with(GOSSIP)
- //        );
- //
- //        Cluster cluster = init(Cluster.build(3).withConfig(configModifier).start());
- //        try
- //        {
- //            cluster.schemaChange(withKeyspace("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k));"));
- //
- //            insert(cluster,    0, 1000, 1, 2, 3);
- //            flush(cluster, 1);
- //            insert(cluster, 1000, 1001, 1, 2);
- //            insert(cluster, 1001, 2001, 1, 2, 3);
- //            flush(cluster, 1, 2, 3);
- //
- //            verify(cluster,    0, 1000, 1, 2, 3);
- //            verify(cluster, 1000, 1001, 1, 2);
- //            verify(cluster, 1001, 2001, 1, 2, 3);
- //            return cluster;
- //        }
- //        catch (Throwable t)
- //        {
- //            cluster.close();
- //            throw t;
- //        }
- //    }
- //
- //    private void repair(Cluster cluster, Map<String, String> options)
- //    {
- //        cluster.get(1).runOnInstance(rethrow(() -> {
- //            SimpleCondition await = new SimpleCondition();
- //            StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> {
- //                if (event.getType() == ProgressEventType.COMPLETE)
- //                    await.signalAll();
- //            })).right.get();
- //            await.await(1L, MINUTES);
- //        }));
- //    }
- //
- //    void simpleRepair(boolean orderPreservingPartitioner, boolean sequential) throws IOException
- //    {
- //        Cluster cluster = create(config -> {
- //            if (orderPreservingPartitioner)
- //                config.set("partitioner", "org.apache.cassandra.dht.ByteOrderedPartitioner");
- //        });
- //        repair(cluster, ImmutableMap.of("parallelism", sequential ? "sequential" : "parallel"));
- //        verify(cluster, 0, 2001, 1, 2, 3);
- //    }
- //
- //    @Test
- //    public void testSimpleSequentialRepair() throws IOException
- //    {
- //        simpleRepair(false, true);
- //    }
- //
- //
- //}
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.distributed.test;
++
++import java.io.IOException;
++import java.util.Arrays;
++import java.util.Map;
++import java.util.function.Consumer;
++
++import com.google.common.collect.ImmutableList;
++import com.google.common.collect.ImmutableMap;
++import org.junit.Ignore;
++import org.junit.Test;
++
++import org.apache.cassandra.distributed.Cluster;
++import org.apache.cassandra.distributed.impl.InstanceConfig;
++import org.apache.cassandra.service.StorageService;
++import org.apache.cassandra.utils.concurrent.SimpleCondition;
++import org.apache.cassandra.utils.progress.ProgressEventType;
++
++import static java.util.concurrent.TimeUnit.MINUTES;
++import static org.apache.cassandra.distributed.impl.ExecUtil.rethrow;
++import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
++import static org.apache.cassandra.distributed.api.Feature.NETWORK;
++@Ignore
++public class RepairTest extends DistributedTestBase
++{
++    private static final String insert = withKeyspace("INSERT INTO %s.test (k, c1, c2) VALUES (?, 'value1', 'value2');");
++    private static final String query = withKeyspace("SELECT k, c1, c2 FROM %s.test WHERE k = ?;");
++    private static void insert(Cluster cluster, int start, int end, int ... nodes)
++    {
++        for (int i = start ; i < end ; ++i)
++            for (int node : nodes)
++                cluster.get(node).executeInternal(insert, Integer.toString(i));
++    }
++
++    private static void verify(Cluster cluster, int start, int end, int ... nodes)
++    {
++        for (int i = start ; i < end ; ++i)
++        {
++            for (int node = 1 ; node <= cluster.size() ; ++node)
++            {
++                Object[][] rows = cluster.get(node).executeInternal(query, Integer.toString(i));
++                if (Arrays.binarySearch(nodes, node) >= 0)
++                    assertRows(rows, new Object[] { Integer.toString(i), "value1", "value2" });
++                else
++                    assertRows(rows);
++            }
++        }
++    }
++
++    private static void flush(Cluster cluster, int ... nodes)
++    {
++        for (int node : nodes)
++            cluster.get(node).runOnInstance(rethrow(() -> StorageService.instance.forceKeyspaceFlush(KEYSPACE)));
++    }
++
++    private Cluster create(Consumer<InstanceConfig> configModifier) throws IOException
++    {
++        configModifier = configModifier.andThen(
++            config -> config.set("hinted_handoff_enabled", false)
++                            .set("commitlog_sync_batch_window_in_ms", 5)
++                            .with(NETWORK)
++                            .with(GOSSIP)
++        );
++
++        Cluster cluster = init(Cluster.build(3).withConfig(configModifier).start());
++        try
++        {
++            cluster.schemaChange(withKeyspace("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k));"));
++
++            insert(cluster,    0, 1000, 1, 2, 3);
++            flush(cluster, 1);
++            insert(cluster, 1000, 1001, 1, 2);
++            insert(cluster, 1001, 2001, 1, 2, 3);
++            flush(cluster, 1, 2, 3);
++
++            verify(cluster,    0, 1000, 1, 2, 3);
++            verify(cluster, 1000, 1001, 1, 2);
++            verify(cluster, 1001, 2001, 1, 2, 3);
++            return cluster;
++        }
++        catch (Throwable t)
++        {
++            cluster.close();
++            throw t;
++        }
++    }
++
++    private void repair(Cluster cluster, Map<String, String> options)
++    {
++        cluster.get(1).runOnInstance(rethrow(() -> {
++            SimpleCondition await = new SimpleCondition();
++            StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> {
++                if (event.getType() == ProgressEventType.COMPLETE)
++                    await.signalAll();
++            })).right.get();
++            await.await(1L, MINUTES);
++        }));
++    }
++
++    void simpleRepair(boolean orderPreservingPartitioner, boolean sequential) throws IOException
++    {
++        Cluster cluster = create(config -> {
++            if (orderPreservingPartitioner)
++                config.set("partitioner", "org.apache.cassandra.dht.ByteOrderedPartitioner");
++        });
++        repair(cluster, ImmutableMap.of("parallelism", sequential ? "sequential" : "parallel"));
++        verify(cluster, 0, 2001, 1, 2, 3);
++    }
++
++    @Test
++    public void testSimpleSequentialRepair() throws IOException
++    {
++        simpleRepair(false, true);
++    }
++
++
++}
diff --cc test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
index 39c89e6,0000000..22cd590
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
@@@ -1,75 -1,0 +1,75 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.util.Arrays;
 +import java.util.Comparator;
 +
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.service.StorageService;
 +
- import static org.apache.cassandra.distributed.impl.InstanceConfig.NETWORK;
++import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +
 +public class StreamingTest extends DistributedTestBase
 +{
 +
 +    private void testStreaming(int nodes, int replicationFactor, int rowCount, String compactionStrategy) throws Throwable
 +    {
 +        try (Cluster cluster = Cluster.create(nodes, config -> config.with(NETWORK)))
 +        {
 +            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};");
 +            cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'true'}", KEYSPACE, compactionStrategy));
 +
 +            for (int i = 0 ; i < rowCount ; ++i)
 +            {
 +                for (int n = 1 ; n < nodes ; ++n)
 +                    cluster.get(n).executeInternal(String.format("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');", KEYSPACE), Integer.toString(i));
 +            }
 +
 +            cluster.get(nodes).executeInternal("TRUNCATE system.available_ranges;");
 +            {
 +                Object[][] results = cluster.get(nodes).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
 +                Assert.assertEquals(0, results.length);
 +            }
 +
 +            cluster.get(nodes).runOnInstance(() -> StorageService.instance.rebuild(null, KEYSPACE, null, null));
 +            {
 +                Object[][] results = cluster.get(nodes).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
 +                Assert.assertEquals(1000, results.length);
 +                Arrays.sort(results, Comparator.comparingInt(a -> Integer.parseInt((String) a[0])));
 +                for (int i = 0 ; i < results.length ; ++i)
 +                {
 +                    Assert.assertEquals(Integer.toString(i), results[i][0]);
 +                    Assert.assertEquals("value1", results[i][1]);
 +                    Assert.assertEquals("value2", results[i][2]);
 +                }
 +            }
 +        }
 +    }
 +
 +    @Test
 +    public void test() throws Throwable
 +    {
 +        testStreaming(2, 2, 1000, "LeveledCompactionStrategy");
 +    }
 +
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org