You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2019/08/20 13:56:02 UTC

[cassandra] branch cassandra-3.0 updated (54aeb50 -> 51c0f6b)

This is an automated email from the ASF dual-hosted git repository.

benedict pushed a change to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 54aeb50  ninja fix CHANGES.txt for #14952
     new 8dcaa12  Allow instance class loaders to be garbage collected for inJVM dtest
     new 51c0f6b  Merge branch 'cassandra-2.2' into cassandra-3.0

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   1 +
 build.xml                                          |  18 +-
 .../apache/cassandra/batchlog/BatchlogManager.java |   7 +-
 .../cassandra/concurrent/InfiniteLoopExecutor.java |   2 +-
 .../cassandra/concurrent/ScheduledExecutors.java   |  11 +-
 .../cassandra/concurrent/SharedExecutorPool.java   |   4 +-
 .../apache/cassandra/concurrent/StageManager.java  |   8 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  14 +-
 .../apache/cassandra/db/commitlog/CommitLog.java   |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |   9 +
 .../org/apache/cassandra/hints/HintsCatalog.java   |   5 +-
 .../cassandra/index/SecondaryIndexManager.java     |  12 +-
 .../cassandra/io/sstable/IndexSummaryManager.java  |  11 ++
 .../cassandra/io/sstable/format/SSTableReader.java |   5 +-
 .../org/apache/cassandra/net/MessagingService.java |   2 +
 .../cassandra/net/OutboundTcpConnection.java       |   2 +-
 .../apache/cassandra/service/CassandraDaemon.java  |   3 +-
 .../service/PendingRangeCalculatorService.java     |  11 +-
 .../apache/cassandra/service/StorageService.java   |  32 +++-
 .../cassandra/streaming/StreamCoordinator.java     |  13 ++
 .../cassandra/utils/BackgroundActivityMonitor.java |  12 +-
 .../org/apache/cassandra/utils/ExecutorUtils.java  | 151 ++++++++++++++++
 .../utils/NanoTimeToCurrentTimeMillis.java         |  14 +-
 .../org/apache/cassandra/utils/concurrent/Ref.java |  14 +-
 .../apache/cassandra/utils/memory/BufferPool.java  |  10 +-
 .../cassandra/utils/memory/MemtablePool.java       |   7 +-
 test/conf/logback-dtest.xml                        |  18 +-
 .../org/apache/cassandra/distributed/Cluster.java  |  30 ++-
 .../cassandra/distributed/UpgradeableCluster.java  |  30 +--
 .../cassandra/distributed/api/IInstance.java       |   3 +-
 .../cassandra/distributed/api/IInstanceConfig.java |   1 +
 .../cassandra/distributed/api/IMessageFilters.java |   6 +-
 .../distributed/impl/AbstractCluster.java          | 154 +++++++++-------
 .../impl/DelegatingInvokableInstance.java          |   5 +-
 .../cassandra/distributed/impl/Instance.java       | 159 ++++++++++++----
 .../distributed/impl/InstanceClassLoader.java      |   9 +-
 .../cassandra/distributed/impl/InstanceConfig.java |  28 ++-
 .../distributed/impl/IsolatedExecutor.java         |  47 ++++-
 .../cassandra/distributed/impl/MessageFilters.java |  31 +---
 .../distributed/test/DistributedTestBase.java      |  26 ++-
 .../cassandra/distributed/test/GossipTest.java     |  14 +-
 .../distributed/test/ResourceLeakTest.java         | 201 +++++++++++++++++++++
 .../cassandra/concurrent/SEPExecutorTest.java      |   3 +-
 43 files changed, 871 insertions(+), 273 deletions(-)
 create mode 100644 src/java/org/apache/cassandra/utils/ExecutorUtils.java
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/01: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 51c0f6b2f08c8f276482a2ef198327f756dbc4de
Merge: 54aeb50 8dcaa12
Author: Jon Meredith <jm...@gmail.com>
AuthorDate: Thu Aug 15 14:14:48 2019 -0600

    Merge branch 'cassandra-2.2' into cassandra-3.0

 CHANGES.txt                                        |   1 +
 build.xml                                          |  18 +-
 .../apache/cassandra/batchlog/BatchlogManager.java |   7 +-
 .../cassandra/concurrent/InfiniteLoopExecutor.java |   2 +-
 .../cassandra/concurrent/ScheduledExecutors.java   |  11 +-
 .../cassandra/concurrent/SharedExecutorPool.java   |   4 +-
 .../apache/cassandra/concurrent/StageManager.java  |   8 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  14 +-
 .../apache/cassandra/db/commitlog/CommitLog.java   |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |   9 +
 .../org/apache/cassandra/hints/HintsCatalog.java   |   5 +-
 .../cassandra/index/SecondaryIndexManager.java     |  12 +-
 .../cassandra/io/sstable/IndexSummaryManager.java  |  11 ++
 .../cassandra/io/sstable/format/SSTableReader.java |   5 +-
 .../org/apache/cassandra/net/MessagingService.java |   2 +
 .../cassandra/net/OutboundTcpConnection.java       |   2 +-
 .../apache/cassandra/service/CassandraDaemon.java  |   3 +-
 .../service/PendingRangeCalculatorService.java     |  11 +-
 .../apache/cassandra/service/StorageService.java   |  32 +++-
 .../cassandra/streaming/StreamCoordinator.java     |  13 ++
 .../cassandra/utils/BackgroundActivityMonitor.java |  12 +-
 .../org/apache/cassandra/utils/ExecutorUtils.java  | 151 ++++++++++++++++
 .../utils/NanoTimeToCurrentTimeMillis.java         |  14 +-
 .../org/apache/cassandra/utils/concurrent/Ref.java |  14 +-
 .../apache/cassandra/utils/memory/BufferPool.java  |  10 +-
 .../cassandra/utils/memory/MemtablePool.java       |   7 +-
 test/conf/logback-dtest.xml                        |  18 +-
 .../org/apache/cassandra/distributed/Cluster.java  |  30 ++-
 .../cassandra/distributed/UpgradeableCluster.java  |  30 +--
 .../cassandra/distributed/api/IInstance.java       |   3 +-
 .../cassandra/distributed/api/IInstanceConfig.java |   1 +
 .../cassandra/distributed/api/IMessageFilters.java |   6 +-
 .../distributed/impl/AbstractCluster.java          | 154 +++++++++-------
 .../impl/DelegatingInvokableInstance.java          |   5 +-
 .../cassandra/distributed/impl/Instance.java       | 159 ++++++++++++----
 .../distributed/impl/InstanceClassLoader.java      |   9 +-
 .../cassandra/distributed/impl/InstanceConfig.java |  28 ++-
 .../distributed/impl/IsolatedExecutor.java         |  47 ++++-
 .../cassandra/distributed/impl/MessageFilters.java |  31 +---
 .../distributed/test/DistributedTestBase.java      |  26 ++-
 .../cassandra/distributed/test/GossipTest.java     |  14 +-
 .../distributed/test/ResourceLeakTest.java         | 201 +++++++++++++++++++++
 .../cassandra/concurrent/SEPExecutorTest.java      |   3 +-
 43 files changed, 871 insertions(+), 273 deletions(-)

diff --cc CHANGES.txt
index 41ddef6,caea0f4..e956796
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -32,43 -6,12 +32,44 @@@
   * Fix index summary redistribution cancellation (CASSANDRA-15045)
   * Refactor Circle CI configuration (CASSANDRA-14806)
   * Fixing invalid CQL in security documentation (CASSANDRA-15020)
 - * Make tools/bin/token-generator py2/3 compatible (CASSANDRA-15012)
   * Multi-version in-JVM dtests (CASSANDRA-14937)
+  * Allow instance class loaders to be garbage collected for inJVM dtest (CASSANDRA-15170)
  
  
 -2.2.14
 +3.0.18
 + * Severe concurrency issues in STCS,DTCS,TWCS,TMD.Topology,TypeParser
 + * Add a script to make running the cqlsh tests in cassandra repo easier (CASSANDRA-14951)
 + * If SizeEstimatesRecorder misses a 'onDropTable' notification, the size_estimates table will never be cleared for that table. (CASSANDRA-14905)
 + * Counters fail to increment in 2.1/2.2 to 3.X mixed version clusters (CASSANDRA-14958)
 + * Streaming needs to synchronise access to LifecycleTransaction (CASSANDRA-14554)
 + * Fix cassandra-stress write hang with default options (CASSANDRA-14616)
 + * Differentiate between slices and RTs when decoding legacy bounds (CASSANDRA-14919)
 + * CommitLogReplayer.handleReplayError should print stack traces (CASSANDRA-14589)
 + * Netty epoll IOExceptions caused by unclean client disconnects being logged at INFO (CASSANDRA-14909)
 + * Unfiltered.isEmpty conflicts with Row extends AbstractCollection.isEmpty (CASSANDRA-14588)
 + * RangeTombstoneList doesn't properly clean up mergeable or superseded rts in some cases (CASSANDRA-14894)
 + * Fix handling of collection tombstones for dropped columns from legacy sstables (CASSANDRA-14912)
 + * Throw exception if Columns serialized subset encode more columns than possible (CASSANDRA-14591)
 + * Drop/add column name with different Kind can result in corruption (CASSANDRA-14843)
 + * Fix missing rows when reading 2.1 SSTables with static columns in 3.0 (CASSANDRA-14873)
 + * Move TWCS message 'No compaction necessary for bucket size' to Trace level (CASSANDRA-14884)
 + * Sstable min/max metadata can cause data loss (CASSANDRA-14861)
 + * Dropped columns can cause reverse sstable iteration to return prematurely (CASSANDRA-14838)
 + * Legacy sstables with  multi block range tombstones create invalid bound sequences (CASSANDRA-14823)
 + * Expand range tombstone validation checks to multiple interim request stages (CASSANDRA-14824)
 + * Reverse order reads can return incomplete results (CASSANDRA-14803)
 + * Avoid calling iter.next() in a loop when notifying indexers about range tombstones (CASSANDRA-14794)
 + * Fix purging semi-expired RT boundaries in reversed iterators (CASSANDRA-14672)
 + * DESC order reads can fail to return the last Unfiltered in the partition (CASSANDRA-14766)
 + * Fix corrupted collection deletions for dropped columns in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
 + * Fix corrupted static collection deletions in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
 + * Handle failures in parallelAllSSTableOperation (cleanup/upgradesstables/etc) (CASSANDRA-14657)
 + * Improve TokenMetaData cache populating performance avoid long locking (CASSANDRA-14660)
 + * Backport: Flush netty client messages immediately (not by default) (CASSANDRA-13651)
 + * Fix static column order for SELECT * wildcard queries (CASSANDRA-14638)
 + * sstableloader should use discovered broadcast address to connect intra-cluster (CASSANDRA-14522)
 + * Fix reading columns with non-UTF names from schema (CASSANDRA-14468)
 + Merged from 2.2:
   * CircleCI docker image should bake in more dependencies (CASSANDRA-14985)
   * Don't enable client transports when bootstrap is pending (CASSANDRA-14525)
   * MigrationManager attempts to pull schema from different major version nodes (CASSANDRA-14928)
diff --cc src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index ab2239c,0000000..71d60e7
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@@ -1,590 -1,0 +1,591 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.batchlog;
 +
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.ThreadLocalRandom;
 +import java.util.concurrent.TimeUnit;
++import java.util.concurrent.TimeoutException;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ArrayListMultimap;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.ListMultimap;
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.Multimap;
 +import com.google.common.util.concurrent.RateLimiter;
 +import org.apache.cassandra.db.RowUpdateBuilder;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.WriteType;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.db.marshal.UUIDType;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.WriteFailureException;
 +import org.apache.cassandra.exceptions.WriteTimeoutException;
 +import org.apache.cassandra.gms.FailureDetector;
 +import org.apache.cassandra.hints.Hint;
 +import org.apache.cassandra.hints.HintsService;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.net.MessageIn;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.service.WriteResponseHandler;
++import org.apache.cassandra.utils.ExecutorUtils;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.MBeanWrapper;
 +import org.apache.cassandra.utils.UUIDGen;
 +
 +import static com.google.common.collect.Iterables.transform;
 +import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 +import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
 +
 +public class BatchlogManager implements BatchlogManagerMBean
 +{
 +    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
 +    private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
 +    static final int DEFAULT_PAGE_SIZE = 128;
 +
 +    private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
 +    public static final BatchlogManager instance = new BatchlogManager();
 +    public static final long BATCHLOG_REPLAY_TIMEOUT = Long.getLong("cassandra.batchlog.replay_timeout_in_ms", DatabaseDescriptor.getWriteRpcTimeout() * 2);
 +
 +    private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
 +    private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
 +
 +    // Single-thread executor service for scheduling and serializing log replay.
 +    private final ScheduledExecutorService batchlogTasks;
 +
 +    public BatchlogManager()
 +    {
 +        ScheduledThreadPoolExecutor executor = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
 +        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
 +        batchlogTasks = executor;
 +    }
 +
 +    public void start()
 +    {
 +        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
 +
 +        batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches,
 +                                             StorageService.RING_DELAY,
 +                                             REPLAY_INTERVAL,
 +                                             TimeUnit.MILLISECONDS);
 +    }
 +
-     public void shutdown() throws InterruptedException
++    public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
 +    {
-         batchlogTasks.shutdown();
-         batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
++        ExecutorUtils.shutdownAndWait(timeout, unit, batchlogTasks);
 +    }
 +
 +    public static void remove(UUID id)
 +    {
 +        new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
 +                                                         UUIDType.instance.decompose(id),
 +                                                         FBUtilities.timestampMicros(),
 +                                                         FBUtilities.nowInSeconds()))
 +            .apply();
 +    }
 +
 +    public static void store(Batch batch)
 +    {
 +        store(batch, true);
 +    }
 +
 +    public static void store(Batch batch, boolean durableWrites)
 +    {
 +        RowUpdateBuilder builder =
 +            new RowUpdateBuilder(SystemKeyspace.Batches, batch.creationTime, batch.id)
 +                .clustering()
 +                .add("version", MessagingService.current_version);
 +
 +        for (ByteBuffer mutation : batch.encodedMutations)
 +            builder.addListEntry("mutations", mutation);
 +
 +        for (Mutation mutation : batch.decodedMutations)
 +        {
 +            try (DataOutputBuffer buffer = new DataOutputBuffer())
 +            {
 +                Mutation.serializer.serialize(mutation, buffer, MessagingService.current_version);
 +                builder.addListEntry("mutations", buffer.buffer());
 +            }
 +            catch (IOException e)
 +            {
 +                // shouldn't happen
 +                throw new AssertionError(e);
 +            }
 +        }
 +
 +        builder.build().apply(durableWrites);
 +    }
 +
 +    @VisibleForTesting
 +    public int countAllBatches()
 +    {
 +        String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
 +        UntypedResultSet results = executeInternal(query);
 +        if (results == null || results.isEmpty())
 +            return 0;
 +
 +        return (int) results.one().getLong("count");
 +    }
 +
 +    public long getTotalBatchesReplayed()
 +    {
 +        return totalBatchesReplayed;
 +    }
 +
 +    public void forceBatchlogReplay() throws Exception
 +    {
 +        startBatchlogReplay().get();
 +    }
 +
 +    public Future<?> startBatchlogReplay()
 +    {
 +        // If a replay is already in progress this request will be executed after it completes.
 +        return batchlogTasks.submit(this::replayFailedBatches);
 +    }
 +
 +    void performInitialReplay() throws InterruptedException, ExecutionException
 +    {
 +        // Invokes initial replay. Used for testing only.
 +        batchlogTasks.submit(this::replayFailedBatches).get();
 +    }
 +
 +    private void replayFailedBatches()
 +    {
 +        logger.trace("Started replayFailedBatches");
 +
 +        // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
 +        // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
 +        int endpointsCount = StorageService.instance.getTokenMetadata().getAllEndpoints().size();
 +        if (endpointsCount <= 0)
 +        {
 +            logger.trace("Replay cancelled as there are no peers in the ring.");
 +            return;
 +        }
 +        int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / endpointsCount;
 +        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
 +
 +        UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
 +        ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
 +        int pageSize = calculatePageSize(store);
 +        // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
 +        // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
 +        // token(id) > token(lastReplayedUuid) as part of the query.
 +        String query = String.format("SELECT id, mutations, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
 +                                     SystemKeyspace.NAME,
 +                                     SystemKeyspace.BATCHES);
 +        UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
 +        processBatchlogEntries(batches, pageSize, rateLimiter);
 +        lastReplayedUuid = limitUuid;
 +        logger.trace("Finished replayFailedBatches");
 +    }
 +
 +    // read less rows (batches) per page if they are very large
 +    static int calculatePageSize(ColumnFamilyStore store)
 +    {
 +        double averageRowSize = store.getMeanPartitionSize();
 +        if (averageRowSize <= 0)
 +            return DEFAULT_PAGE_SIZE;
 +
 +        return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
 +    }
 +
 +    private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
 +    {
 +        int positionInPage = 0;
 +        ArrayList<ReplayingBatch> unfinishedBatches = new ArrayList<>(pageSize);
 +
 +        Set<InetAddress> hintedNodes = new HashSet<>();
 +        Set<UUID> replayedBatches = new HashSet<>();
 +
 +        // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
 +        for (UntypedResultSet.Row row : batches)
 +        {
 +            UUID id = row.getUUID("id");
 +            int version = row.getInt("version");
 +            try
 +            {
 +                ReplayingBatch batch = new ReplayingBatch(id, version, row.getList("mutations", BytesType.instance));
 +                if (batch.replay(rateLimiter, hintedNodes) > 0)
 +                {
 +                    unfinishedBatches.add(batch);
 +                }
 +                else
 +                {
 +                    remove(id); // no write mutations were sent (either expired or all CFs involved truncated).
 +                    ++totalBatchesReplayed;
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                logger.warn("Skipped batch replay of {} due to {}", id, e);
 +                remove(id);
 +            }
 +
 +            if (++positionInPage == pageSize)
 +            {
 +                // We have reached the end of a batch. To avoid keeping more than a page of mutations in memory,
 +                // finish processing the page before requesting the next row.
 +                finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
 +                positionInPage = 0;
 +            }
 +        }
 +
 +        finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
 +
 +        // to preserve batch guarantees, we must ensure that hints (if any) have made it to disk, before deleting the batches
 +        HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint));
 +
 +        // once all generated hints are fsynced, actually delete the batches
 +        replayedBatches.forEach(BatchlogManager::remove);
 +    }
 +
 +    private void finishAndClearBatches(ArrayList<ReplayingBatch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches)
 +    {
 +        // schedule hints for timed out deliveries
 +        for (ReplayingBatch batch : batches)
 +        {
 +            batch.finish(hintedNodes);
 +            replayedBatches.add(batch.id);
 +        }
 +
 +        totalBatchesReplayed += batches.size();
 +        batches.clear();
 +    }
 +
 +    public static long getBatchlogTimeout()
 +    {
 +        return BATCHLOG_REPLAY_TIMEOUT; // enough time for the actual write + BM removal mutation
 +    }
 +
 +    private static class ReplayingBatch
 +    {
 +        private final UUID id;
 +        private final long writtenAt;
 +        private final List<Mutation> mutations;
 +        private final int replayedBytes;
 +
 +        private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
 +
 +        ReplayingBatch(UUID id, int version, List<ByteBuffer> serializedMutations) throws IOException
 +        {
 +            this.id = id;
 +            this.writtenAt = UUIDGen.unixTimestamp(id);
 +            this.mutations = new ArrayList<>(serializedMutations.size());
 +            this.replayedBytes = addMutations(version, serializedMutations);
 +        }
 +
 +        public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException
 +        {
 +            logger.trace("Replaying batch {}", id);
 +
 +            if (mutations.isEmpty())
 +                return 0;
 +
 +            int gcgs = gcgs(mutations);
 +            if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
 +                return 0;
 +
 +            replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
 +
 +            rateLimiter.acquire(replayedBytes); // acquire afterwards, to not mess up ttl calculation.
 +
 +            return replayHandlers.size();
 +        }
 +
 +        public void finish(Set<InetAddress> hintedNodes)
 +        {
 +            for (int i = 0; i < replayHandlers.size(); i++)
 +            {
 +                ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
 +                try
 +                {
 +                    handler.get();
 +                }
 +                catch (WriteTimeoutException|WriteFailureException e)
 +                {
 +                    logger.trace("Failed replaying a batched mutation to a node, will write a hint");
 +                    logger.trace("Failure was : {}", e.getMessage());
 +                    // writing hints for the rest to hints, starting from i
 +                    writeHintsForUndeliveredEndpoints(i, hintedNodes);
 +                    return;
 +                }
 +            }
 +        }
 +
 +        private int addMutations(int version, List<ByteBuffer> serializedMutations) throws IOException
 +        {
 +            int ret = 0;
 +            for (ByteBuffer serializedMutation : serializedMutations)
 +            {
 +                ret += serializedMutation.remaining();
 +                try (DataInputBuffer in = new DataInputBuffer(serializedMutation, true))
 +                {
 +                    addMutation(Mutation.serializer.deserialize(in, version));
 +                }
 +            }
 +
 +            return ret;
 +        }
 +
 +        // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
 +        // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then
 +        // truncated.
 +        private void addMutation(Mutation mutation)
 +        {
 +            for (UUID cfId : mutation.getColumnFamilyIds())
 +                if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
 +                    mutation = mutation.without(cfId);
 +
 +            if (!mutation.isEmpty())
 +                mutations.add(mutation);
 +        }
 +
 +        private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes)
 +        {
 +            int gcgs = gcgs(mutations);
 +
 +            // expired
 +            if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
 +                return;
 +
 +            for (int i = startFrom; i < replayHandlers.size(); i++)
 +            {
 +                ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
 +                Mutation undeliveredMutation = mutations.get(i);
 +
 +                if (handler != null)
 +                {
 +                    hintedNodes.addAll(handler.undelivered);
 +                    HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
 +                                                Hint.create(undeliveredMutation, writtenAt));
 +                }
 +            }
 +        }
 +
 +        private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations,
 +                                                                              long writtenAt,
 +                                                                              Set<InetAddress> hintedNodes)
 +        {
 +            List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
 +            for (Mutation mutation : mutations)
 +            {
 +                ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes);
 +                if (handler != null)
 +                    handlers.add(handler);
 +            }
 +            return handlers;
 +        }
 +
 +        /**
 +         * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
 +         * when a replica is down or a write request times out.
 +         *
 +         * @return direct delivery handler to wait on or null, if no live nodes found
 +         */
 +        private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
 +                                                                                     long writtenAt,
 +                                                                                     Set<InetAddress> hintedNodes)
 +        {
 +            Set<InetAddress> liveEndpoints = new HashSet<>();
 +            String ks = mutation.getKeyspaceName();
 +            Token tk = mutation.key().getToken();
 +
 +            for (InetAddress endpoint : StorageService.instance.getNaturalAndPendingEndpoints(ks, tk))
 +            {
 +                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
 +                {
 +                    mutation.apply();
 +                }
 +                else if (FailureDetector.instance.isAlive(endpoint))
 +                {
 +                    liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
 +                }
 +                else
 +                {
 +                    hintedNodes.add(endpoint);
 +                    HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
 +                                                Hint.create(mutation, writtenAt));
 +                }
 +            }
 +
 +            if (liveEndpoints.isEmpty())
 +                return null;
 +
 +            ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
 +            MessageOut<Mutation> message = mutation.createMessage();
 +            for (InetAddress endpoint : liveEndpoints)
 +                MessagingService.instance().sendRR(message, endpoint, handler, false);
 +            return handler;
 +        }
 +
 +        private static int gcgs(Collection<Mutation> mutations)
 +        {
 +            int gcgs = Integer.MAX_VALUE;
 +            for (Mutation mutation : mutations)
 +                gcgs = Math.min(gcgs, mutation.smallestGCGS());
 +            return gcgs;
 +        }
 +
 +        /**
 +         * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
 +         * which we did not receive a successful reply.
 +         */
 +        private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
 +        {
 +            private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
 +
 +            ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
 +            {
 +                super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
 +                undelivered.addAll(writeEndpoints);
 +            }
 +
 +            @Override
 +            protected int totalBlockFor()
 +            {
 +                return this.naturalEndpoints.size();
 +            }
 +
 +            @Override
 +            public void response(MessageIn<T> m)
 +            {
 +                boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
 +                assert removed;
 +                super.response(m);
 +            }
 +        }
 +    }
 +
 +    public static class EndpointFilter
 +    {
 +        private final String localRack;
 +        private final Multimap<String, InetAddress> endpoints;
 +
 +        public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
 +        {
 +            this.localRack = localRack;
 +            this.endpoints = endpoints;
 +        }
 +
 +        /**
 +         * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
 +         */
 +        public Collection<InetAddress> filter()
 +        {
 +            // special case for single-node data centers
 +            if (endpoints.values().size() == 1)
 +                return endpoints.values();
 +
 +            // strip out dead endpoints and localhost
 +            ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
 +            for (Map.Entry<String, InetAddress> entry : endpoints.entries())
 +                if (isValid(entry.getValue()))
 +                    validated.put(entry.getKey(), entry.getValue());
 +
 +            if (validated.size() <= 2)
 +                return validated.values();
 +
 +            if (validated.size() - validated.get(localRack).size() >= 2)
 +            {
 +                // we have enough endpoints in other racks
 +                validated.removeAll(localRack);
 +            }
 +
 +            if (validated.keySet().size() == 1)
 +            {
 +                /*
 +                 * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack)
 +                 * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack
 +                 * because of the preceding if block.
 +                 */
 +                List<InetAddress> otherRack = Lists.newArrayList(validated.values());
 +                shuffle(otherRack);
 +                return otherRack.subList(0, 2);
 +            }
 +
 +            // randomize which racks we pick from if more than 2 remaining
 +            Collection<String> racks;
 +            if (validated.keySet().size() == 2)
 +            {
 +                racks = validated.keySet();
 +            }
 +            else
 +            {
 +                racks = Lists.newArrayList(validated.keySet());
 +                shuffle((List<String>) racks);
 +            }
 +
 +            // grab a random member of up to two racks
 +            List<InetAddress> result = new ArrayList<>(2);
 +            for (String rack : Iterables.limit(racks, 2))
 +            {
 +                List<InetAddress> rackMembers = validated.get(rack);
 +                result.add(rackMembers.get(getRandomInt(rackMembers.size())));
 +            }
 +
 +            return result;
 +        }
 +
 +        @VisibleForTesting
 +        protected boolean isValid(InetAddress input)
 +        {
 +            return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
 +        }
 +
 +        @VisibleForTesting
 +        protected int getRandomInt(int bound)
 +        {
 +            return ThreadLocalRandom.current().nextInt(bound);
 +        }
 +
 +        @VisibleForTesting
 +        protected void shuffle(List<?> list)
 +        {
 +            Collections.shuffle(list);
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index 3997c1a,50cc5a3..e6a0df7
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@@ -115,10 -114,9 +115,10 @@@ public class SharedExecutorPoo
          return executor;
      }
  
-     public synchronized void shutdownAndWait() throws InterruptedException
 -    public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException
++    public synchronized void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException
      {
          shuttingDown = true;
 +        List<SEPExecutor> executors = new ArrayList<>(this.executors);
          for (SEPExecutor executor : executors)
              executor.shutdownNow();
  
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 355d710,01330a6..c5e81f0
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -55,32 -67,29 +55,34 @@@ import org.apache.cassandra.db.rows.Cel
  import org.apache.cassandra.dht.*;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.io.FSReadError;
 -import org.apache.cassandra.io.compress.CompressionParameters;
 +import org.apache.cassandra.index.SecondaryIndexManager;
 +import org.apache.cassandra.index.internal.CassandraIndex;
 +import org.apache.cassandra.index.transactions.UpdateTransaction;
 +import org.apache.cassandra.io.FSError;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
  import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  import org.apache.cassandra.io.sstable.format.*;
 -import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
 -import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.format.big.BigFormat;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.io.util.FileUtils;
 -import org.apache.cassandra.metrics.ColumnFamilyMetrics;
 -import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
 +import org.apache.cassandra.metrics.TableMetrics;
 +import org.apache.cassandra.metrics.TableMetrics.Sampler;
 +import org.apache.cassandra.schema.*;
  import org.apache.cassandra.service.CacheService;
  import org.apache.cassandra.service.StorageService;
 -import org.apache.cassandra.streaming.StreamLockfile;
 -import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.utils.*;
 -import org.apache.cassandra.utils.concurrent.*;
  import org.apache.cassandra.utils.TopKSampler.SamplerResult;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.Refs;
  import org.apache.cassandra.utils.memory.MemtableAllocator;
 -
 -import com.clearspring.analytics.stream.Counter;
 +import org.json.simple.JSONArray;
 +import org.json.simple.JSONObject;
  
+ import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
+ import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
  import static org.apache.cassandra.utils.Throwables.maybeFail;
  
  public class ColumnFamilyStore implements ColumnFamilyStoreMBean
@@@ -220,14 -194,6 +222,8 @@@
      public volatile long sampleLatencyNanos;
      private final ScheduledFuture<?> latencyCalculator;
  
 +    private volatile boolean compactionSpaceCheck = true;
 +
-     public static void shutdownFlushExecutor() throws InterruptedException
-     {
-         flushExecutor.shutdown();
-         flushExecutor.awaitTermination(60, TimeUnit.SECONDS);
-     }
- 
      public static void shutdownPostFlushExecutor() throws InterruptedException
      {
          postFlushExecutor.shutdown();
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index c39f45a,bd4fe13..e9e0648
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -23,18 -23,14 +23,19 @@@ import java.util.*
  import java.util.Map.Entry;
  import java.util.concurrent.*;
  import java.util.concurrent.locks.ReentrantLock;
 +import java.util.stream.Collectors;
  
  import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Throwables;
  import com.google.common.collect.ImmutableList;
  import com.google.common.collect.ImmutableMap;
 +import com.google.common.util.concurrent.ListenableFutureTask;
  import com.google.common.util.concurrent.Uninterruptibles;
  
 +import io.netty.util.concurrent.FastThreadLocal;
+ import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.MBeanWrapper;
 +import org.apache.cassandra.utils.NoSpamLogger;
  import org.apache.cassandra.utils.Pair;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
diff --cc src/java/org/apache/cassandra/hints/HintsCatalog.java
index 7d5c8e6,0000000..5a92889
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/hints/HintsCatalog.java
+++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java
@@@ -1,174 -1,0 +1,175 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.hints;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.file.Files;
++import java.nio.file.Path;
 +import java.util.*;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.stream.Stream;
 +import javax.annotation.Nullable;
 +
 +import com.google.common.collect.ImmutableMap;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.io.FSError;
 +import org.apache.cassandra.io.FSReadError;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.utils.NativeLibrary;
 +import org.apache.cassandra.utils.SyncUtil;
 +
 +import static java.util.stream.Collectors.groupingBy;
 +
 +/**
 + * A simple catalog for easy host id -> {@link HintsStore} lookup and manipulation.
 + */
 +final class HintsCatalog
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(HintsCatalog.class);
 +
 +    private final File hintsDirectory;
 +    private final Map<UUID, HintsStore> stores;
 +    private final ImmutableMap<String, Object> writerParams;
 +
 +    private HintsCatalog(File hintsDirectory, ImmutableMap<String, Object> writerParams, Map<UUID, List<HintsDescriptor>> descriptors)
 +    {
 +        this.hintsDirectory = hintsDirectory;
 +        this.writerParams = writerParams;
 +        this.stores = new ConcurrentHashMap<>();
 +
 +        for (Map.Entry<UUID, List<HintsDescriptor>> entry : descriptors.entrySet())
 +            stores.put(entry.getKey(), HintsStore.create(entry.getKey(), hintsDirectory, writerParams, entry.getValue()));
 +    }
 +
 +    /**
 +     * Loads hints stores from a given directory.
 +     */
 +    static HintsCatalog load(File hintsDirectory, ImmutableMap<String, Object> writerParams)
 +    {
-         try
++        try(Stream<Path> list = Files.list(hintsDirectory.toPath()))
 +        {
 +            Map<UUID, List<HintsDescriptor>> stores =
-                 Files.list(hintsDirectory.toPath())
++                     list
 +                     .filter(HintsDescriptor::isHintFileName)
 +                     .map(HintsDescriptor::readFromFileQuietly)
 +                     .filter(Optional::isPresent)
 +                     .map(Optional::get)
 +                     .collect(groupingBy(h -> h.hostId));
 +            return new HintsCatalog(hintsDirectory, writerParams, stores);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSReadError(e, hintsDirectory);
 +        }
 +    }
 +
 +    Stream<HintsStore> stores()
 +    {
 +        return stores.values().stream();
 +    }
 +
 +    void maybeLoadStores(Iterable<UUID> hostIds)
 +    {
 +        for (UUID hostId : hostIds)
 +            get(hostId);
 +    }
 +
 +    HintsStore get(UUID hostId)
 +    {
 +        // we intentionally don't just return stores.computeIfAbsent() because it's expensive compared to simple get(),
 +        // and in this case would also allocate for the capturing lambda; the method is on a really hot path
 +        HintsStore store = stores.get(hostId);
 +        return store == null
 +             ? stores.computeIfAbsent(hostId, (id) -> HintsStore.create(id, hintsDirectory, writerParams, Collections.emptyList()))
 +             : store;
 +    }
 +
 +    @Nullable
 +    HintsStore getNullable(UUID hostId)
 +    {
 +        return stores.get(hostId);
 +    }
 +
 +    /**
 +     * Delete all hints for all host ids.
 +     *
 +     * Will not delete the files that are currently being dispatched, or written to.
 +     */
 +    void deleteAllHints()
 +    {
 +        stores.keySet().forEach(this::deleteAllHints);
 +    }
 +
 +    /**
 +     * Delete all hints for the specified host id.
 +     *
 +     * Will not delete the files that are currently being dispatched, or written to.
 +     */
 +    void deleteAllHints(UUID hostId)
 +    {
 +        HintsStore store = stores.get(hostId);
 +        if (store != null)
 +            store.deleteAllHints();
 +    }
 +
 +    /**
 +     * @return true if at least one of the stores has a file pending dispatch
 +     */
 +    boolean hasFiles()
 +    {
 +        return stores().anyMatch(HintsStore::hasFiles);
 +    }
 +
 +    void exciseStore(UUID hostId)
 +    {
 +        deleteAllHints(hostId);
 +        stores.remove(hostId);
 +    }
 +
 +    void fsyncDirectory()
 +    {
 +        int fd = NativeLibrary.tryOpenDirectory(hintsDirectory.getAbsolutePath());
 +        if (fd != -1)
 +        {
 +            try
 +            {
 +                SyncUtil.trySync(fd);
 +                NativeLibrary.tryCloseFD(fd);
 +            }
 +            catch (FSError e) // trySync failed
 +            {
 +                logger.error("Unable to sync directory {}", hintsDirectory.getAbsolutePath(), e);
 +                FileUtils.handleFSErrorAndPropagate(e);
 +            }
 +        }
 +        else
 +        {
 +            logger.error("Unable to open directory {}", hintsDirectory.getAbsolutePath());
 +            FileUtils.handleFSErrorAndPropagate(new FSWriteError(new IOException(String.format("Unable to open hint directory %s", hintsDirectory.getAbsolutePath())), hintsDirectory.getAbsolutePath()));
 +        }
 +    }
 +
 +    ImmutableMap<String, Object> getWriterParams()
 +    {
 +        return writerParams;
 +    }
 +}
diff --cc src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index e93057b,0000000..d66a18b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@@ -1,1156 -1,0 +1,1158 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.index;
 +
 +import java.lang.reflect.Constructor;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
 +import java.util.stream.Stream;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Joiner;
 +import com.google.common.base.Strings;
 +import com.google.common.collect.ImmutableSet;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Maps;
 +import com.google.common.collect.Sets;
 +import com.google.common.primitives.Longs;
 +import com.google.common.util.concurrent.Futures;
 +import com.google.common.util.concurrent.MoreExecutors;
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.internal.CassandraIndex;
 +import org.apache.cassandra.index.transactions.*;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.Indexes;
 +import org.apache.cassandra.service.pager.SinglePartitionPager;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.transport.Server;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.Refs;
 +
++import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
++import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
++import static org.apache.cassandra.utils.ExecutorUtils.shutdownNow;
++
 +/**
 + * Handles the core maintenance functionality associated with indexes: adding/removing them to or from
 + * a table, (re)building during bootstrap or other streaming operations, flushing, reloading metadata
 + * and so on.
 + *
 + * The Index interface defines a number of methods which return Callable<?>. These are primarily the
 + * management tasks for an index implementation. Most of them are currently executed in a blocking
 + * fashion via submission to SIM's blockingExecutor. This provides the desired behaviour in pretty
 + * much all cases, as tasks like flushing an index needs to be executed synchronously to avoid potentially
 + * deadlocking on the FlushWriter or PostFlusher. Several of these Callable<?> returning methods on Index could
 + * then be defined with as void and called directly from SIM (rather than being run via the executor service).
 + * Separating the task defintion from execution gives us greater flexibility though, so that in future, for example,
 + * if the flush process allows it we leave open the possibility of executing more of these tasks asynchronously.
 + *
 + * The primary exception to the above is the Callable returned from Index#addIndexedColumn. This may
 + * involve a significant effort, building a new index over any existing data. We perform this task asynchronously;
 + * as it is called as part of a schema update, which we do not want to block for a long period. Building non-custom
 + * indexes is performed on the CompactionManager.
 + *
 + * This class also provides instances of processors which listen to updates to the base table and forward to
 + * registered Indexes the info required to keep those indexes up to date.
 + * There are two variants of these processors, each with a factory method provided by SIM:
 + *      IndexTransaction: deals with updates generated on the regular write path.
 + *      CleanupTransaction: used when partitions are modified during compaction or cleanup operations.
 + * Further details on their usage and lifecycles can be found in the interface definitions below.
 + *
 + * Finally, the bestIndexFor method is used at query time to identify the most selective index of those able
 + * to satisfy any search predicates defined by a ReadCommand's RowFilter. It returns a thin IndexAccessor object
 + * which enables the ReadCommand to access the appropriate functions of the Index at various stages in its lifecycle.
 + * e.g. the getEstimatedResultRows is required when StorageProxy calculates the initial concurrency factor for
 + * distributing requests to replicas, whereas a Searcher instance is needed when the ReadCommand is executed locally on
 + * a target replica.
 + */
 +public class SecondaryIndexManager implements IndexRegistry
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
 +
 +    // default page size (in rows) when rebuilding the index for a whole partition
 +    public static final int DEFAULT_PAGE_SIZE = 10000;
 +
 +    private Map<String, Index> indexes = Maps.newConcurrentMap();
 +
 +    /**
 +     * The indexes that are ready to server requests.
 +     */
 +    private Set<String> builtIndexes = Sets.newConcurrentHashSet();
 +
 +    // executes tasks returned by Indexer#addIndexColumn which may require index(es) to be (re)built
 +    private static final ExecutorService asyncExecutor =
 +        new JMXEnabledThreadPoolExecutor(1,
 +                                         StageManager.KEEPALIVE,
 +                                         TimeUnit.SECONDS,
 +                                         new LinkedBlockingQueue<>(),
 +                                         new NamedThreadFactory("SecondaryIndexManagement"),
 +                                         "internal");
 +
 +    // executes all blocking tasks produced by Indexers e.g. getFlushTask, getMetadataReloadTask etc
 +    private static final ExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService();
 +
 +    /**
 +     * The underlying column family containing the source data for these indexes
 +     */
 +    public final ColumnFamilyStore baseCfs;
 +
 +    public SecondaryIndexManager(ColumnFamilyStore baseCfs)
 +    {
 +        this.baseCfs = baseCfs;
 +    }
 +
 +
 +    /**
 +     * Drops and adds new indexes associated with the underlying CF
 +     */
 +    public void reload()
 +    {
 +        // figure out what needs to be added and dropped.
 +        Indexes tableIndexes = baseCfs.metadata.getIndexes();
 +        indexes.keySet()
 +               .stream()
 +               .filter(indexName -> !tableIndexes.has(indexName))
 +               .forEach(this::removeIndex);
 +
 +        // we call add for every index definition in the collection as
 +        // some may not have been created here yet, only added to schema
 +        for (IndexMetadata tableIndex : tableIndexes)
 +            addIndex(tableIndex);
 +    }
 +
 +    private Future<?> reloadIndex(IndexMetadata indexDef)
 +    {
 +        Index index = indexes.get(indexDef.name);
 +        Callable<?> reloadTask = index.getMetadataReloadTask(indexDef);
 +        return reloadTask == null
 +               ? Futures.immediateFuture(null)
 +               : blockingExecutor.submit(reloadTask);
 +    }
 +
 +    private Future<?> createIndex(IndexMetadata indexDef)
 +    {
 +        Index index = createInstance(indexDef);
 +        index.register(this);
 +
 +        // if the index didn't register itself, we can probably assume that no initialization needs to happen
 +        final Callable<?> initialBuildTask = indexes.containsKey(indexDef.name)
 +                                           ? index.getInitializationTask()
 +                                           : null;
 +        if (initialBuildTask == null)
 +        {
 +            // We need to make sure that the index is marked as built in the case where the initialBuildTask
 +            // does not need to be run (if the index didn't register itself or if the base table was empty).
 +            markIndexBuilt(indexDef.name);
 +            return Futures.immediateFuture(null);
 +        }
 +        return asyncExecutor.submit(index.getInitializationTask());
 +    }
 +
 +    /**
 +     * Adds and builds a index
 +     * @param indexDef the IndexMetadata describing the index
 +     */
 +    public synchronized Future<?> addIndex(IndexMetadata indexDef)
 +    {
 +        if (indexes.containsKey(indexDef.name))
 +            return reloadIndex(indexDef);
 +        else
 +            return createIndex(indexDef);
 +    }
 +
 +    /**
 +     * Checks if the specified index is queryable.
 +     *
 +     * @param index the index
 +     * @return <code>true</code> if the specified index is queryable, <code>false</code> otherwise
 +     */
 +    public boolean isIndexQueryable(Index index)
 +    {
 +        return builtIndexes.contains(index.getIndexMetadata().name);
 +    }
 +
 +    public synchronized void removeIndex(String indexName)
 +    {
 +        Index index = unregisterIndex(indexName);
 +        if (null != index)
 +        {
 +            markIndexRemoved(indexName);
 +            executeBlocking(index.getInvalidateTask());
 +        }
 +    }
 +
 +
 +    public Set<IndexMetadata> getDependentIndexes(ColumnDefinition column)
 +    {
 +        if (indexes.isEmpty())
 +            return Collections.emptySet();
 +
 +        Set<IndexMetadata> dependentIndexes = new HashSet<>();
 +        for (Index index : indexes.values())
 +            if (index.dependsOn(column))
 +                dependentIndexes.add(index.getIndexMetadata());
 +
 +        return dependentIndexes;
 +    }
 +
 +    /**
 +     * Called when dropping a Table
 +     */
 +    public void markAllIndexesRemoved()
 +    {
 +       getBuiltIndexNames().forEach(this::markIndexRemoved);
 +    }
 +
 +    /**
 +    * Does a full, blocking rebuild of the indexes specified by columns from the sstables.
 +    * Caller must acquire and release references to the sstables used here.
 +    * Note also that only this method of (re)building indexes:
 +    *   a) takes a set of index *names* rather than Indexers
 +    *   b) marks exsiting indexes removed prior to rebuilding
 +    *
 +    * @param sstables the data to build from
 +    * @param indexNames the list of indexes to be rebuilt
 +    */
 +    public void rebuildIndexesBlocking(Collection<SSTableReader> sstables, Set<String> indexNames)
 +    {
 +        Set<Index> toRebuild = indexes.values().stream()
 +                                               .filter(index -> indexNames.contains(index.getIndexMetadata().name))
 +                                               .filter(Index::shouldBuildBlocking)
 +                                               .collect(Collectors.toSet());
 +        if (toRebuild.isEmpty())
 +        {
 +            logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
 +            return;
 +        }
 +
 +        toRebuild.forEach(indexer -> markIndexRemoved(indexer.getIndexMetadata().name));
 +
 +        buildIndexesBlocking(sstables, toRebuild);
 +
 +        toRebuild.forEach(indexer -> markIndexBuilt(indexer.getIndexMetadata().name));
 +    }
 +
 +    public void buildAllIndexesBlocking(Collection<SSTableReader> sstables)
 +    {
 +        buildIndexesBlocking(sstables, indexes.values()
 +                                              .stream()
 +                                              .filter(Index::shouldBuildBlocking)
 +                                              .collect(Collectors.toSet()));
 +    }
 +
 +    // For convenience, may be called directly from Index impls
 +    public void buildIndexBlocking(Index index)
 +    {
 +        if (index.shouldBuildBlocking())
 +        {
 +            try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
 +                 Refs<SSTableReader> sstables = viewFragment.refs)
 +            {
 +                buildIndexesBlocking(sstables, Collections.singleton(index));
 +                markIndexBuilt(index.getIndexMetadata().name);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Checks if the specified {@link ColumnFamilyStore} is a secondary index.
 +     *
 +     * @param cfs the <code>ColumnFamilyStore</code> to check.
 +     * @return <code>true</code> if the specified <code>ColumnFamilyStore</code> is a secondary index,
 +     * <code>false</code> otherwise.
 +     */
 +    public static boolean isIndexColumnFamilyStore(ColumnFamilyStore cfs)
 +    {
 +        return isIndexColumnFamily(cfs.name);
 +    }
 +
 +    /**
 +     * Checks if the specified {@link ColumnFamilyStore} is the one secondary index.
 +     *
 +     * @param cfName the name of the <code>ColumnFamilyStore</code> to check.
 +     * @return <code>true</code> if the specified <code>ColumnFamilyStore</code> is a secondary index,
 +     * <code>false</code> otherwise.
 +     */
 +    public static boolean isIndexColumnFamily(String cfName)
 +    {
 +        return cfName.contains(Directories.SECONDARY_INDEX_NAME_SEPARATOR);
 +    }
 +
 +    /**
 +     * Returns the parent of the specified {@link ColumnFamilyStore}.
 +     *
 +     * @param cfs the <code>ColumnFamilyStore</code>
 +     * @return the parent of the specified <code>ColumnFamilyStore</code>
 +     */
 +    public static ColumnFamilyStore getParentCfs(ColumnFamilyStore cfs)
 +    {
 +        String parentCfs = getParentCfsName(cfs.name);
 +        return cfs.keyspace.getColumnFamilyStore(parentCfs);
 +    }
 +
 +    /**
 +     * Returns the parent name of the specified {@link ColumnFamilyStore}.
 +     *
 +     * @param cfName the <code>ColumnFamilyStore</code> name
 +     * @return the parent name of the specified <code>ColumnFamilyStore</code>
 +     */
 +    public static String getParentCfsName(String cfName)
 +    {
 +        assert isIndexColumnFamily(cfName);
 +        return StringUtils.substringBefore(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR);
 +    }
 +
 +    /**
 +     * Returns the index name
 +     *
 +     * @param cfs the <code>ColumnFamilyStore</code>
 +     * @return the index name
 +     */
 +    public static String getIndexName(ColumnFamilyStore cfs)
 +    {
 +        return getIndexName(cfs.name);
 +    }
 +
 +    /**
 +     * Returns the index name
 +     *
 +     * @param cfName the <code>ColumnFamilyStore</code> name
 +     * @return the index name
 +     */
 +    public static String getIndexName(String cfName)
 +    {
 +        assert isIndexColumnFamily(cfName);
 +        return StringUtils.substringAfter(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR);
 +    }
 +
 +    private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes)
 +    {
 +        if (indexes.isEmpty())
 +            return;
 +
 +        logger.info("Submitting index build of {} for data in {}",
 +                    indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")),
 +                    sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
 +
 +        SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
 +                                                                  indexes,
 +                                                                  new ReducingKeyIterator(sstables));
 +        Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
 +        FBUtilities.waitOnFuture(future);
 +
 +        flushIndexesBlocking(indexes);
 +        logger.info("Index build of {} complete",
 +                    indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")));
 +    }
 +
 +    /**
 +     * Marks the specified index as build.
 +     * <p>This method is public as it need to be accessible from the {@link Index} implementations</p>
 +     * @param indexName the index name
 +     */
 +    public void markIndexBuilt(String indexName)
 +    {
 +        builtIndexes.add(indexName);
 +        if (DatabaseDescriptor.isDaemonInitialized())
 +            SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), indexName);
 +    }
 +
 +    /**
 +     * Marks the specified index as removed.
 +     * <p>This method is public as it need to be accessible from the {@link Index} implementations</p>
 +     * @param indexName the index name
 +     */
 +    public void markIndexRemoved(String indexName)
 +    {
 +        SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
 +    }
 +
 +    public Index getIndexByName(String indexName)
 +    {
 +        return indexes.get(indexName);
 +    }
 +
 +    private Index createInstance(IndexMetadata indexDef)
 +    {
 +        Index newIndex;
 +        if (indexDef.isCustom())
 +        {
 +            assert indexDef.options != null;
 +            String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
 +            assert ! Strings.isNullOrEmpty(className);
 +            try
 +            {
 +                Class<? extends Index> indexClass = FBUtilities.classForName(className, "Index");
 +                Constructor<? extends Index> ctor = indexClass.getConstructor(ColumnFamilyStore.class, IndexMetadata.class);
 +                newIndex = (Index)ctor.newInstance(baseCfs, indexDef);
 +            }
 +            catch (Exception e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +        }
 +        else
 +        {
 +            newIndex = CassandraIndex.newIndex(baseCfs, indexDef);
 +        }
 +        return newIndex;
 +    }
 +
 +    /**
 +     * Truncate all indexes
 +     */
 +    public void truncateAllIndexesBlocking(final long truncatedAt)
 +    {
 +        executeAllBlocking(indexes.values().stream(), (index) -> index.getTruncateTask(truncatedAt));
 +    }
 +
 +    /**
 +     * Remove all indexes
 +     */
 +    public void invalidateAllIndexesBlocking()
 +    {
 +        markAllIndexesRemoved();
 +        executeAllBlocking(indexes.values().stream(), Index::getInvalidateTask);
 +    }
 +
 +    /**
 +     * Perform a blocking flush all indexes
 +     */
 +    public void flushAllIndexesBlocking()
 +    {
 +       flushIndexesBlocking(ImmutableSet.copyOf(indexes.values()));
 +    }
 +
 +    /**
 +     * Perform a blocking flush of selected indexes
 +     */
 +    public void flushIndexesBlocking(Set<Index> indexes)
 +    {
 +        if (indexes.isEmpty())
 +            return;
 +
 +        List<Future<?>> wait = new ArrayList<>();
 +        List<Index> nonCfsIndexes = new ArrayList<>();
 +
 +        // for each CFS backed index, submit a flush task which we'll wait on for completion
 +        // for the non-CFS backed indexes, we'll flush those while we wait.
 +        synchronized (baseCfs.getTracker())
 +        {
 +            indexes.forEach(index ->
 +                index.getBackingTable()
 +                     .map(cfs -> wait.add(cfs.forceFlush()))
 +                     .orElseGet(() -> nonCfsIndexes.add(index)));
 +        }
 +
 +        executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask);
 +        FBUtilities.waitOnFutures(wait);
 +    }
 +
 +    /**
 +     * Performs a blocking flush of all custom indexes
 +     */
 +    public void flushAllNonCFSBackedIndexesBlocking()
 +    {
 +        executeAllBlocking(indexes.values()
 +                                  .stream()
 +                                  .filter(index -> !index.getBackingTable().isPresent()),
 +                           Index::getBlockingFlushTask);
 +    }
 +
 +    /**
 +     * @return all indexes which are marked as built and ready to use
 +     */
 +    public List<String> getBuiltIndexNames()
 +    {
 +        Set<String> allIndexNames = new HashSet<>();
 +        indexes.values().stream()
 +                .map(i -> i.getIndexMetadata().name)
 +                .forEach(allIndexNames::add);
 +        return SystemKeyspace.getBuiltIndexes(baseCfs.keyspace.getName(), allIndexNames);
 +    }
 +
 +    /**
 +     * @return all backing Tables used by registered indexes
 +     */
 +    public Set<ColumnFamilyStore> getAllIndexColumnFamilyStores()
 +    {
 +        Set<ColumnFamilyStore> backingTables = new HashSet<>();
 +        indexes.values().forEach(index -> index.getBackingTable().ifPresent(backingTables::add));
 +        return backingTables;
 +    }
 +
 +    /**
 +     * @return if there are ANY indexes registered for this table
 +     */
 +    public boolean hasIndexes()
 +    {
 +        return !indexes.isEmpty();
 +    }
 +
 +    /**
 +     * When building an index against existing data in sstables, add the given partition to the index
 +     */
 +    public void indexPartition(DecoratedKey key, Set<Index> indexes, int pageSize)
 +    {
 +        if (logger.isTraceEnabled())
 +            logger.trace("Indexing partition {}", baseCfs.metadata.getKeyValidator().getString(key.getKey()));
 +
 +        if (!indexes.isEmpty())
 +        {
 +            SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata,
 +                                                                                          FBUtilities.nowInSeconds(),
 +                                                                                          key);
 +            int nowInSec = cmd.nowInSec();
 +            boolean readStatic = false;
 +
 +            SinglePartitionPager pager = new SinglePartitionPager(cmd, null, Server.CURRENT_VERSION);
 +            while (!pager.isExhausted())
 +            {
 +                try (ReadOrderGroup readGroup = cmd.startOrderGroup();
 +                     OpOrder.Group writeGroup = Keyspace.writeOrder.start();
 +                     UnfilteredPartitionIterator page = pager.fetchPageUnfiltered(baseCfs.metadata, pageSize, readGroup))
 +                {
 +                    if (!page.hasNext())
 +                        break;
 +
 +                    try (UnfilteredRowIterator partition = page.next()) {
 +                        Set<Index.Indexer> indexers = indexes.stream()
 +                                                             .map(index -> index.indexerFor(key,
 +                                                                                            partition.columns(),
 +                                                                                            nowInSec,
 +                                                                                            writeGroup,
 +                                                                                            IndexTransaction.Type.UPDATE))
 +                                                             .filter(Objects::nonNull)
 +                                                             .collect(Collectors.toSet());
 +
 +                        // Short-circuit empty partitions if static row is processed or isn't read
 +                        if (!readStatic && partition.isEmpty() && partition.staticRow().isEmpty())
 +                            break;
 +
 +                        indexers.forEach(Index.Indexer::begin);
 +
 +                        if (!readStatic)
 +                        {
 +                            if (!partition.staticRow().isEmpty())
 +                                indexers.forEach(indexer -> indexer.insertRow(partition.staticRow()));
 +                            indexers.forEach((Index.Indexer i) -> i.partitionDelete(partition.partitionLevelDeletion()));
 +                            readStatic = true;
 +                        }
 +
 +                        MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(partition.partitionLevelDeletion(), baseCfs.getComparator(), false);
 +
 +                        while (partition.hasNext())
 +                        {
 +                            Unfiltered unfilteredRow = partition.next();
 +
 +                            if (unfilteredRow.isRow())
 +                            {
 +                                Row row = (Row) unfilteredRow;
 +                                indexers.forEach(indexer -> indexer.insertRow(row));
 +                            }
 +                            else
 +                            {
 +                                assert unfilteredRow.isRangeTombstoneMarker();
 +                                RangeTombstoneMarker marker = (RangeTombstoneMarker) unfilteredRow;
 +                                deletionBuilder.add(marker);
 +                            }
 +                        }
 +
 +                        MutableDeletionInfo deletionInfo = deletionBuilder.build();
 +                        if (deletionInfo.hasRanges())
 +                        {
 +                            Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false);
 +                            while (iter.hasNext())
 +                            {
 +                                RangeTombstone rt = iter.next();
 +                                indexers.forEach(indexer -> indexer.rangeTombstone(rt));
 +                            }
 +                        }
 +
 +                        indexers.forEach(Index.Indexer::finish);
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Return the page size used when indexing an entire partition
 +     */
 +    public int calculateIndexingPageSize()
 +    {
 +        if (Boolean.getBoolean("cassandra.force_default_indexing_page_size"))
 +            return DEFAULT_PAGE_SIZE;
 +
 +        double targetPageSizeInBytes = 32 * 1024 * 1024;
 +        double meanPartitionSize = baseCfs.getMeanPartitionSize();
 +        if (meanPartitionSize <= 0)
 +            return DEFAULT_PAGE_SIZE;
 +
 +        int meanCellsPerPartition = baseCfs.getMeanColumns();
 +        if (meanCellsPerPartition <= 0)
 +            return DEFAULT_PAGE_SIZE;
 +
 +        int columnsPerRow = baseCfs.metadata.partitionColumns().regulars.size();
 +        if (columnsPerRow <= 0)
 +            return DEFAULT_PAGE_SIZE;
 +
 +        int meanRowsPerPartition = meanCellsPerPartition / columnsPerRow;
 +        double meanRowSize = meanPartitionSize / meanRowsPerPartition;
 +
 +        int pageSize = (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, targetPageSizeInBytes / meanRowSize));
 +
 +        logger.trace("Calculated page size {} for indexing {}.{} ({}/{}/{}/{})",
 +                     pageSize,
 +                     baseCfs.metadata.ksName,
 +                     baseCfs.metadata.cfName,
 +                     meanPartitionSize,
 +                     meanCellsPerPartition,
 +                     meanRowsPerPartition,
 +                     meanRowSize);
 +
 +        return pageSize;
 +    }
 +
 +    /**
 +     * Delete all data from all indexes for this partition.
 +     * For when cleanup rips a partition out entirely.
 +     *
 +     * TODO : improve cleanup transaction to batch updates & perform them async
 +     */
 +    public void deletePartition(UnfilteredRowIterator partition, int nowInSec)
 +    {
 +        // we need to acquire memtable lock because secondary index deletion may
 +        // cause a race (see CASSANDRA-3712). This is done internally by the
 +        // index transaction when it commits
 +        CleanupTransaction indexTransaction = newCleanupTransaction(partition.partitionKey(),
 +                                                                    partition.columns(),
 +                                                                    nowInSec);
 +        indexTransaction.start();
 +        indexTransaction.onPartitionDeletion(new DeletionTime(FBUtilities.timestampMicros(), nowInSec));
 +        indexTransaction.commit();
 +
 +        while (partition.hasNext())
 +        {
 +            Unfiltered unfiltered = partition.next();
 +            if (unfiltered.kind() != Unfiltered.Kind.ROW)
 +                continue;
 +
 +            indexTransaction = newCleanupTransaction(partition.partitionKey(),
 +                                                     partition.columns(),
 +                                                     nowInSec);
 +            indexTransaction.start();
 +            indexTransaction.onRowDelete((Row)unfiltered);
 +            indexTransaction.commit();
 +        }
 +    }
 +
 +    /**
 +     * Called at query time to choose which (if any) of the registered index implementations to use for a given query.
 +     *
 +     * This is a two step processes, firstly compiling the set of searchable indexes then choosing the one which reduces
 +     * the search space the most.
 +     *
 +     * In the first phase, if the command's RowFilter contains any custom index expressions, the indexes that they
 +     * specify are automatically included. Following that, the registered indexes are filtered to include only those
 +     * which support the standard expressions in the RowFilter.
 +     *
 +     * The filtered set then sorted by selectivity, as reported by the Index implementations' getEstimatedResultRows
 +     * method.
 +     *
 +     * Implementation specific validation of the target expression, either custom or standard, by the selected
 +     * index should be performed in the searcherFor method to ensure that we pick the right index regardless of
 +     * the validity of the expression.
 +     *
 +     * This method is only called once during the lifecycle of a ReadCommand and the result is
 +     * cached for future use when obtaining a Searcher, getting the index's underlying CFS for
 +     * ReadOrderGroup, or an estimate of the result size from an average index query.
 +     *
 +     * @param rowFilter RowFilter of the command to be executed
 +     * @return an Index instance, ready to use during execution of the command, or null if none
 +     * of the registered indexes can support the command.
 +     */
 +    public Index getBestIndexFor(RowFilter rowFilter)
 +    {
 +        if (indexes.isEmpty() || rowFilter.isEmpty())
 +            return null;
 +
 +        Set<Index> searchableIndexes = new HashSet<>();
 +        for (RowFilter.Expression expression : rowFilter)
 +        {
 +            if (expression.isCustom())
 +            {
 +                // Only a single custom expression is allowed per query and, if present,
 +                // we want to always favour the index specified in such an expression
 +                RowFilter.CustomExpression customExpression = (RowFilter.CustomExpression)expression;
 +                logger.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name);
 +                Tracing.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name);
 +                return indexes.get(customExpression.getTargetIndex().name);
 +            }
 +            else
 +            {
 +                indexes.values().stream()
 +                       .filter(index -> index.supportsExpression(expression.column(), expression.operator()))
 +                       .forEach(searchableIndexes::add);
 +            }
 +        }
 +
 +        if (searchableIndexes.isEmpty())
 +        {
 +            logger.trace("No applicable indexes found");
 +            Tracing.trace("No applicable indexes found");
 +            return null;
 +        }
 +
 +        Index selected = searchableIndexes.size() == 1
 +                         ? Iterables.getOnlyElement(searchableIndexes)
 +                         : searchableIndexes.stream()
 +                                            .min((a, b) -> Longs.compare(a.getEstimatedResultRows(),
 +                                                                         b.getEstimatedResultRows()))
 +                                            .orElseThrow(() -> new AssertionError("Could not select most selective index"));
 +
 +        // pay for an additional threadlocal get() rather than build the strings unnecessarily
 +        if (Tracing.isTracing())
 +        {
 +            Tracing.trace("Index mean cardinalities are {}. Scanning with {}.",
 +                          searchableIndexes.stream().map(i -> i.getIndexMetadata().name + ':' + i.getEstimatedResultRows())
 +                                           .collect(Collectors.joining(",")),
 +                          selected.getIndexMetadata().name);
 +        }
 +        return selected;
 +    }
 +
 +    /**
 +     * Called at write time to ensure that values present in the update
 +     * are valid according to the rules of all registered indexes which
 +     * will process it. The partition key as well as the clustering and
 +     * cell values for each row in the update may be checked by index
 +     * implementations
 +     * @param update PartitionUpdate containing the values to be validated by registered Index implementations
 +     * @throws InvalidRequestException
 +     */
 +    public void validate(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        for (Index index : indexes.values())
 +            index.validate(update);
 +    }
 +
 +    /**
 +     * IndexRegistry methods
 +     */
 +    public void registerIndex(Index index)
 +    {
 +        String name = index.getIndexMetadata().name;
 +        indexes.put(name, index);
 +        logger.trace("Registered index {}", name);
 +    }
 +
 +    public void unregisterIndex(Index index)
 +    {
 +        unregisterIndex(index.getIndexMetadata().name);
 +    }
 +
 +    private Index unregisterIndex(String name)
 +    {
 +        Index removed = indexes.remove(name);
 +        builtIndexes.remove(name);
 +        logger.trace(removed == null ? "Index {} was not registered" : "Removed index {} from registry",
 +                     name);
 +        return removed;
 +    }
 +
 +    public Index getIndex(IndexMetadata metadata)
 +    {
 +        return indexes.get(metadata.name);
 +    }
 +
 +    public Collection<Index> listIndexes()
 +    {
 +        return ImmutableSet.copyOf(indexes.values());
 +    }
 +
 +    /**
 +     * Handling of index updates.
 +     * Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data
 +     * during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances.
 +     */
 +
 +    /**
 +     * Transaction for updates on the write path.
 +     */
 +    public UpdateTransaction newUpdateTransaction(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
 +    {
 +        if (!hasIndexes())
 +            return UpdateTransaction.NO_OP;
 +
 +        Index.Indexer[] indexers = indexes.values().stream()
 +                                          .map(i -> i.indexerFor(update.partitionKey(),
 +                                                                 update.columns(),
 +                                                                 nowInSec,
 +                                                                 opGroup,
 +                                                                 IndexTransaction.Type.UPDATE))
 +                                          .filter(Objects::nonNull)
 +                                          .toArray(Index.Indexer[]::new);
 +
 +        return indexers.length == 0 ? UpdateTransaction.NO_OP : new WriteTimeTransaction(indexers);
 +    }
 +
 +    /**
 +     * Transaction for use when merging rows during compaction
 +     */
 +    public CompactionTransaction newCompactionTransaction(DecoratedKey key,
 +                                                          PartitionColumns partitionColumns,
 +                                                          int versions,
 +                                                          int nowInSec)
 +    {
 +        // the check for whether there are any registered indexes is already done in CompactionIterator
 +        return new IndexGCTransaction(key, partitionColumns, versions, nowInSec, listIndexes());
 +    }
 +
 +    /**
 +     * Transaction for use when removing partitions during cleanup
 +     */
 +    public CleanupTransaction newCleanupTransaction(DecoratedKey key,
 +                                                    PartitionColumns partitionColumns,
 +                                                    int nowInSec)
 +    {
 +        if (!hasIndexes())
 +            return CleanupTransaction.NO_OP;
 +
 +        return new CleanupGCTransaction(key, partitionColumns, nowInSec, listIndexes());
 +    }
 +
 +    /**
 +     * A single use transaction for processing a partition update on the regular write path
 +     */
 +    private static final class WriteTimeTransaction implements UpdateTransaction
 +    {
 +        private final Index.Indexer[] indexers;
 +
 +        private WriteTimeTransaction(Index.Indexer...indexers)
 +        {
 +            // don't allow null indexers, if we don't need any use a NullUpdater object
 +            for (Index.Indexer indexer : indexers) assert indexer != null;
 +            this.indexers = indexers;
 +        }
 +
 +        public void start()
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.begin();
 +        }
 +
 +        public void onPartitionDeletion(DeletionTime deletionTime)
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.partitionDelete(deletionTime);
 +        }
 +
 +        public void onRangeTombstone(RangeTombstone tombstone)
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.rangeTombstone(tombstone);
 +        }
 +
 +        public void onInserted(Row row)
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.insertRow(row);
 +        }
 +
 +        public void onUpdated(Row existing, Row updated)
 +        {
 +            final Row.Builder toRemove = BTreeRow.sortedBuilder();
 +            toRemove.newRow(existing.clustering());
 +            toRemove.addPrimaryKeyLivenessInfo(existing.primaryKeyLivenessInfo());
 +            toRemove.addRowDeletion(existing.deletion());
 +            final Row.Builder toInsert = BTreeRow.sortedBuilder();
 +            toInsert.newRow(updated.clustering());
 +            toInsert.addPrimaryKeyLivenessInfo(updated.primaryKeyLivenessInfo());
 +            toInsert.addRowDeletion(updated.deletion());
 +            // diff listener collates the columns to be added & removed from the indexes
 +            RowDiffListener diffListener = new RowDiffListener()
 +            {
 +                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
 +                {
 +                }
 +
 +                public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
 +                {
 +                }
 +
 +                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
 +                {
 +                }
 +
 +                public void onCell(int i, Clustering clustering, Cell merged, Cell original)
 +                {
 +                    if (merged != null && !merged.equals(original))
 +                        toInsert.addCell(merged);
 +
 +                    if (merged == null || (original != null && shouldCleanupOldValue(original, merged)))
 +                        toRemove.addCell(original);
 +
 +                }
 +            };
 +            Rows.diff(diffListener, updated, existing);
 +            Row oldRow = toRemove.build();
 +            Row newRow = toInsert.build();
 +            for (Index.Indexer indexer : indexers)
 +                indexer.updateRow(oldRow, newRow);
 +        }
 +
 +        public void commit()
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.finish();
 +        }
 +
 +        private boolean shouldCleanupOldValue(Cell oldCell, Cell newCell)
 +        {
 +            // If either the value or timestamp is different, then we
 +            // should delete from the index. If not, then we can infer that
 +            // at least one of the cells is an ExpiringColumn and that the
 +            // difference is in the expiry time. In this case, we don't want to
 +            // delete the old value from the index as the tombstone we insert
 +            // will just hide the inserted value.
 +            // Completely identical cells (including expiring columns with
 +            // identical ttl & localExpirationTime) will not get this far due
 +            // to the oldCell.equals(newCell) in StandardUpdater.update
 +            return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp();
 +        }
 +    }
 +
 +    /**
 +     * A single-use transaction for updating indexes for a single partition during compaction where the only
 +     * operation is to merge rows
 +     * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
 +     * a single partition
 +     */
 +    private static final class IndexGCTransaction implements CompactionTransaction
 +    {
 +        private final DecoratedKey key;
 +        private final PartitionColumns columns;
 +        private final int versions;
 +        private final int nowInSec;
 +        private final Collection<Index> indexes;
 +
 +        private Row[] rows;
 +
 +        private IndexGCTransaction(DecoratedKey key,
 +                                   PartitionColumns columns,
 +                                   int versions,
 +                                   int nowInSec,
 +                                   Collection<Index> indexes)
 +        {
 +            this.key = key;
 +            this.columns = columns;
 +            this.versions = versions;
 +            this.indexes = indexes;
 +            this.nowInSec = nowInSec;
 +        }
 +
 +        public void start()
 +        {
 +            if (versions > 0)
 +                rows = new Row[versions];
 +        }
 +
 +        public void onRowMerge(Row merged, Row...versions)
 +        {
 +            // Diff listener constructs rows representing deltas between the merged and original versions
 +            // These delta rows are then passed to registered indexes for removal processing
 +            final Row.Builder[] builders = new Row.Builder[versions.length];
 +            RowDiffListener diffListener = new RowDiffListener()
 +            {
 +                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
 +                {
 +                    if (original != null && (merged == null || !merged.isLive(nowInSec)))
 +                        getBuilder(i, clustering).addPrimaryKeyLivenessInfo(original);
 +                }
 +
 +                public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
 +                {
 +                }
 +
 +                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
 +                {
 +                }
 +
 +                public void onCell(int i, Clustering clustering, Cell merged, Cell original)
 +                {
 +                    if (original != null && (merged == null || !merged.isLive(nowInSec)))
 +                        getBuilder(i, clustering).addCell(original);
 +                }
 +
 +                private Row.Builder getBuilder(int index, Clustering clustering)
 +                {
 +                    if (builders[index] == null)
 +                    {
 +                        builders[index] = BTreeRow.sortedBuilder();
 +                        builders[index].newRow(clustering);
 +                    }
 +                    return builders[index];
 +                }
 +            };
 +
 +            Rows.diff(diffListener, merged, versions);
 +
 +            for(int i = 0; i < builders.length; i++)
 +                if (builders[i] != null)
 +                    rows[i] = builders[i].build();
 +        }
 +
 +        public void commit()
 +        {
 +            if (rows == null)
 +                return;
 +
 +            try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
 +            {
 +                for (Index index : indexes)
 +                {
 +                    Index.Indexer indexer = index.indexerFor(key, columns, nowInSec, opGroup, Type.COMPACTION);
 +                    if (indexer == null)
 +                        continue;
 +
 +                    indexer.begin();
 +                    for (Row row : rows)
 +                        if (row != null)
 +                            indexer.removeRow(row);
 +                    indexer.finish();
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
 +     * A single-use transaction for updating indexes for a single partition during cleanup, where
 +     * partitions and rows are only removed
 +     * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
 +     * a single partition
 +     */
 +    private static final class CleanupGCTransaction implements CleanupTransaction
 +    {
 +        private final DecoratedKey key;
 +        private final PartitionColumns columns;
 +        private final int nowInSec;
 +        private final Collection<Index> indexes;
 +
 +        private Row row;
 +        private DeletionTime partitionDelete;
 +
 +        private CleanupGCTransaction(DecoratedKey key,
 +                                     PartitionColumns columns,
 +                                     int nowInSec,
 +                                     Collection<Index> indexes)
 +        {
 +            this.key = key;
 +            this.columns = columns;
 +            this.indexes = indexes;
 +            this.nowInSec = nowInSec;
 +        }
 +
 +        public void start()
 +        {
 +        }
 +
 +        public void onPartitionDeletion(DeletionTime deletionTime)
 +        {
 +            partitionDelete = deletionTime;
 +        }
 +
 +        public void onRowDelete(Row row)
 +        {
 +            this.row = row;
 +        }
 +
 +        public void commit()
 +        {
 +            if (row == null && partitionDelete == null)
 +                return;
 +
 +            try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
 +            {
 +                for (Index index : indexes)
 +                {
 +                    Index.Indexer indexer = index.indexerFor(key, columns, nowInSec, opGroup, Type.CLEANUP);
 +                    if (indexer == null)
 +                        continue;
 +
 +                    indexer.begin();
 +
 +                    if (partitionDelete != null)
 +                        indexer.partitionDelete(partitionDelete);
 +
 +                    if (row != null)
 +                        indexer.removeRow(row);
 +
 +                    indexer.finish();
 +                }
 +            }
 +        }
 +    }
 +
 +    private static void executeBlocking(Callable<?> task)
 +    {
 +        if (null != task)
 +            FBUtilities.waitOnFuture(blockingExecutor.submit(task));
 +    }
 +
 +    private static void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function)
 +    {
 +        List<Future<?>> waitFor = new ArrayList<>();
 +        indexers.forEach(indexer -> {
 +            Callable<?> task = function.apply(indexer);
 +            if (null != task)
 +                waitFor.add(blockingExecutor.submit(task));
 +        });
 +        FBUtilities.waitOnFutures(waitFor);
 +    }
 +
 +    @VisibleForTesting
-     public static void shutdownExecutors() throws InterruptedException
++    public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
 +    {
 +        ExecutorService[] executors = new ExecutorService[]{ asyncExecutor, blockingExecutor };
-         for (ExecutorService executor : executors)
-             executor.shutdown();
-         for (ExecutorService executor : executors)
-             executor.awaitTermination(60, TimeUnit.SECONDS);
++        shutdown(executors);
++        awaitTermination(timeout, unit, executors);
 +    }
 +}
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 507b6fa,9317132..ae79217
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@@ -41,9 -42,9 +42,10 @@@ import org.apache.cassandra.db.compacti
  import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.db.compaction.OperationType;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
  import org.apache.cassandra.db.lifecycle.View;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.MBeanWrapper;
  import org.apache.cassandra.utils.Pair;
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 82b26ea,e42b91b..b817c99
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -1155,22 -1061,15 +1155,24 @@@ public final class MessagingService imp
          }
      }
  
 -    private static void handleIOException(IOException e) throws IOException
 +    private static void handleIOExceptionOnClose(IOException e) throws IOException
      {
          // dirty hack for clean shutdown on OSX w/ Java >= 1.8.0_20
 -        // see https://bugs.openjdk.java.net/browse/JDK-8050499
 -        if ((!"Unknown error: 316".equals(e.getMessage()) || !"Mac OS X".equals(System.getProperty("os.name"))) &&
 -            !"Thread signal failed".equals(e.getMessage()) && // handle shutdown for in-JVM dtests
 -            !"Bad file descriptor".equals(e.getMessage()) &&
 -            !"No such file or directory".equals(e.getMessage()))
 -            throw e;
 +        // see https://bugs.openjdk.java.net/browse/JDK-8050499;
 +        // also CASSANDRA-12513
 +        if ("Mac OS X".equals(System.getProperty("os.name")))
 +        {
 +            switch (e.getMessage())
 +            {
 +                case "Unknown error: 316":
 +                case "No such file or directory":
++                case "Bad file descriptor":
++                case "Thread signal failed":
 +                    return;
 +            }
 +        }
 +
 +        throw e;
      }
  
      public Map<String, Integer> getLargeMessagePendingTasks()
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 4769b22,0a9a8da..d349b4b
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -92,10 -80,7 +93,11 @@@ import org.apache.cassandra.utils.progr
  import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
  import org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport;
  
 +import static java.util.Arrays.asList;
+ import static java.util.concurrent.TimeUnit.MINUTES;
 +import static java.util.stream.Collectors.toList;
 +import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName;
 +import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily;
  
  /**
   * This abstraction contains the token/identifier of this node
@@@ -651,12 -660,12 +653,17 @@@ public class StorageService extends Not
                  if (FBUtilities.isWindows())
                      WindowsTimer.endTimerPeriod(DatabaseDescriptor.getWindowsTimerInterval());
  
 +                // Cleanup logback
 +                DelayingShutdownHook logbackHook = new DelayingShutdownHook();
 +                logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory());
 +                logbackHook.run();
++
+                 // wait for miscellaneous tasks like sstable and commitlog segment deletion
+                 ScheduledExecutors.nonPeriodicTasks.shutdown();
+                 if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, MINUTES))
+                     logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown");
              }
 -        }, "StorageServiceShutdownHook");
 +        }), "StorageServiceShutdownHook");
          Runtime.getRuntime().addShutdownHook(drainOnShutdown);
  
          replacing = DatabaseDescriptor.isReplacing();
@@@ -1403,9 -1368,9 +1410,9 @@@
          return bgMonitor.getSeverity(endpoint);
      }
  
-     public void shutdownBGMonitor()
 -    public void shutdownBGMonitorAndWait(long timeout, TimeUnit units) throws TimeoutException, InterruptedException
++    public void shutdownBGMonitorAndWait(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException
      {
-         bgMonitor.shutdown();
 -        bgMonitor.shutdownAndWait(timeout, units);
++        bgMonitor.shutdownAndWait(timeout, unit);
      }
  
      /**
@@@ -4211,149 -4020,96 +4218,158 @@@
       */
      public synchronized void drain() throws IOException, InterruptedException, ExecutionException
      {
 -        inShutdownHook = true;
 +        drain(false);
 +    }
  
 +    protected synchronized void drain(boolean isFinalShutdown) throws IOException, InterruptedException, ExecutionException
 +    {
          ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
 +        ExecutorService viewMutationStage = StageManager.getStage(Stage.VIEW_MUTATION);
          ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
 -        if (mutationStage.isTerminated() && counterMutationStage.isTerminated())
 +
 +        if (mutationStage.isTerminated()
 +            && counterMutationStage.isTerminated()
 +            && viewMutationStage.isTerminated())
          {
 -            logger.warn("Cannot drain node (did it already happen?)");
 +            if (!isFinalShutdown)
 +                logger.warn("Cannot drain node (did it already happen?)");
              return;
          }
 -        setMode(Mode.DRAINING, "starting drain process", true);
 -        shutdownClientServers();
 -        ScheduledExecutors.optionalTasks.shutdown();
 -        Gossiper.instance.stop();
  
 -        setMode(Mode.DRAINING, "shutting down MessageService", false);
 -        MessagingService.instance().shutdown();
 +        assert !isShutdown;
 +        isShutdown = true;
  
 -        setMode(Mode.DRAINING, "clearing mutation stage", false);
 -        counterMutationStage.shutdown();
 -        mutationStage.shutdown();
 -        counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 -        mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 +        try
 +        {
 +            setMode(Mode.DRAINING, "starting drain process", !isFinalShutdown);
  
-             BatchlogManager.instance.shutdown();
 -        StorageProxy.instance.verifyNoHintsInProgress();
++            try
++            {
++                /* not clear this is reasonable time, but propagated from prior embedded behaviour */
++                BatchlogManager.instance.shutdownAndWait(1L, MINUTES);
++            }
++            catch (TimeoutException t)
++            {
++                logger.error("Batchlog manager timed out shutting down", t);
++            }
+ 
 -        setMode(Mode.DRAINING, "flushing column families", false);
 -        // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty
 -        totalCFs = 0;
 -        for (Keyspace keyspace : Keyspace.nonSystem())
 -            totalCFs += keyspace.getColumnFamilyStores().size();
 -        remainingCFs = totalCFs;
 -        // flush
 -        List<Future<?>> flushes = new ArrayList<>();
 -        for (Keyspace keyspace : Keyspace.nonSystem())
 -        {
 -            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 -                flushes.add(cfs.forceFlush());
 -        }
 -        // wait for the flushes.
 -        // TODO this is a godawful way to track progress, since they flush in parallel.  a long one could
 -        // thus make several short ones "instant" if we wait for them later.
 -        for (Future f : flushes)
 -        {
 -            FBUtilities.waitOnFuture(f);
 -            remainingCFs--;
 -        }
 +            HintsService.instance.pauseDispatch();
  
 -        try
 -        {
 -            /* not clear this is reasonable time, but propagated from prior embedded behaviour */
 -            BatchlogManager.shutdownAndWait(1L, MINUTES);
 -        }
 -        catch (TimeoutException t)
 -        {
 -            logger.error("Batchlog manager timed out shutting down", t);
 -        }
 +            if (daemon != null)
 +                shutdownClientServers();
 +            ScheduledExecutors.optionalTasks.shutdown();
 +            Gossiper.instance.stop();
 +
 +            if (!isFinalShutdown)
 +                setMode(Mode.DRAINING, "shutting down MessageService", false);
 +
 +            // In-progress writes originating here could generate hints to be written, so shut down MessagingService
 +            // before mutation stage, so we can get all the hints saved before shutting down
 +            MessagingService.instance().shutdown();
 +
 +            if (!isFinalShutdown)
 +                setMode(Mode.DRAINING, "clearing mutation stage", false);
 +            viewMutationStage.shutdown();
 +            counterMutationStage.shutdown();
 +            mutationStage.shutdown();
 +            viewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 +            counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 +            mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 +
 +            StorageProxy.instance.verifyNoHintsInProgress();
 +
 +            if (!isFinalShutdown)
 +                setMode(Mode.DRAINING, "flushing column families", false);
 +
 +            // disable autocompaction - we don't want to start any new compactions while we are draining
 +            for (Keyspace keyspace : Keyspace.all())
 +                for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 +                    cfs.disableAutoCompaction();
 +
 +            // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty
 +            totalCFs = 0;
 +            for (Keyspace keyspace : Keyspace.nonSystem())
 +                totalCFs += keyspace.getColumnFamilyStores().size();
 +            remainingCFs = totalCFs;
 +            // flush
 +            List<Future<?>> flushes = new ArrayList<>();
 +            for (Keyspace keyspace : Keyspace.nonSystem())
 +            {
 +                for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 +                    flushes.add(cfs.forceFlush());
 +            }
 +            // wait for the flushes.
 +            // TODO this is a godawful way to track progress, since they flush in parallel.  a long one could
 +            // thus make several short ones "instant" if we wait for them later.
 +            for (Future f : flushes)
 +            {
 +                try
 +                {
 +                    FBUtilities.waitOnFuture(f);
 +                }
 +                catch (Throwable t)
 +                {
 +                    JVMStabilityInspector.inspectThrowable(t);
 +                    // don't let this stop us from shutting down the commitlog and other thread pools
 +                    logger.warn("Caught exception while waiting for memtable flushes during shutdown hook", t);
 +                }
  
 -        // Interrupt on going compaction and shutdown to prevent further compaction
 -        CompactionManager.instance.forceShutdown();
 +                remainingCFs--;
 +            }
  
 -        // Flush the system tables after all other tables are flushed, just in case flushing modifies any system state
 -        // like CASSANDRA-5151. Don't bother with progress tracking since system data is tiny.
 -        // Flush system tables after stopping the batchlog manager and compactions since they both modify
 -        // system tables (for example compactions can obsolete sstables and the tidiers in SSTableReader update
 -        // system tables, see SSTableReader.GlobalTidy)
 -        flushes.clear();
 -        for (Keyspace keyspace : Keyspace.system())
 -        {
 -            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 -                flushes.add(cfs.forceFlush());
 -        }
 -        FBUtilities.waitOnFutures(flushes);
 +            // Interrupt ongoing compactions and shutdown CM to prevent further compactions.
 +            CompactionManager.instance.forceShutdown();
 +            // Flush the system tables after all other tables are flushed, just in case flushing modifies any system state
 +            // like CASSANDRA-5151. Don't bother with progress tracking since system data is tiny.
 +            // Flush system tables after stopping compactions since they modify
 +            // system tables (for example compactions can obsolete sstables and the tidiers in SSTableReader update
 +            // system tables, see SSTableReader.GlobalTidy)
 +            flushes.clear();
 +            for (Keyspace keyspace : Keyspace.system())
 +            {
 +                for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 +                    flushes.add(cfs.forceFlush());
 +            }
 +            FBUtilities.waitOnFutures(flushes);
 +
 +            HintsService.instance.shutdownBlocking();
  
 -        // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
 -        // there are no segments to replay, so we force the recycling of any remaining (should be at most one)
 -        CommitLog.instance.forceRecycleAllSegments();
 +            // Interrupt ongoing compactions and shutdown CM to prevent further compactions.
 +            CompactionManager.instance.forceShutdown();
  
 -        CommitLog.instance.shutdownBlocking();
 +            // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
 +            // there are no segments to replay, so we force the recycling of any remaining (should be at most one)
 +            CommitLog.instance.forceRecycleAllSegments();
  
 -        // wait for miscellaneous tasks like sstable and commitlog segment deletion
 -        ScheduledExecutors.nonPeriodicTasks.shutdown();
 -        if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, MINUTES))
 -            logger.warn("Failed to wait for non periodic tasks to shutdown");
 +            CommitLog.instance.shutdownBlocking();
  
 -        ColumnFamilyStore.shutdownPostFlushExecutor();
 +            // wait for miscellaneous tasks like sstable and commitlog segment deletion
 +            ScheduledExecutors.nonPeriodicTasks.shutdown();
-             if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
++            if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, MINUTES))
 +                logger.warn("Failed to wait for non periodic tasks to shutdown");
  
 -        setMode(Mode.DRAINED, true);
 +            ColumnFamilyStore.shutdownPostFlushExecutor();
 +            setMode(Mode.DRAINED, !isFinalShutdown);
 +        }
 +        catch (Throwable t)
 +        {
 +            logger.error("Caught an exception while draining ", t);
 +        }
 +    }
 +
 +    /**
 +     * Some services are shutdown during draining and we should not attempt to start them again.
 +     *
 +     * @param service - the name of the service we are trying to start.
 +     * @throws IllegalStateException - an exception that nodetool is able to convert into a message to display to the user
 +     */
 +    synchronized void checkServiceAllowedToStart(String service)
 +    {
 +        if (isDraining()) // when draining isShutdown is also true, so we check first to return a more accurate message
 +            throw new IllegalStateException(String.format("Unable to start %s because the node is draining.", service));
 +
 +        if (isShutdown()) // do not rely on operationMode in case it gets changed to decomissioned or other
 +            throw new IllegalStateException(String.format("Unable to start %s because the node was drained.", service));
      }
  
      // Never ever do this at home. Used by tests.
diff --cc src/java/org/apache/cassandra/utils/concurrent/Ref.java
index 153a5b3,c009032..933c498
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@@ -35,22 -35,9 +35,26 @@@ import org.apache.cassandra.concurrent.
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.concurrent.InfiniteLoopExecutor.InterruptibleRunnable;
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.Memory;
 +import org.apache.cassandra.io.util.SafeMemory;
+ import org.apache.cassandra.utils.ExecutorUtils;
 +import org.apache.cassandra.utils.NoSpamLogger;
 +import org.apache.cassandra.utils.Pair;
 +import org.cliffc.high_scale_lib.NonBlockingHashMap;
  
 +import static java.util.Collections.emptyList;
++import org.apache.cassandra.concurrent.InfiniteLoopExecutor.InterruptibleRunnable;
 +
++import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
++import static org.apache.cassandra.utils.ExecutorUtils.shutdownNow;
  import static org.apache.cassandra.utils.Throwables.maybeFail;
  import static org.apache.cassandra.utils.Throwables.merge;
  
@@@ -374,343 -343,9 +378,337 @@@ public final class Ref<T> implements Re
          }
      }
  
 +    static final Deque<InProgressVisit> inProgressVisitPool = new ArrayDeque<InProgressVisit>();
 +
 +    @SuppressWarnings({ "rawtypes", "unchecked" })
 +    static InProgressVisit newInProgressVisit(Object o, List<Field> fields, Field field, String name)
 +    {
 +        Preconditions.checkNotNull(o);
 +        InProgressVisit ipv = inProgressVisitPool.pollLast();
 +        if (ipv == null)
 +            ipv = new InProgressVisit();
 +
 +        ipv.o = o;
 +        if (o instanceof Object[])
 +            ipv.collectionIterator = Arrays.asList((Object[])o).iterator();
 +        else if (o instanceof ConcurrentMap)
 +        {
 +            ipv.isMapIterator = true;
 +            ipv.collectionIterator = ((Map)o).entrySet().iterator();
 +        }
 +        else if (concurrentIterables.contains(o.getClass()) | o instanceof BlockingQueue)
 +            ipv.collectionIterator = ((Iterable)o).iterator();
 +
 +        ipv.fields = fields;
 +        ipv.field = field;
 +        ipv.name = name;
 +        return ipv;
 +    }
 +
 +    static void returnInProgressVisit(InProgressVisit ipv)
 +    {
 +        if (inProgressVisitPool.size() > 1024)
 +            return;
 +        ipv.name = null;
 +        ipv.fields = null;
 +        ipv.o = null;
 +        ipv.fieldIndex = 0;
 +        ipv.field = null;
 +        ipv.collectionIterator = null;
 +        ipv.mapEntryValue = null;
 +        ipv.isMapIterator = false;
 +        inProgressVisitPool.offer(ipv);
 +    }
 +
 +    /*
 +     * Stack state for walking an object graph.
 +     * Field index is the index of the current field being fetched.
 +     */
 +    @SuppressWarnings({ "rawtypes"})
 +    static class InProgressVisit
 +    {
 +        String name;
 +        List<Field> fields;
 +        Object o;
 +        int fieldIndex = 0;
 +        Field field;
 +
 +        //Need to know if Map.Entry should be returned or traversed as an object
 +        boolean isMapIterator;
 +        //If o is a ConcurrentMap, BlockingQueue, or Object[], this is populated with an iterator over the contents
 +        Iterator<Object> collectionIterator;
 +        //If o is a ConcurrentMap the entry set contains keys and values. The key is returned as the first child
 +        //And the associated value is stashed here and returned next
 +        Object mapEntryValue;
 +
 +        private Field nextField()
 +        {
 +            if (fields.isEmpty())
 +                return null;
 +
 +            if (fieldIndex >= fields.size())
 +                return null;
 +
 +            Field retval = fields.get(fieldIndex);
 +            fieldIndex++;
 +            return retval;
 +        }
 +
 +        Pair<Object, Field> nextChild() throws IllegalAccessException
 +        {
 +            //If the last child returned was a key from a map, the value from that entry is stashed
 +            //so it can be returned next
 +            if (mapEntryValue != null)
 +            {
 +                Pair<Object, Field> retval = Pair.create(mapEntryValue, field);
 +                mapEntryValue = null;
 +                return retval;
 +            }
 +
 +            //If o is a ConcurrentMap, BlockingQueue, or Object[], then an iterator will be stored to return the elements
 +            if (collectionIterator != null)
 +            {
 +                if (!collectionIterator.hasNext())
 +                    return null;
 +                Object nextItem = null;
 +                //Find the next non-null element to traverse since returning null will cause the visitor to stop
 +                while (collectionIterator.hasNext() && (nextItem = collectionIterator.next()) == null){}
 +                if (nextItem != null)
 +                {
 +                    if (isMapIterator & nextItem instanceof Map.Entry)
 +                    {
 +                        Map.Entry entry = (Map.Entry)nextItem;
 +                        mapEntryValue = entry.getValue();
 +                        return Pair.create(entry.getKey(), field);
 +                    }
 +                    return Pair.create(nextItem, field);
 +                }
 +                else
 +                {
 +                    return null;
 +                }
 +            }
 +
 +            //Basic traversal of an object by its member fields
 +            //Don't return null values as that indicates no more objects
 +            while (true)
 +            {
 +                Field nextField = nextField();
 +                if (nextField == null)
 +                    return null;
 +
 +                //A weak reference isn't strongly reachable
 +                //subclasses of WeakReference contain strong references in their fields, so those need to be traversed
 +                //The weak reference fields are in the common Reference class base so filter those out
 +                if (o instanceof WeakReference & nextField.getDeclaringClass() == Reference.class)
 +                    continue;
 +
 +                Object nextObject = nextField.get(o);
 +                if (nextObject != null)
 +                    return Pair.create(nextField.get(o), nextField);
 +            }
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return field == null ? name : field.toString() + "-" + o.getClass().getName();
 +        }
 +    }
 +
 +    static class Visitor implements Runnable
 +    {
 +        final Deque<InProgressVisit> path = new ArrayDeque<>();
 +        final Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<>());
 +        @VisibleForTesting
 +        int lastVisitedCount;
 +        @VisibleForTesting
 +        long iterations = 0;
 +        GlobalState visiting;
 +        Set<GlobalState> haveLoops;
 +
 +        public void run()
 +        {
 +            try
 +            {
 +                for (GlobalState globalState : globallyExtant)
 +                {
 +                    if (globalState.tidy == null)
 +                        continue;
 +
 +                    // do a graph exploration of the GlobalState, since it should be shallow; if it references itself, we have a problem
 +                    path.clear();
 +                    visited.clear();
 +                    lastVisitedCount = 0;
 +                    iterations = 0;
 +                    visited.add(globalState);
 +                    visiting = globalState;
 +                    traverse(globalState.tidy);
 +                }
 +            }
 +            catch (Throwable t)
 +            {
 +                t.printStackTrace();
 +            }
 +            finally
 +            {
 +                lastVisitedCount = visited.size();
 +                path.clear();
 +                visited.clear();
 +            }
 +        }
 +
 +        /*
 +         * Searches for an indirect strong reference between rootObject and visiting.
 +         */
 +        void traverse(final RefCounted.Tidy rootObject)
 +        {
 +            path.offer(newInProgressVisit(rootObject, getFields(rootObject.getClass()), null, rootObject.name()));
 +
 +            InProgressVisit inProgress = null;
 +            while (inProgress != null || !path.isEmpty())
 +            {
 +                //If necessary fetch the next object to start tracing
 +                if (inProgress == null)
 +                    inProgress = path.pollLast();
 +
 +                try
 +                {
 +                    Pair<Object, Field> p = inProgress.nextChild();
 +                    Object child = null;
 +                    Field field = null;
 +
 +                    if (p != null)
 +                    {
 +                        iterations++;
 +                        child = p.left;
 +                        field = p.right;
 +                    }
 +
 +                    if (child != null && visited.add(child))
 +                    {
 +                        path.offer(inProgress);
 +                        inProgress = newInProgressVisit(child, getFields(child.getClass()), field, null);
 +                        continue;
 +                    }
 +                    else if (visiting == child)
 +                    {
 +                        if (haveLoops != null)
 +                            haveLoops.add(visiting);
 +                        NoSpamLogger.log(logger,
 +                                NoSpamLogger.Level.ERROR,
 +                                rootObject.getClass().getName(),
 +                                1,
 +                                TimeUnit.SECONDS,
 +                                "Strong self-ref loop detected {}",
 +                                path);
 +                    }
 +                    else if (child == null)
 +                    {
 +                        returnInProgressVisit(inProgress);
 +                        inProgress = null;
 +                        continue;
 +                    }
 +                }
 +                catch (IllegalAccessException e)
 +                {
 +                    NoSpamLogger.log(logger, NoSpamLogger.Level.ERROR, 5, TimeUnit.MINUTES, "Could not fully check for self-referential leaks", e);
 +                }
 +            }
 +        }
 +    }
 +
 +    static final Map<Class<?>, List<Field>> fieldMap = new HashMap<>();
 +    static List<Field> getFields(Class<?> clazz)
 +    {
 +        if (clazz == null || clazz == PhantomReference.class || clazz == Class.class || java.lang.reflect.Member.class.isAssignableFrom(clazz))
 +            return emptyList();
 +        List<Field> fields = fieldMap.get(clazz);
 +        if (fields != null)
 +            return fields;
 +        fieldMap.put(clazz, fields = new ArrayList<>());
 +        for (Field field : clazz.getDeclaredFields())
 +        {
 +            if (field.getType().isPrimitive() || Modifier.isStatic(field.getModifiers()))
 +                continue;
 +            field.setAccessible(true);
 +            fields.add(field);
 +        }
 +        fields.addAll(getFields(clazz.getSuperclass()));
 +        return fields;
 +    }
 +
 +    public static class IdentityCollection
 +    {
 +        final Set<Tidy> candidates;
 +        public IdentityCollection(Set<Tidy> candidates)
 +        {
 +            this.candidates = candidates;
 +        }
 +
 +        public void add(Ref<?> ref)
 +        {
 +            candidates.remove(ref.state.globalState.tidy);
 +        }
 +        public void add(SelfRefCounted<?> ref)
 +        {
 +            add(ref.selfRef());
 +        }
 +        public void add(SharedCloseable ref)
 +        {
 +            if (ref instanceof SharedCloseableImpl)
 +                add((SharedCloseableImpl)ref);
 +        }
 +        public void add(SharedCloseableImpl ref)
 +        {
 +            add(ref.ref);
 +        }
 +        public void add(Memory memory)
 +        {
 +            if (memory instanceof SafeMemory)
 +                ((SafeMemory) memory).addTo(this);
 +        }
 +    }
 +
 +    private static class StrongLeakDetector implements Runnable
 +    {
 +        Set<Tidy> candidates = new HashSet<>();
 +
 +        public void run()
 +        {
 +            final Set<Tidy> candidates = Collections.newSetFromMap(new IdentityHashMap<>());
 +            for (GlobalState state : globallyExtant)
 +                candidates.add(state.tidy);
 +            removeExpected(candidates);
 +            this.candidates.retainAll(candidates);
 +            if (!this.candidates.isEmpty())
 +            {
 +                List<String> names = new ArrayList<>();
 +                for (Tidy tidy : this.candidates)
 +                    names.add(tidy.name());
 +                logger.warn("Strong reference leak candidates detected: {}", names);
 +            }
 +            this.candidates = candidates;
 +        }
 +
 +        private void removeExpected(Set<Tidy> candidates)
 +        {
 +            final Ref.IdentityCollection expected = new Ref.IdentityCollection(candidates);
 +            for (Keyspace ks : Keyspace.all())
 +            {
 +                for (ColumnFamilyStore cfs : ks.getColumnFamilyStores())
 +                {
 +                    View view = cfs.getTracker().getView();
 +                    for (SSTableReader reader : view.allKnownSSTables())
 +                        reader.addTo(expected);
 +                }
 +            }
 +        }
 +    }
 +
      @VisibleForTesting
-     public static void shutdownReferenceReaper() throws InterruptedException
+     public static void shutdownReferenceReaper(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      {
-         EXEC.shutdown();
-         EXEC.awaitTermination(60, TimeUnit.SECONDS);
-         if (STRONG_LEAK_DETECTOR != null)
-         {
-             STRONG_LEAK_DETECTOR.shutdownNow();
-             STRONG_LEAK_DETECTOR.awaitTermination(60, TimeUnit.SECONDS);
-         }
 -        ExecutorUtils.shutdownNowAndWait(timeout, unit, EXEC);
++        ExecutorUtils.shutdownNowAndWait(timeout, unit, EXEC, STRONG_LEAK_DETECTOR);
      }
  }
diff --cc src/java/org/apache/cassandra/utils/memory/BufferPool.java
index 339228c,0000000..d0cea0f
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@@ -1,847 -1,0 +1,851 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.utils.memory;
 +
 +import java.lang.ref.PhantomReference;
 +import java.lang.ref.ReferenceQueue;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 +
 +import org.apache.cassandra.io.compress.BufferType;
 +import org.apache.cassandra.io.util.FileUtils;
++import org.apache.cassandra.utils.ExecutorUtils;
 +import org.apache.cassandra.utils.NoSpamLogger;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.metrics.BufferPoolMetrics;
 +import org.apache.cassandra.utils.concurrent.Ref;
 +
++import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
++import static org.apache.cassandra.utils.ExecutorUtils.shutdownNow;
++
 +/**
 + * A pool of ByteBuffers that can be recycled.
 + */
 +public class BufferPool
 +{
 +    /** The size of a page aligned buffer, 64KiB */
 +    static final int CHUNK_SIZE = 64 << 10;
 +
 +    @VisibleForTesting
 +    public static long MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L;
 +
 +    @VisibleForTesting
 +    public static boolean ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = DatabaseDescriptor.getBufferPoolUseHeapIfExhausted();
 +
 +    @VisibleForTesting
 +    public static boolean DISABLED = Boolean.parseBoolean(System.getProperty("cassandra.test.disable_buffer_pool", "false"));
 +
 +    @VisibleForTesting
 +    public static boolean DEBUG = false;
 +
 +    private static final Logger logger = LoggerFactory.getLogger(BufferPool.class);
 +    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 15L, TimeUnit.MINUTES);
 +    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
 +
 +    /** A global pool of chunks (page aligned buffers) */
 +    private static final GlobalPool globalPool = new GlobalPool();
 +
 +    /** A thread local pool of chunks, where chunks come from the global pool */
 +    private static final ThreadLocal<LocalPool> localPool = new ThreadLocal<LocalPool>() {
 +        @Override
 +        protected LocalPool initialValue()
 +        {
 +            return new LocalPool();
 +        }
 +    };
 +
 +    public static ByteBuffer get(int size)
 +    {
 +        if (DISABLED)
 +            return allocate(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED);
 +        else
 +            return takeFromPool(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED);
 +    }
 +
 +    public static ByteBuffer get(int size, BufferType bufferType)
 +    {
 +        boolean direct = bufferType == BufferType.OFF_HEAP;
 +        if (DISABLED || !direct)
 +            return allocate(size, !direct);
 +        else
 +            return takeFromPool(size, !direct);
 +    }
 +
 +    /** Unlike the get methods, this will return null if the pool is exhausted */
 +    public static ByteBuffer tryGet(int size)
 +    {
 +        if (DISABLED)
 +            return allocate(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED);
 +        else
 +            return maybeTakeFromPool(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED);
 +    }
 +
 +    private static ByteBuffer allocate(int size, boolean onHeap)
 +    {
 +        return onHeap
 +               ? ByteBuffer.allocate(size)
 +               : ByteBuffer.allocateDirect(size);
 +    }
 +
 +    private static ByteBuffer takeFromPool(int size, boolean allocateOnHeapWhenExhausted)
 +    {
 +        ByteBuffer ret = maybeTakeFromPool(size, allocateOnHeapWhenExhausted);
 +        if (ret != null)
 +            return ret;
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("Requested buffer size {} has been allocated directly due to lack of capacity", size);
 +
 +        return localPool.get().allocate(size, allocateOnHeapWhenExhausted);
 +    }
 +
 +    private static ByteBuffer maybeTakeFromPool(int size, boolean allocateOnHeapWhenExhausted)
 +    {
 +        if (size < 0)
 +            throw new IllegalArgumentException("Size must be positive (" + size + ")");
 +
 +        if (size == 0)
 +            return EMPTY_BUFFER;
 +
 +        if (size > CHUNK_SIZE)
 +        {
 +            if (logger.isTraceEnabled())
 +                logger.trace("Requested buffer size {} is bigger than {}, allocating directly", size, CHUNK_SIZE);
 +
 +            return localPool.get().allocate(size, allocateOnHeapWhenExhausted);
 +        }
 +
 +        return localPool.get().get(size);
 +    }
 +
 +    public static void put(ByteBuffer buffer)
 +    {
 +        if (!(DISABLED || buffer.hasArray()))
 +            localPool.get().put(buffer);
 +    }
 +
 +    /** This is not thread safe and should only be used for unit testing. */
 +    @VisibleForTesting
 +    static void reset()
 +    {
 +        localPool.get().reset();
 +        globalPool.reset();
 +    }
 +
 +    @VisibleForTesting
 +    static Chunk currentChunk()
 +    {
 +        return localPool.get().chunks[0];
 +    }
 +
 +    @VisibleForTesting
 +    static int numChunks()
 +    {
 +        int ret = 0;
 +        for (Chunk chunk : localPool.get().chunks)
 +        {
 +            if (chunk != null)
 +                ret++;
 +        }
 +        return ret;
 +    }
 +
 +    @VisibleForTesting
 +    static void assertAllRecycled()
 +    {
 +        globalPool.debug.check();
 +    }
 +
 +    public static long sizeInBytes()
 +    {
 +        return globalPool.sizeInBytes();
 +    }
 +
 +    static final class Debug
 +    {
 +        long recycleRound = 1;
 +        final Queue<Chunk> allChunks = new ConcurrentLinkedQueue<>();
 +        void register(Chunk chunk)
 +        {
 +            allChunks.add(chunk);
 +        }
 +        void recycle(Chunk chunk)
 +        {
 +            chunk.lastRecycled = recycleRound;
 +        }
 +        void check()
 +        {
 +            for (Chunk chunk : allChunks)
 +                assert chunk.lastRecycled == recycleRound;
 +            recycleRound++;
 +        }
 +    }
 +
 +    /**
 +     * A queue of page aligned buffers, the chunks, which have been sliced from bigger chunks,
 +     * the macro-chunks, also page aligned. Macro-chunks are allocated as long as we have not exceeded the
 +     * memory maximum threshold, MEMORY_USAGE_THRESHOLD and are never released.
 +     *
 +     * This class is shared by multiple thread local pools and must be thread-safe.
 +     */
 +    static final class GlobalPool
 +    {
 +        /** The size of a bigger chunk, 1-mbit, must be a multiple of CHUNK_SIZE */
 +        static final int MACRO_CHUNK_SIZE = 1 << 20;
 +
 +        static
 +        {
 +            assert Integer.bitCount(CHUNK_SIZE) == 1; // must be a power of 2
 +            assert Integer.bitCount(MACRO_CHUNK_SIZE) == 1; // must be a power of 2
 +            assert MACRO_CHUNK_SIZE % CHUNK_SIZE == 0; // must be a multiple
 +
 +            if (DISABLED)
 +                logger.info("Global buffer pool is disabled, allocating {}", ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap");
 +            else
 +                logger.info("Global buffer pool is enabled, when pool is exahusted (max is {} mb) it will allocate {}",
 +                            MEMORY_USAGE_THRESHOLD / (1024L * 1024L),
 +                            ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap");
 +        }
 +
 +        private final Debug debug = new Debug();
 +        private final Queue<Chunk> macroChunks = new ConcurrentLinkedQueue<>();
 +        // TODO (future): it would be preferable to use a CLStack to improve cache occupancy; it would also be preferable to use "CoreLocal" storage
 +        private final Queue<Chunk> chunks = new ConcurrentLinkedQueue<>();
 +        private final AtomicLong memoryUsage = new AtomicLong();
 +
 +        /** Return a chunk, the caller will take owership of the parent chunk. */
 +        public Chunk get()
 +        {
 +            while (true)
 +            {
 +                Chunk chunk = chunks.poll();
 +                if (chunk != null)
 +                    return chunk;
 +
 +                if (!allocateMoreChunks())
 +                    // give it one last attempt, in case someone else allocated before us
 +                    return chunks.poll();
 +            }
 +        }
 +
 +        /**
 +         * This method might be called by multiple threads and that's fine if we add more
 +         * than one chunk at the same time as long as we don't exceed the MEMORY_USAGE_THRESHOLD.
 +         */
 +        private boolean allocateMoreChunks()
 +        {
 +            while (true)
 +            {
 +                long cur = memoryUsage.get();
 +                if (cur + MACRO_CHUNK_SIZE > MEMORY_USAGE_THRESHOLD)
 +                {
 +                    noSpamLogger.info("Maximum memory usage reached ({} bytes), cannot allocate chunk of {} bytes",
 +                                      MEMORY_USAGE_THRESHOLD, MACRO_CHUNK_SIZE);
 +                    return false;
 +                }
 +                if (memoryUsage.compareAndSet(cur, cur + MACRO_CHUNK_SIZE))
 +                    break;
 +            }
 +
 +            // allocate a large chunk
 +            Chunk chunk = new Chunk(allocateDirectAligned(MACRO_CHUNK_SIZE));
 +            chunk.acquire(null);
 +            macroChunks.add(chunk);
 +            for (int i = 0 ; i < MACRO_CHUNK_SIZE ; i += CHUNK_SIZE)
 +            {
 +                Chunk add = new Chunk(chunk.get(CHUNK_SIZE));
 +                chunks.add(add);
 +                if (DEBUG)
 +                    debug.register(add);
 +            }
 +
 +            return true;
 +        }
 +
 +        public void recycle(Chunk chunk)
 +        {
 +            chunks.add(chunk);
 +        }
 +
 +        public long sizeInBytes()
 +        {
 +            return memoryUsage.get();
 +        }
 +
 +        /** This is not thread safe and should only be used for unit testing. */
 +        @VisibleForTesting
 +        void reset()
 +        {
 +            while (!chunks.isEmpty())
 +                chunks.poll().reset();
 +
 +            while (!macroChunks.isEmpty())
 +                macroChunks.poll().reset();
 +
 +            memoryUsage.set(0);
 +        }
 +    }
 +
 +    /**
 +     * A thread local class that grabs chunks from the global pool for this thread allocations.
 +     * Only one thread can do the allocations but multiple threads can release the allocations.
 +     */
 +    static final class LocalPool
 +    {
 +        private final static BufferPoolMetrics metrics = new BufferPoolMetrics();
 +        // a microqueue of Chunks:
 +        //  * if any are null, they are at the end;
 +        //  * new Chunks are added to the last null index
 +        //  * if no null indexes available, the smallest is swapped with the last index, and this replaced
 +        //  * this results in a queue that will typically be visited in ascending order of available space, so that
 +        //    small allocations preferentially slice from the Chunks with the smallest space available to furnish them
 +        // WARNING: if we ever change the size of this, we must update removeFromLocalQueue, and addChunk
 +        private final Chunk[] chunks = new Chunk[3];
 +        private byte chunkCount = 0;
 +
 +        public LocalPool()
 +        {
 +            localPoolReferences.add(new LocalPoolRef(this, localPoolRefQueue));
 +        }
 +
 +        private Chunk addChunkFromGlobalPool()
 +        {
 +            Chunk chunk = globalPool.get();
 +            if (chunk == null)
 +                return null;
 +
 +            addChunk(chunk);
 +            return chunk;
 +        }
 +
 +        private void addChunk(Chunk chunk)
 +        {
 +            chunk.acquire(this);
 +
 +            if (chunkCount < 3)
 +            {
 +                chunks[chunkCount++] = chunk;
 +                return;
 +            }
 +
 +            int smallestChunkIdx = 0;
 +            if (chunks[1].free() < chunks[0].free())
 +                smallestChunkIdx = 1;
 +            if (chunks[2].free() < chunks[smallestChunkIdx].free())
 +                smallestChunkIdx = 2;
 +
 +            chunks[smallestChunkIdx].release();
 +            if (smallestChunkIdx != 2)
 +                chunks[smallestChunkIdx] = chunks[2];
 +            chunks[2] = chunk;
 +        }
 +
 +        public ByteBuffer get(int size)
 +        {
 +            for (Chunk chunk : chunks)
 +            { // first see if our own chunks can serve this buffer
 +                if (chunk == null)
 +                    break;
 +
 +                ByteBuffer buffer = chunk.get(size);
 +                if (buffer != null)
 +                    return buffer;
 +            }
 +
 +            // else ask the global pool
 +            Chunk chunk = addChunkFromGlobalPool();
 +            if (chunk != null)
 +                return chunk.get(size);
 +
 +           return null;
 +        }
 +
 +        private ByteBuffer allocate(int size, boolean onHeap)
 +        {
 +            metrics.misses.mark();
 +            return BufferPool.allocate(size, onHeap);
 +        }
 +
 +        public void put(ByteBuffer buffer)
 +        {
 +            Chunk chunk = Chunk.getParentChunk(buffer);
 +            if (chunk == null)
 +            {
 +                FileUtils.clean(buffer);
 +                return;
 +            }
 +
 +            LocalPool owner = chunk.owner;
 +            // ask the free method to take exclusive ownership of the act of recycling
 +            // if we are either: already not owned by anyone, or owned by ourselves
 +            long free = chunk.free(buffer, owner == null | owner == this);
 +            if (free == 0L)
 +            {
 +                // 0L => we own recycling responsibility, so must recycle;
 +                chunk.recycle();
 +                // if we are also the owner, we must remove the Chunk from our local queue
 +                if (owner == this)
 +                    removeFromLocalQueue(chunk);
 +            }
 +            else if (((free == -1L) && owner != this) && chunk.owner == null)
 +            {
 +                // although we try to take recycle ownership cheaply, it is not always possible to do so if the owner is racing to unset.
 +                // we must also check after completely freeing if the owner has since been unset, and try to recycle
 +                chunk.tryRecycle();
 +            }
 +        }
 +
 +        private void removeFromLocalQueue(Chunk chunk)
 +        {
 +            // since we only have three elements in the queue, it is clearer, easier and faster to just hard code the options
 +            if (chunks[0] == chunk)
 +            {   // remove first by shifting back second two
 +                chunks[0] = chunks[1];
 +                chunks[1] = chunks[2];
 +            }
 +            else if (chunks[1] == chunk)
 +            {   // remove second by shifting back last
 +                chunks[1] = chunks[2];
 +            }
 +            else assert chunks[2] == chunk;
 +            // whatever we do, the last element myst be null
 +            chunks[2] = null;
 +            chunkCount--;
 +        }
 +
 +        @VisibleForTesting
 +        void reset()
 +        {
 +            chunkCount = 0;
 +            for (int i = 0; i < chunks.length; i++)
 +            {
 +                if (chunks[i] != null)
 +                {
 +                    chunks[i].owner = null;
 +                    chunks[i].freeSlots = 0L;
 +                    chunks[i].recycle();
 +                    chunks[i] = null;
 +                }
 +            }
 +        }
 +    }
 +
 +    private static final class LocalPoolRef extends  PhantomReference<LocalPool>
 +    {
 +        private final Chunk[] chunks;
 +        public LocalPoolRef(LocalPool localPool, ReferenceQueue<? super LocalPool> q)
 +        {
 +            super(localPool, q);
 +            chunks = localPool.chunks;
 +        }
 +
 +        public void release()
 +        {
 +            for (int i = 0 ; i < chunks.length ; i++)
 +            {
 +                if (chunks[i] != null)
 +                {
 +                    chunks[i].release();
 +                    chunks[i] = null;
 +                }
 +            }
 +        }
 +    }
 +
 +    private static final ConcurrentLinkedQueue<LocalPoolRef> localPoolReferences = new ConcurrentLinkedQueue<>();
 +
 +    private static final ReferenceQueue<Object> localPoolRefQueue = new ReferenceQueue<>();
 +    private static final InfiniteLoopExecutor EXEC = new InfiniteLoopExecutor("LocalPool-Cleaner", BufferPool::cleanupOneReference).start();
 +
 +    private static void cleanupOneReference() throws InterruptedException
 +    {
 +        Object obj = localPoolRefQueue.remove(100);
 +        if (obj instanceof LocalPoolRef)
 +        {
 +            ((LocalPoolRef) obj).release();
 +            localPoolReferences.remove(obj);
 +        }
 +    }
 +
 +    private static ByteBuffer allocateDirectAligned(int capacity)
 +    {
 +        int align = MemoryUtil.pageSize();
 +        if (Integer.bitCount(align) != 1)
 +            throw new IllegalArgumentException("Alignment must be a power of 2");
 +
 +        ByteBuffer buffer = ByteBuffer.allocateDirect(capacity + align);
 +        long address = MemoryUtil.getAddress(buffer);
 +        long offset = address & (align -1); // (address % align)
 +
 +        if (offset == 0)
 +        { // already aligned
 +            buffer.limit(capacity);
 +        }
 +        else
 +        { // shift by offset
 +            int pos = (int)(align - offset);
 +            buffer.position(pos);
 +            buffer.limit(pos + capacity);
 +        }
 +
 +        return buffer.slice();
 +    }
 +
 +    /**
 +     * A memory chunk: it takes a buffer (the slab) and slices it
 +     * into smaller buffers when requested.
 +     *
 +     * It divides the slab into 64 units and keeps a long mask, freeSlots,
 +     * indicating if a unit is in use or not. Each bit in freeSlots corresponds
 +     * to a unit, if the bit is set then the unit is free (available for allocation)
 +     * whilst if it is not set then the unit is in use.
 +     *
 +     * When we receive a request of a given size we round up the size to the nearest
 +     * multiple of allocation units required. Then we search for n consecutive free units,
 +     * where n is the number of units required. We also align to page boundaries.
 +     *
 +     * When we reiceve a release request we work out the position by comparing the buffer
 +     * address to our base address and we simply release the units.
 +     */
 +    final static class Chunk
 +    {
 +        private final ByteBuffer slab;
 +        private final long baseAddress;
 +        private final int shift;
 +
 +        private volatile long freeSlots;
 +        private static final AtomicLongFieldUpdater<Chunk> freeSlotsUpdater = AtomicLongFieldUpdater.newUpdater(Chunk.class, "freeSlots");
 +
 +        // the pool that is _currently allocating_ from this Chunk
 +        // if this is set, it means the chunk may not be recycled because we may still allocate from it;
 +        // if it has been unset the local pool has finished with it, and it may be recycled
 +        private volatile LocalPool owner;
 +        private long lastRecycled;
 +        private final Chunk original;
 +
 +        Chunk(Chunk recycle)
 +        {
 +            assert recycle.freeSlots == 0L;
 +            this.slab = recycle.slab;
 +            this.baseAddress = recycle.baseAddress;
 +            this.shift = recycle.shift;
 +            this.freeSlots = -1L;
 +            this.original = recycle.original;
 +            if (DEBUG)
 +                globalPool.debug.recycle(original);
 +        }
 +
 +        Chunk(ByteBuffer slab)
 +        {
 +            assert !slab.hasArray();
 +            this.slab = slab;
 +            this.baseAddress = MemoryUtil.getAddress(slab);
 +
 +            // The number of bits by which we need to shift to obtain a unit
 +            // "31 &" is because numberOfTrailingZeros returns 32 when the capacity is zero
 +            this.shift = 31 & (Integer.numberOfTrailingZeros(slab.capacity() / 64));
 +            // -1 means all free whilst 0 means all in use
 +            this.freeSlots = slab.capacity() == 0 ? 0L : -1L;
 +            this.original = DEBUG ? this : null;
 +        }
 +
 +        /**
 +         * Acquire the chunk for future allocations: set the owner and prep
 +         * the free slots mask.
 +         */
 +        void acquire(LocalPool owner)
 +        {
 +            assert this.owner == null;
 +            this.owner = owner;
 +        }
 +
 +        /**
 +         * Set the owner to null and return the chunk to the global pool if the chunk is fully free.
 +         * This method must be called by the LocalPool when it is certain that
 +         * the local pool shall never try to allocate any more buffers from this chunk.
 +         */
 +        void release()
 +        {
 +            this.owner = null;
 +            tryRecycle();
 +        }
 +
 +        void tryRecycle()
 +        {
 +            assert owner == null;
 +            if (isFree() && freeSlotsUpdater.compareAndSet(this, -1L, 0L))
 +                recycle();
 +        }
 +
 +        void recycle()
 +        {
 +            assert freeSlots == 0L;
 +            globalPool.recycle(new Chunk(this));
 +        }
 +
 +        /**
 +         * We stash the chunk in the attachment of a buffer
 +         * that was returned by get(), this method simply
 +         * retrives the chunk that sliced a buffer, if any.
 +         */
 +        static Chunk getParentChunk(ByteBuffer buffer)
 +        {
 +            Object attachment = MemoryUtil.getAttachment(buffer);
 +
 +            if (attachment instanceof Chunk)
 +                return (Chunk) attachment;
 +
 +            if (attachment instanceof Ref)
 +                return ((Ref<Chunk>) attachment).get();
 +
 +            return null;
 +        }
 +
 +        ByteBuffer setAttachment(ByteBuffer buffer)
 +        {
 +            if (Ref.DEBUG_ENABLED)
 +                MemoryUtil.setAttachment(buffer, new Ref<>(this, null));
 +            else
 +                MemoryUtil.setAttachment(buffer, this);
 +
 +            return buffer;
 +        }
 +
 +        boolean releaseAttachment(ByteBuffer buffer)
 +        {
 +            Object attachment = MemoryUtil.getAttachment(buffer);
 +            if (attachment == null)
 +                return false;
 +
 +            if (attachment instanceof Ref)
 +                ((Ref<Chunk>) attachment).release();
 +
 +            return true;
 +        }
 +
 +        @VisibleForTesting
 +        void reset()
 +        {
 +            Chunk parent = getParentChunk(slab);
 +            if (parent != null)
 +                parent.free(slab, false);
 +            else
 +                FileUtils.clean(slab);
 +        }
 +
 +        @VisibleForTesting
 +        long setFreeSlots(long val)
 +        {
 +            long ret = freeSlots;
 +            freeSlots = val;
 +            return ret;
 +        }
 +
 +        int capacity()
 +        {
 +            return 64 << shift;
 +        }
 +
 +        final int unit()
 +        {
 +            return 1 << shift;
 +        }
 +
 +        final boolean isFree()
 +        {
 +            return freeSlots == -1L;
 +        }
 +
 +        /** The total free size */
 +        int free()
 +        {
 +            return Long.bitCount(freeSlots) * unit();
 +        }
 +
 +        /**
 +         * Return the next available slice of this size. If
 +         * we have exceeded the capacity we return null.
 +         */
 +        ByteBuffer get(int size)
 +        {
 +            // how many multiples of our units is the size?
 +            // we add (unit - 1), so that when we divide by unit (>>> shift), we effectively round up
 +            int slotCount = (size - 1 + unit()) >>> shift;
 +
 +            // if we require more than 64 slots, we cannot possibly accommodate the allocation
 +            if (slotCount > 64)
 +                return null;
 +
 +            // convert the slotCount into the bits needed in the bitmap, but at the bottom of the register
 +            long slotBits = -1L >>> (64 - slotCount);
 +
 +            // in order that we always allocate page aligned results, we require that any allocation is "somewhat" aligned
 +            // i.e. any single unit allocation can go anywhere; any 2 unit allocation must begin in one of the first 3 slots
 +            // of a page; a 3 unit must go in the first two slots; and any four unit allocation must be fully page-aligned
 +
 +            // to achieve this, we construct a searchMask that constrains the bits we find to those we permit starting
 +            // a match from. as we find bits, we remove them from the mask to continue our search.
 +            // this has an odd property when it comes to concurrent alloc/free, as we can safely skip backwards if
 +            // a new slot is freed up, but we always make forward progress (i.e. never check the same bits twice),
 +            // so running time is bounded
 +            long searchMask = 0x1111111111111111L;
 +            searchMask *= 15L >>> ((slotCount - 1) & 3);
 +            // i.e. switch (slotCount & 3)
 +            // case 1: searchMask = 0xFFFFFFFFFFFFFFFFL
 +            // case 2: searchMask = 0x7777777777777777L
 +            // case 3: searchMask = 0x3333333333333333L
 +            // case 0: searchMask = 0x1111111111111111L
 +
 +            // truncate the mask, removing bits that have too few slots proceeding them
 +            searchMask &= -1L >>> (slotCount - 1);
 +
 +            // this loop is very unroll friendly, and would achieve high ILP, but not clear if the compiler will exploit this.
 +            // right now, not worth manually exploiting, but worth noting for future
 +            while (true)
 +            {
 +                long cur = freeSlots;
 +                // find the index of the lowest set bit that also occurs in our mask (i.e. is permitted alignment, and not yet searched)
 +                // we take the index, rather than finding the lowest bit, since we must obtain it anyway, and shifting is more efficient
 +                // than multiplication
 +                int index = Long.numberOfTrailingZeros(cur & searchMask);
 +
 +                // if no bit was actually found, we cannot serve this request, so return null.
 +                // due to truncating the searchMask this immediately terminates any search when we run out of indexes
 +                // that could accommodate the allocation, i.e. is equivalent to checking (64 - index) < slotCount
 +                if (index == 64)
 +                    return null;
 +
 +                // remove this bit from our searchMask, so we don't return here next round
 +                searchMask ^= 1L << index;
 +                // if our bits occur starting at the index, remove ourselves from the bitmask and return
 +                long candidate = slotBits << index;
 +                if ((candidate & cur) == candidate)
 +                {
 +                    // here we are sure we will manage to CAS successfully without changing candidate because
 +                    // there is only one thread allocating at the moment, the concurrency is with the release
 +                    // operations only
 +                    while (true)
 +                    {
 +                        // clear the candidate bits (freeSlots &= ~candidate)
 +                        if (freeSlotsUpdater.compareAndSet(this, cur, cur & ~candidate))
 +                            break;
 +
 +                        cur = freeSlots;
 +                        // make sure no other thread has cleared the candidate bits
 +                        assert ((candidate & cur) == candidate);
 +                    }
 +                    return get(index << shift, size);
 +                }
 +            }
 +        }
 +
 +        private ByteBuffer get(int offset, int size)
 +        {
 +            slab.limit(offset + size);
 +            slab.position(offset);
 +
 +            return setAttachment(slab.slice());
 +        }
 +
 +        /**
 +         * Round the size to the next unit multiple.
 +         */
 +        int roundUp(int v)
 +        {
 +            return BufferPool.roundUp(v, unit());
 +        }
 +
 +        /**
 +         * Release a buffer. Return:
 +         *    0L if the buffer must be recycled after the call;
 +         *   -1L if it is free (and so we should tryRecycle if owner is now null)
 +         *    some other value otherwise
 +         **/
 +        long free(ByteBuffer buffer, boolean tryRelease)
 +        {
 +            if (!releaseAttachment(buffer))
 +                return 1L;
 +
 +            long address = MemoryUtil.getAddress(buffer);
 +            assert (address >= baseAddress) & (address <= baseAddress + capacity());
 +
 +            int position = (int)(address - baseAddress);
 +            int size = roundUp(buffer.capacity());
 +
 +            position >>= shift;
 +            int slotCount = size >> shift;
 +
 +            long slotBits = (1L << slotCount) - 1;
 +            long shiftedSlotBits = (slotBits << position);
 +
 +            if (slotCount == 64)
 +            {
 +                assert size == capacity();
 +                assert position == 0;
 +                shiftedSlotBits = -1L;
 +            }
 +
 +            long next;
 +            while (true)
 +            {
 +                long cur = freeSlots;
 +                next = cur | shiftedSlotBits;
 +                assert next == (cur ^ shiftedSlotBits); // ensure no double free
 +                if (tryRelease && (next == -1L))
 +                    next = 0L;
 +                if (freeSlotsUpdater.compareAndSet(this, cur, next))
 +                    return next;
 +            }
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("[slab %s, slots bitmap %s, capacity %d, free %d]", slab, Long.toBinaryString(freeSlots), capacity(), free());
 +        }
 +    }
 +
 +    @VisibleForTesting
 +    public static int roundUpNormal(int size)
 +    {
 +        return roundUp(size, CHUNK_SIZE / 64);
 +    }
 +
 +    private static int roundUp(int size, int unit)
 +    {
 +        int mask = unit - 1;
 +        return (size + mask) & ~mask;
 +    }
 +
 +    @VisibleForTesting
-     public static void shutdownLocalCleaner() throws InterruptedException
++    public static void shutdownLocalCleaner(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
 +    {
-         EXEC.shutdown();
-         EXEC.awaitTermination(60, TimeUnit.SECONDS);
++        shutdownNow(Arrays.asList(EXEC));
++        awaitTermination(timeout, unit, Arrays.asList(EXEC));
 +    }
 +}
diff --cc src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index 83ccad6,9c4824a..8061566
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@@ -23,10 -24,8 +24,11 @@@ import java.util.concurrent.atomic.Atom
  
  import com.google.common.annotations.VisibleForTesting;
  
 -import org.apache.cassandra.utils.ExecutorUtils;
 +import com.codahale.metrics.Timer;
 +import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 +import org.apache.cassandra.metrics.DefaultNameFactory;
  import org.apache.cassandra.utils.concurrent.WaitQueue;
++import org.apache.cassandra.utils.ExecutorUtils;
  
  
  /**
@@@ -69,12 -64,12 +71,11 @@@ public abstract class MemtablePoo
      public abstract boolean needToCopyOnHeap();
  
      @VisibleForTesting
-     public void shutdown() throws InterruptedException
+     public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      {
-         cleaner.shutdown();
-         cleaner.awaitTermination(60, TimeUnit.SECONDS);
+         ExecutorUtils.shutdownNowAndWait(timeout, unit, cleaner);
      }
  
 -
      public abstract MemtableAllocator newAllocator();
  
      /**
diff --cc test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index d0613b1,232ef0b..c77d725
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@@ -19,12 -19,10 +19,8 @@@
  package org.apache.cassandra.distributed;
  
  import java.io.File;
--import java.io.IOException;
--import java.nio.file.Files;
  import java.util.List;
- import java.util.Set;
  
- import org.apache.cassandra.distributed.api.Feature;
  import org.apache.cassandra.distributed.api.ICluster;
  import org.apache.cassandra.distributed.impl.AbstractCluster;
  import org.apache.cassandra.distributed.impl.IUpgradeableInstance;
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 1b385fb,29426cb..aea21e2
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -34,10 -35,6 +34,7 @@@ import java.util.concurrent.Future
  import java.util.function.BiConsumer;
  import java.util.function.Function;
  
- import org.slf4j.LoggerFactory;
- 
- import ch.qos.logback.classic.LoggerContext;
 +import org.apache.cassandra.batchlog.BatchlogManager;
  import org.apache.cassandra.concurrent.ScheduledExecutors;
  import org.apache.cassandra.concurrent.SharedExecutorPool;
  import org.apache.cassandra.concurrent.StageManager;
@@@ -65,10 -64,8 +62,11 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.gms.ApplicationState;
  import org.apache.cassandra.gms.Gossiper;
  import org.apache.cassandra.gms.VersionedValue;
 +import org.apache.cassandra.hints.HintsService;
 +import org.apache.cassandra.index.SecondaryIndexManager;
+ import org.apache.cassandra.io.sstable.IndexSummaryManager;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.DataInputBuffer;
  import org.apache.cassandra.io.util.DataOutputBuffer;
  import org.apache.cassandra.locator.InetAddressAndPort;
  import org.apache.cassandra.net.IMessageSink;
@@@ -81,12 -77,17 +79,18 @@@ import org.apache.cassandra.service.Cli
  import org.apache.cassandra.service.PendingRangeCalculatorService;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.streaming.StreamCoordinator;
  import org.apache.cassandra.transport.messages.ResultMessage;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
  import org.apache.cassandra.utils.Throwables;
  import org.apache.cassandra.utils.concurrent.Ref;
 +import org.apache.cassandra.utils.memory.BufferPool;
  
+ import static java.util.concurrent.TimeUnit.MINUTES;
+ import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+ import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+ 
  public class Instance extends IsolatedExecutor implements IInvokableInstance
  {
      public final IInstanceConfig config;
@@@ -232,14 -261,45 +264,43 @@@
          }
      }
  
-     public void receiveMessage(IMessage message)
+     public void receiveMessage(IMessage imessage)
      {
          sync(() -> {
-             try (DataInputBuffer in = new DataInputBuffer(message.bytes()))
+             // Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
 -            try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(imessage.bytes())))
++            try (DataInputBuffer input = new DataInputBuffer(imessage.bytes()))
              {
-                 MessageIn<?> messageIn = MessageIn.read(in, message.version(), message.id());
-                 Runnable deliver = new MessageDeliveryTask(messageIn, message.id(), System.currentTimeMillis(), false);
-                 deliver.run();
+                 int version = imessage.version();
+ 
+                 MessagingService.validateMagic(input.readInt());
+                 int id;
+                 if (version < MessagingService.VERSION_20)
+                     id = Integer.parseInt(input.readUTF());
+                 else
+                     id = input.readInt();
 -                assert imessage.id() == id;
+ 
+                 long timestamp = System.currentTimeMillis();
+                 boolean isCrossNodeTimestamp = false;
 -
+                 // make sure to readInt, even if cross_node_to is not enabled
+                 int partial = input.readInt();
+                 if (DatabaseDescriptor.hasCrossNodeTimeout())
+                 {
+                     long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
+                     isCrossNodeTimestamp = (timestamp != crossNodeTimestamp);
+                     timestamp = crossNodeTimestamp;
+                 }
+ 
+                 MessageIn message = MessageIn.read(input, version, id);
+                 if (message == null)
+                 {
+                     // callback expired; nothing to do
+                     return;
+                 }
+                 if (version <= MessagingService.current_version)
+                 {
+                     MessagingService.instance().receive(message, id, timestamp, isCrossNodeTimestamp);
+                 }
+                 // else ignore message
              }
              catch (Throwable t)
              {
@@@ -265,12 -325,13 +326,14 @@@
              try
              {
                  mkdirs();
+ 
+                 DatabaseDescriptor.setDaemonInitialized();
                  DatabaseDescriptor.createAllDirectories();
  
 -                // We need to persist this as soon as possible after startup checks.
 +                // We need to  persist this as soon as possible after startup checks.
                  // This should be the first write to SystemKeyspace (CASSANDRA-11742)
                  SystemKeyspace.persistLocalMetadata();
 +                LegacySchemaMigrator.migrate();
  
                  try
                  {
@@@ -293,12 -354,24 +356,23 @@@
                  {
                      throw new RuntimeException(e);
                  }
 -
+                 if (config.has(NETWORK))
+                 {
+                     registerFilter(cluster);
+                     MessagingService.instance().listen();
+                 }
+                 else
+                 {
+                     // Even though we don't use MessagingService, access the static SocketFactory
+                     // instance here so that we start the static event loop state
+ //                    -- not sure what that means?  SocketFactory.instance.getClass();
+                     registerMockMessaging(cluster);
+                 }
  
-                 // TODO: support each separately
-                 if (with.contains(Feature.GOSSIP) || with.contains(Feature.NETWORK))
+                 // TODO: this is more than just gossip
+                 if (config.has(GOSSIP))
                  {
-                     StorageService.instance.prepareToJoin();
-                     StorageService.instance.joinTokenRing(1000);
+                     StorageService.instance.initServer();
                  }
                  else
                  {
@@@ -397,41 -465,42 +470,49 @@@
  
      public Future<Void> shutdown(boolean graceful)
      {
 +        if (!graceful)
 +            MessagingService.instance().shutdown(false);
 +
          Future<?> future = async((ExecutorService executor) -> {
              Throwable error = null;
+ 
+             if (config.has(GOSSIP) || config.has(NETWORK))
+             {
+                 StorageService.instance.shutdownServer();
+ 
+                 error = parallelRun(error, executor,
 -                    () -> NanoTimeToCurrentTimeMillis.shutdown(MINUTES.toMillis(1L))
++                                    () -> NanoTimeToCurrentTimeMillis.shutdown(MINUTES.toMillis(1L))
+                 );
+             }
+ 
              error = parallelRun(error, executor,
-                     Gossiper.instance::stop,
-                     CompactionManager.instance::forceShutdown,
-                     BatchlogManager.instance::shutdown,
-                     HintsService.instance::shutdownBlocking,
-                     SecondaryIndexManager::shutdownExecutors,
-                     ColumnFamilyStore::shutdownFlushExecutor,
-                     ColumnFamilyStore::shutdownPostFlushExecutor,
-                     ColumnFamilyStore::shutdownReclaimExecutor,
-                     PendingRangeCalculatorService.instance::shutdownExecutor,
-                     BufferPool::shutdownLocalCleaner,
-                     StorageService.instance::shutdownBGMonitor,
-                     Ref::shutdownReferenceReaper,
-                     Memtable.MEMORY_POOL::shutdown,
-                     ScheduledExecutors::shutdownAndWait,
-                     SSTableReader::shutdownBlocking
+                                 () -> Gossiper.instance.stopShutdownAndWait(1L, MINUTES),
+                                 CompactionManager.instance::forceShutdown,
 -                                () -> BatchlogManager.shutdownAndWait(1L, MINUTES),
 -                                () -> HintedHandOffManager.instance.shutdownAndWait(1L, MINUTES),
++                                () -> BatchlogManager.instance.shutdownAndWait(1L, MINUTES),
++                                HintsService.instance::shutdownBlocking,
+                                 () -> StreamCoordinator.shutdownAndWait(1L, MINUTES),
++                                () -> SecondaryIndexManager.shutdownAndWait(1L, MINUTES),
+                                 () -> IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES),
+                                 () -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
+                                 () -> PendingRangeCalculatorService.instance.shutdownExecutor(1L, MINUTES),
++                                () -> BufferPool.shutdownLocalCleaner(1L, MINUTES),
+                                 () -> StorageService.instance.shutdownBGMonitorAndWait(1L, MINUTES),
+                                 () -> Ref.shutdownReferenceReaper(1L, MINUTES),
+                                 () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
 -                                () -> SSTableReader.shutdownBlocking(1L, MINUTES),
 -                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES)
++                                () -> SSTableReader.shutdownBlocking(1L, MINUTES)
              );
              error = parallelRun(error, executor,
 -                                CommitLog.instance::shutdownBlocking,
++                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
                                  MessagingService.instance()::shutdown
              );
              error = parallelRun(error, executor,
-                                 StageManager::shutdownAndWait,
-                                 SharedExecutorPool.SHARED::shutdownAndWait
+                                 () -> StageManager.shutdownAndWait(1L, MINUTES),
+                                 () -> SharedExecutorPool.SHARED.shutdownAndWait(1L, MINUTES)
              );
 +            error = parallelRun(error, executor,
 +                                CommitLog.instance::shutdownBlocking
 +            );
  
-             LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
-             loggerContext.stop();
              Throwables.maybeFail(error);
          }).apply(isolatedExecutor);
  
diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
index 57530e0,363a1df..0ef5a69
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
@@@ -46,8 -48,9 +46,9 @@@ public class InstanceClassLoader extend
                 name.startsWith("org.apache.cassandra.distributed.api.")
              || name.startsWith("sun.")
              || name.startsWith("oracle.")
+             || name.startsWith("com.intellij.")
              || name.startsWith("com.sun.")
 -            || name.startsWith("com.oracle.")
 +            || name.startsWith("com.sun.")
              || name.startsWith("java.")
              || name.startsWith("javax.")
              || name.startsWith("jdk.")
diff --cc test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index 3945ec5,757c17f..7b361bc
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@@ -41,6 -42,26 +42,23 @@@ public class DistributedTestBas
  
      public static String KEYSPACE = "distributed_test_keyspace";
  
+     public static void nativeLibraryWorkaround()
+     {
 -        // Disable the C library for in-JVM dtests otherwise it holds a gcroot against the InstanceClassLoader
 -        System.setProperty("cassandra.disable_clibrary", "true");
 -
+         // Disable the Netty tcnative library otherwise the io.netty.internal.tcnative.CertificateCallbackTask,
+         // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask,
+         // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader.
+         System.setProperty("cassandra.disable_tcactive_openssl", "true");
+         System.setProperty("io.netty.transport.noNative", "true");
+     }
+ 
+     public static void processReaperWorkaround()
+     {
+         // Make sure the 'process reaper' thread is initially created under the main classloader,
+         // otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader
+         // which prevents it from being garbage collected.
+         IsolatedExecutor.ThrowingRunnable.toRunnable(() -> new ProcessBuilder().command("true").start().waitFor()).run();
+     }
+ 
      @BeforeClass
      public static void setup()
      {
diff --cc test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
index 11e9985,0000000..c2e9e4f
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
@@@ -1,113 -1,0 +1,115 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.net.InetAddress;
 +import java.util.Collection;
- import java.util.Collections;
- import java.util.EnumSet;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.locks.LockSupport;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.Iterables;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
- import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.Cluster;
- import org.apache.cassandra.distributed.api.Feature;
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.EndpointState;
 +import org.apache.cassandra.gms.Gossiper;
- import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.FBUtilities;
 +
++import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
++import static org.apache.cassandra.distributed.api.Feature.NETWORK;
++
 +public class GossipTest extends DistributedTestBase
 +{
 +
 +    @Test
 +    public void nodeDownDuringMove() throws Throwable
 +    {
 +        int liveCount = 1;
++        System.setProperty("cassandra.ring_delay_ms", "5000"); // down from 30s default
 +        System.setProperty("cassandra.consistent.rangemovement", "false");
-         try (Cluster cluster = Cluster.create(2 + liveCount, EnumSet.of(Feature.GOSSIP)))
++        System.setProperty("cassandra.consistent.simultaneousmoves.allow", "true");
++        try (Cluster cluster = Cluster.build(2 + liveCount)
++                                      .withConfig(config -> config.with(NETWORK).with(GOSSIP))
++                                      .createWithoutStarting())
 +        {
 +            int fail = liveCount + 1;
 +            int late = fail + 1;
 +            for (int i = 1 ; i <= liveCount ; ++i)
 +                cluster.get(i).startup();
 +            cluster.get(fail).startup();
 +            Collection<String> expectTokens = cluster.get(fail).callsOnInstance(() ->
 +                StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress())
 +                                       .stream().map(Object::toString).collect(Collectors.toList())
 +            ).call();
 +
 +            InetAddress failAddress = cluster.get(fail).broadcastAddressAndPort().address;
 +            // wait for NORMAL state
 +            for (int i = 1 ; i <= liveCount ; ++i)
 +            {
 +                cluster.get(i).acceptsOnInstance((InetAddress endpoint) -> {
 +                    EndpointState ep;
 +                    while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
 +                           || ep.getApplicationState(ApplicationState.STATUS) == null
 +                           || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("NORMAL"))
 +                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 +                }).accept(failAddress);
 +            }
 +
 +            // set ourselves to MOVING, and wait for it to propagate
 +            cluster.get(fail).runOnInstance(() -> {
 +
 +                Token token = Iterables.getFirst(StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress()), null);
 +                Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.moving(token));
 +            });
 +
 +            for (int i = 1 ; i <= liveCount ; ++i)
 +            {
 +                cluster.get(i).acceptsOnInstance((InetAddress endpoint) -> {
 +                    EndpointState ep;
 +                    while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
 +                           || (ep.getApplicationState(ApplicationState.STATUS) == null
 +                               || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("MOVING")))
 +                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 +                }).accept(failAddress);
 +            }
 +
 +            cluster.get(fail).shutdown(false).get();
 +            cluster.get(late).startup();
 +            cluster.get(late).acceptsOnInstance((InetAddress endpoint) -> {
 +                EndpointState ep;
 +                while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
 +                       || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("MOVING"))
 +                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 +            }).accept(failAddress);
 +
 +            Collection<String> tokens = cluster.get(late).appliesOnInstance((InetAddress endpoint) ->
 +                StorageService.instance.getTokenMetadata().getTokens(failAddress)
 +                                       .stream().map(Object::toString).collect(Collectors.toList())
 +            ).apply(failAddress);
 +
 +            Assert.assertEquals(expectTokens, tokens);
 +        }
 +    }
 +    
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org