Posted to commits@cassandra.apache.org by be...@apache.org on 2019/08/20 13:56:03 UTC

[cassandra] 01/01: Merge branch 'cassandra-2.2' into cassandra-3.0

This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 51c0f6b2f08c8f276482a2ef198327f756dbc4de
Merge: 54aeb50 8dcaa12
Author: Jon Meredith <jm...@gmail.com>
AuthorDate: Thu Aug 15 14:14:48 2019 -0600

    Merge branch 'cassandra-2.2' into cassandra-3.0

 CHANGES.txt                                        |   1 +
 build.xml                                          |  18 +-
 .../apache/cassandra/batchlog/BatchlogManager.java |   7 +-
 .../cassandra/concurrent/InfiniteLoopExecutor.java |   2 +-
 .../cassandra/concurrent/ScheduledExecutors.java   |  11 +-
 .../cassandra/concurrent/SharedExecutorPool.java   |   4 +-
 .../apache/cassandra/concurrent/StageManager.java  |   8 +-
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  14 +-
 .../apache/cassandra/db/commitlog/CommitLog.java   |   1 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |   9 +
 .../org/apache/cassandra/hints/HintsCatalog.java   |   5 +-
 .../cassandra/index/SecondaryIndexManager.java     |  12 +-
 .../cassandra/io/sstable/IndexSummaryManager.java  |  11 ++
 .../cassandra/io/sstable/format/SSTableReader.java |   5 +-
 .../org/apache/cassandra/net/MessagingService.java |   2 +
 .../cassandra/net/OutboundTcpConnection.java       |   2 +-
 .../apache/cassandra/service/CassandraDaemon.java  |   3 +-
 .../service/PendingRangeCalculatorService.java     |  11 +-
 .../apache/cassandra/service/StorageService.java   |  32 +++-
 .../cassandra/streaming/StreamCoordinator.java     |  13 ++
 .../cassandra/utils/BackgroundActivityMonitor.java |  12 +-
 .../org/apache/cassandra/utils/ExecutorUtils.java  | 151 ++++++++++++++++
 .../utils/NanoTimeToCurrentTimeMillis.java         |  14 +-
 .../org/apache/cassandra/utils/concurrent/Ref.java |  14 +-
 .../apache/cassandra/utils/memory/BufferPool.java  |  10 +-
 .../cassandra/utils/memory/MemtablePool.java       |   7 +-
 test/conf/logback-dtest.xml                        |  18 +-
 .../org/apache/cassandra/distributed/Cluster.java  |  30 ++-
 .../cassandra/distributed/UpgradeableCluster.java  |  30 +--
 .../cassandra/distributed/api/IInstance.java       |   3 +-
 .../cassandra/distributed/api/IInstanceConfig.java |   1 +
 .../cassandra/distributed/api/IMessageFilters.java |   6 +-
 .../distributed/impl/AbstractCluster.java          | 154 +++++++++-------
 .../impl/DelegatingInvokableInstance.java          |   5 +-
 .../cassandra/distributed/impl/Instance.java       | 159 ++++++++++++----
 .../distributed/impl/InstanceClassLoader.java      |   9 +-
 .../cassandra/distributed/impl/InstanceConfig.java |  28 ++-
 .../distributed/impl/IsolatedExecutor.java         |  47 ++++-
 .../cassandra/distributed/impl/MessageFilters.java |  31 +---
 .../distributed/test/DistributedTestBase.java      |  26 ++-
 .../cassandra/distributed/test/GossipTest.java     |  14 +-
 .../distributed/test/ResourceLeakTest.java         | 201 +++++++++++++++++++++
 .../cassandra/concurrent/SEPExecutorTest.java      |   3 +-
 43 files changed, 871 insertions(+), 273 deletions(-)

diff --cc CHANGES.txt
index 41ddef6,caea0f4..e956796
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -32,43 -6,12 +32,44 @@@
   * Fix index summary redistribution cancellation (CASSANDRA-15045)
   * Refactor Circle CI configuration (CASSANDRA-14806)
   * Fixing invalid CQL in security documentation (CASSANDRA-15020)
 - * Make tools/bin/token-generator py2/3 compatible (CASSANDRA-15012)
   * Multi-version in-JVM dtests (CASSANDRA-14937)
+  * Allow instance class loaders to be garbage collected for inJVM dtest (CASSANDRA-15170)
  
  
 -2.2.14
 +3.0.18
 + * Severe concurrency issues in STCS,DTCS,TWCS,TMD.Topology,TypeParser
 + * Add a script to make running the cqlsh tests in cassandra repo easier (CASSANDRA-14951)
 + * If SizeEstimatesRecorder misses a 'onDropTable' notification, the size_estimates table will never be cleared for that table. (CASSANDRA-14905)
 + * Counters fail to increment in 2.1/2.2 to 3.X mixed version clusters (CASSANDRA-14958)
 + * Streaming needs to synchronise access to LifecycleTransaction (CASSANDRA-14554)
 + * Fix cassandra-stress write hang with default options (CASSANDRA-14616)
 + * Differentiate between slices and RTs when decoding legacy bounds (CASSANDRA-14919)
 + * CommitLogReplayer.handleReplayError should print stack traces (CASSANDRA-14589)
 + * Netty epoll IOExceptions caused by unclean client disconnects being logged at INFO (CASSANDRA-14909)
 + * Unfiltered.isEmpty conflicts with Row extends AbstractCollection.isEmpty (CASSANDRA-14588)
 + * RangeTombstoneList doesn't properly clean up mergeable or superseded rts in some cases (CASSANDRA-14894)
 + * Fix handling of collection tombstones for dropped columns from legacy sstables (CASSANDRA-14912)
 + * Throw exception if Columns serialized subset encode more columns than possible (CASSANDRA-14591)
 + * Drop/add column name with different Kind can result in corruption (CASSANDRA-14843)
 + * Fix missing rows when reading 2.1 SSTables with static columns in 3.0 (CASSANDRA-14873)
 + * Move TWCS message 'No compaction necessary for bucket size' to Trace level (CASSANDRA-14884)
 + * Sstable min/max metadata can cause data loss (CASSANDRA-14861)
 + * Dropped columns can cause reverse sstable iteration to return prematurely (CASSANDRA-14838)
 + * Legacy sstables with  multi block range tombstones create invalid bound sequences (CASSANDRA-14823)
 + * Expand range tombstone validation checks to multiple interim request stages (CASSANDRA-14824)
 + * Reverse order reads can return incomplete results (CASSANDRA-14803)
 + * Avoid calling iter.next() in a loop when notifying indexers about range tombstones (CASSANDRA-14794)
 + * Fix purging semi-expired RT boundaries in reversed iterators (CASSANDRA-14672)
 + * DESC order reads can fail to return the last Unfiltered in the partition (CASSANDRA-14766)
 + * Fix corrupted collection deletions for dropped columns in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
 + * Fix corrupted static collection deletions in 3.0 <-> 2.{1,2} messages (CASSANDRA-14568)
 + * Handle failures in parallelAllSSTableOperation (cleanup/upgradesstables/etc) (CASSANDRA-14657)
 + * Improve TokenMetaData cache populating performance avoid long locking (CASSANDRA-14660)
 + * Backport: Flush netty client messages immediately (not by default) (CASSANDRA-13651)
 + * Fix static column order for SELECT * wildcard queries (CASSANDRA-14638)
 + * sstableloader should use discovered broadcast address to connect intra-cluster (CASSANDRA-14522)
 + * Fix reading columns with non-UTF names from schema (CASSANDRA-14468)
 + Merged from 2.2:
   * CircleCI docker image should bake in more dependencies (CASSANDRA-14985)
   * Don't enable client transports when bootstrap is pending (CASSANDRA-14525)
   * MigrationManager attempts to pull schema from different major version nodes (CASSANDRA-14928)
diff --cc src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index ab2239c,0000000..71d60e7
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@@ -1,590 -1,0 +1,591 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.batchlog;
 +
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.ScheduledThreadPoolExecutor;
 +import java.util.concurrent.ThreadLocalRandom;
 +import java.util.concurrent.TimeUnit;
++import java.util.concurrent.TimeoutException;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.collect.ArrayListMultimap;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.ListMultimap;
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.Multimap;
 +import com.google.common.util.concurrent.RateLimiter;
 +import org.apache.cassandra.db.RowUpdateBuilder;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.db.WriteType;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.db.marshal.UUIDType;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.exceptions.WriteFailureException;
 +import org.apache.cassandra.exceptions.WriteTimeoutException;
 +import org.apache.cassandra.gms.FailureDetector;
 +import org.apache.cassandra.hints.Hint;
 +import org.apache.cassandra.hints.HintsService;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
 +import org.apache.cassandra.net.MessageIn;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.service.WriteResponseHandler;
++import org.apache.cassandra.utils.ExecutorUtils;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.MBeanWrapper;
 +import org.apache.cassandra.utils.UUIDGen;
 +
 +import static com.google.common.collect.Iterables.transform;
 +import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 +import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
 +
 +public class BatchlogManager implements BatchlogManagerMBean
 +{
 +    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
 +    private static final long REPLAY_INTERVAL = 10 * 1000; // milliseconds
 +    static final int DEFAULT_PAGE_SIZE = 128;
 +
 +    private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
 +    public static final BatchlogManager instance = new BatchlogManager();
 +    public static final long BATCHLOG_REPLAY_TIMEOUT = Long.getLong("cassandra.batchlog.replay_timeout_in_ms", DatabaseDescriptor.getWriteRpcTimeout() * 2);
 +
 +    private volatile long totalBatchesReplayed = 0; // no concurrency protection necessary as only written by replay thread.
 +    private volatile UUID lastReplayedUuid = UUIDGen.minTimeUUID(0);
 +
 +    // Single-thread executor service for scheduling and serializing log replay.
 +    private final ScheduledExecutorService batchlogTasks;
 +
 +    public BatchlogManager()
 +    {
 +        ScheduledThreadPoolExecutor executor = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
 +        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
 +        batchlogTasks = executor;
 +    }
 +
 +    public void start()
 +    {
 +        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
 +
 +        batchlogTasks.scheduleWithFixedDelay(this::replayFailedBatches,
 +                                             StorageService.RING_DELAY,
 +                                             REPLAY_INTERVAL,
 +                                             TimeUnit.MILLISECONDS);
 +    }
 +
-     public void shutdown() throws InterruptedException
++    public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
 +    {
-         batchlogTasks.shutdown();
-         batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
++        ExecutorUtils.shutdownAndWait(timeout, unit, batchlogTasks);
 +    }
 +
 +    public static void remove(UUID id)
 +    {
 +        new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
 +                                                         UUIDType.instance.decompose(id),
 +                                                         FBUtilities.timestampMicros(),
 +                                                         FBUtilities.nowInSeconds()))
 +            .apply();
 +    }
 +
 +    public static void store(Batch batch)
 +    {
 +        store(batch, true);
 +    }
 +
 +    public static void store(Batch batch, boolean durableWrites)
 +    {
 +        RowUpdateBuilder builder =
 +            new RowUpdateBuilder(SystemKeyspace.Batches, batch.creationTime, batch.id)
 +                .clustering()
 +                .add("version", MessagingService.current_version);
 +
 +        for (ByteBuffer mutation : batch.encodedMutations)
 +            builder.addListEntry("mutations", mutation);
 +
 +        for (Mutation mutation : batch.decodedMutations)
 +        {
 +            try (DataOutputBuffer buffer = new DataOutputBuffer())
 +            {
 +                Mutation.serializer.serialize(mutation, buffer, MessagingService.current_version);
 +                builder.addListEntry("mutations", buffer.buffer());
 +            }
 +            catch (IOException e)
 +            {
 +                // shouldn't happen
 +                throw new AssertionError(e);
 +            }
 +        }
 +
 +        builder.build().apply(durableWrites);
 +    }
 +
 +    @VisibleForTesting
 +    public int countAllBatches()
 +    {
 +        String query = String.format("SELECT count(*) FROM %s.%s", SystemKeyspace.NAME, SystemKeyspace.BATCHES);
 +        UntypedResultSet results = executeInternal(query);
 +        if (results == null || results.isEmpty())
 +            return 0;
 +
 +        return (int) results.one().getLong("count");
 +    }
 +
 +    public long getTotalBatchesReplayed()
 +    {
 +        return totalBatchesReplayed;
 +    }
 +
 +    public void forceBatchlogReplay() throws Exception
 +    {
 +        startBatchlogReplay().get();
 +    }
 +
 +    public Future<?> startBatchlogReplay()
 +    {
 +        // If a replay is already in progress this request will be executed after it completes.
 +        return batchlogTasks.submit(this::replayFailedBatches);
 +    }
 +
 +    void performInitialReplay() throws InterruptedException, ExecutionException
 +    {
 +        // Invokes initial replay. Used for testing only.
 +        batchlogTasks.submit(this::replayFailedBatches).get();
 +    }
 +
 +    private void replayFailedBatches()
 +    {
 +        logger.trace("Started replayFailedBatches");
 +
 +        // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
 +        // max rate is scaled by the number of nodes in the cluster (same as for HHOM - see CASSANDRA-5272).
 +        int endpointsCount = StorageService.instance.getTokenMetadata().getAllEndpoints().size();
 +        if (endpointsCount <= 0)
 +        {
 +            logger.trace("Replay cancelled as there are no peers in the ring.");
 +            return;
 +        }
 +        int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / endpointsCount;
 +        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
 +
 +        UUID limitUuid = UUIDGen.maxTimeUUID(System.currentTimeMillis() - getBatchlogTimeout());
 +        ColumnFamilyStore store = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES);
 +        int pageSize = calculatePageSize(store);
 +        // There cannot be any live content where token(id) <= token(lastReplayedUuid) as every processed batch is
 +        // deleted, but the tombstoned content may still be present in the tables. To avoid walking over it we specify
 +        // token(id) > token(lastReplayedUuid) as part of the query.
 +        String query = String.format("SELECT id, mutations, version FROM %s.%s WHERE token(id) > token(?) AND token(id) <= token(?)",
 +                                     SystemKeyspace.NAME,
 +                                     SystemKeyspace.BATCHES);
 +        UntypedResultSet batches = executeInternalWithPaging(query, pageSize, lastReplayedUuid, limitUuid);
 +        processBatchlogEntries(batches, pageSize, rateLimiter);
 +        lastReplayedUuid = limitUuid;
 +        logger.trace("Finished replayFailedBatches");
 +    }
 +
 +    // read fewer rows (batches) per page if they are very large
 +    static int calculatePageSize(ColumnFamilyStore store)
 +    {
 +        double averageRowSize = store.getMeanPartitionSize();
 +        if (averageRowSize <= 0)
 +            return DEFAULT_PAGE_SIZE;
 +
 +        return (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, 4 * 1024 * 1024 / averageRowSize));
 +    }
 +
 +    private void processBatchlogEntries(UntypedResultSet batches, int pageSize, RateLimiter rateLimiter)
 +    {
 +        int positionInPage = 0;
 +        ArrayList<ReplayingBatch> unfinishedBatches = new ArrayList<>(pageSize);
 +
 +        Set<InetAddress> hintedNodes = new HashSet<>();
 +        Set<UUID> replayedBatches = new HashSet<>();
 +
 +        // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
 +        for (UntypedResultSet.Row row : batches)
 +        {
 +            UUID id = row.getUUID("id");
 +            int version = row.getInt("version");
 +            try
 +            {
 +                ReplayingBatch batch = new ReplayingBatch(id, version, row.getList("mutations", BytesType.instance));
 +                if (batch.replay(rateLimiter, hintedNodes) > 0)
 +                {
 +                    unfinishedBatches.add(batch);
 +                }
 +                else
 +                {
 +                    remove(id); // no write mutations were sent (either expired or all CFs involved truncated).
 +                    ++totalBatchesReplayed;
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                logger.warn("Skipped batch replay of {} due to {}", id, e);
 +                remove(id);
 +            }
 +
 +            if (++positionInPage == pageSize)
 +            {
 +                // We have reached the end of a page. To avoid keeping more than a page of mutations in memory,
 +                // finish processing the page before requesting the next row.
 +                finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
 +                positionInPage = 0;
 +            }
 +        }
 +
 +        finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
 +
 +        // to preserve batch guarantees, we must ensure that hints (if any) have made it to disk, before deleting the batches
 +        HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint));
 +
 +        // once all generated hints are fsynced, actually delete the batches
 +        replayedBatches.forEach(BatchlogManager::remove);
 +    }
 +
 +    private void finishAndClearBatches(ArrayList<ReplayingBatch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches)
 +    {
 +        // schedule hints for timed out deliveries
 +        for (ReplayingBatch batch : batches)
 +        {
 +            batch.finish(hintedNodes);
 +            replayedBatches.add(batch.id);
 +        }
 +
 +        totalBatchesReplayed += batches.size();
 +        batches.clear();
 +    }
 +
 +    public static long getBatchlogTimeout()
 +    {
 +        return BATCHLOG_REPLAY_TIMEOUT; // enough time for the actual write + BM removal mutation
 +    }
 +
 +    private static class ReplayingBatch
 +    {
 +        private final UUID id;
 +        private final long writtenAt;
 +        private final List<Mutation> mutations;
 +        private final int replayedBytes;
 +
 +        private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
 +
 +        ReplayingBatch(UUID id, int version, List<ByteBuffer> serializedMutations) throws IOException
 +        {
 +            this.id = id;
 +            this.writtenAt = UUIDGen.unixTimestamp(id);
 +            this.mutations = new ArrayList<>(serializedMutations.size());
 +            this.replayedBytes = addMutations(version, serializedMutations);
 +        }
 +
 +        public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException
 +        {
 +            logger.trace("Replaying batch {}", id);
 +
 +            if (mutations.isEmpty())
 +                return 0;
 +
 +            int gcgs = gcgs(mutations);
 +            if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
 +                return 0;
 +
 +            replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
 +
 +            rateLimiter.acquire(replayedBytes); // acquire afterwards, to not mess up ttl calculation.
 +
 +            return replayHandlers.size();
 +        }
 +
 +        public void finish(Set<InetAddress> hintedNodes)
 +        {
 +            for (int i = 0; i < replayHandlers.size(); i++)
 +            {
 +                ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
 +                try
 +                {
 +                    handler.get();
 +                }
 +                catch (WriteTimeoutException|WriteFailureException e)
 +                {
 +                    logger.trace("Failed replaying a batched mutation to a node, will write a hint");
 +                    logger.trace("Failure was : {}", e.getMessage());
 +                    // writing hints for the rest to hints, starting from i
 +                    writeHintsForUndeliveredEndpoints(i, hintedNodes);
 +                    return;
 +                }
 +            }
 +        }
 +
 +        private int addMutations(int version, List<ByteBuffer> serializedMutations) throws IOException
 +        {
 +            int ret = 0;
 +            for (ByteBuffer serializedMutation : serializedMutations)
 +            {
 +                ret += serializedMutation.remaining();
 +                try (DataInputBuffer in = new DataInputBuffer(serializedMutation, true))
 +                {
 +                    addMutation(Mutation.serializer.deserialize(in, version));
 +                }
 +            }
 +
 +            return ret;
 +        }
 +
 +        // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis.
 +        // We don't abort the replay entirely b/c this can be considered a success (truncated is same as delivered then
 +        // truncated).
 +        private void addMutation(Mutation mutation)
 +        {
 +            for (UUID cfId : mutation.getColumnFamilyIds())
 +                if (writtenAt <= SystemKeyspace.getTruncatedAt(cfId))
 +                    mutation = mutation.without(cfId);
 +
 +            if (!mutation.isEmpty())
 +                mutations.add(mutation);
 +        }
 +
 +        private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes)
 +        {
 +            int gcgs = gcgs(mutations);
 +
 +            // expired
 +            if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
 +                return;
 +
 +            for (int i = startFrom; i < replayHandlers.size(); i++)
 +            {
 +                ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
 +                Mutation undeliveredMutation = mutations.get(i);
 +
 +                if (handler != null)
 +                {
 +                    hintedNodes.addAll(handler.undelivered);
 +                    HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
 +                                                Hint.create(undeliveredMutation, writtenAt));
 +                }
 +            }
 +        }
 +
 +        private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations,
 +                                                                              long writtenAt,
 +                                                                              Set<InetAddress> hintedNodes)
 +        {
 +            List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
 +            for (Mutation mutation : mutations)
 +            {
 +                ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes);
 +                if (handler != null)
 +                    handlers.add(handler);
 +            }
 +            return handlers;
 +        }
 +
 +        /**
 +         * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
 +         * when a replica is down or a write request times out.
 +         *
 +         * @return direct delivery handler to wait on or null, if no live nodes found
 +         */
 +        private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
 +                                                                                     long writtenAt,
 +                                                                                     Set<InetAddress> hintedNodes)
 +        {
 +            Set<InetAddress> liveEndpoints = new HashSet<>();
 +            String ks = mutation.getKeyspaceName();
 +            Token tk = mutation.key().getToken();
 +
 +            for (InetAddress endpoint : StorageService.instance.getNaturalAndPendingEndpoints(ks, tk))
 +            {
 +                if (endpoint.equals(FBUtilities.getBroadcastAddress()))
 +                {
 +                    mutation.apply();
 +                }
 +                else if (FailureDetector.instance.isAlive(endpoint))
 +                {
 +                    liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
 +                }
 +                else
 +                {
 +                    hintedNodes.add(endpoint);
 +                    HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
 +                                                Hint.create(mutation, writtenAt));
 +                }
 +            }
 +
 +            if (liveEndpoints.isEmpty())
 +                return null;
 +
 +            ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
 +            MessageOut<Mutation> message = mutation.createMessage();
 +            for (InetAddress endpoint : liveEndpoints)
 +                MessagingService.instance().sendRR(message, endpoint, handler, false);
 +            return handler;
 +        }
 +
 +        private static int gcgs(Collection<Mutation> mutations)
 +        {
 +            int gcgs = Integer.MAX_VALUE;
 +            for (Mutation mutation : mutations)
 +                gcgs = Math.min(gcgs, mutation.smallestGCGS());
 +            return gcgs;
 +        }
 +
 +        /**
 +         * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
 +         * which we did not receive a successful reply.
 +         */
 +        private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
 +        {
 +            private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
 +
 +            ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
 +            {
 +                super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
 +                undelivered.addAll(writeEndpoints);
 +            }
 +
 +            @Override
 +            protected int totalBlockFor()
 +            {
 +                return this.naturalEndpoints.size();
 +            }
 +
 +            @Override
 +            public void response(MessageIn<T> m)
 +            {
 +                boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
 +                assert removed;
 +                super.response(m);
 +            }
 +        }
 +    }
 +
 +    public static class EndpointFilter
 +    {
 +        private final String localRack;
 +        private final Multimap<String, InetAddress> endpoints;
 +
 +        public EndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
 +        {
 +            this.localRack = localRack;
 +            this.endpoints = endpoints;
 +        }
 +
 +        /**
 +         * @return list of candidates for batchlog hosting. If possible these will be two nodes from different racks.
 +         */
 +        public Collection<InetAddress> filter()
 +        {
 +            // special case for single-node data centers
 +            if (endpoints.values().size() == 1)
 +                return endpoints.values();
 +
 +            // strip out dead endpoints and localhost
 +            ListMultimap<String, InetAddress> validated = ArrayListMultimap.create();
 +            for (Map.Entry<String, InetAddress> entry : endpoints.entries())
 +                if (isValid(entry.getValue()))
 +                    validated.put(entry.getKey(), entry.getValue());
 +
 +            if (validated.size() <= 2)
 +                return validated.values();
 +
 +            if (validated.size() - validated.get(localRack).size() >= 2)
 +            {
 +                // we have enough endpoints in other racks
 +                validated.removeAll(localRack);
 +            }
 +
 +            if (validated.keySet().size() == 1)
 +            {
 +                /*
 +                 * we have only 1 `other` rack to select replicas from (whether it be the local rack or a single non-local rack)
 +                 * pick two random nodes from there; we are guaranteed to have at least two nodes in the single remaining rack
 +                 * because of the preceding if block.
 +                 */
 +                List<InetAddress> otherRack = Lists.newArrayList(validated.values());
 +                shuffle(otherRack);
 +                return otherRack.subList(0, 2);
 +            }
 +
 +            // randomize which racks we pick from if more than 2 remaining
 +            Collection<String> racks;
 +            if (validated.keySet().size() == 2)
 +            {
 +                racks = validated.keySet();
 +            }
 +            else
 +            {
 +                racks = Lists.newArrayList(validated.keySet());
 +                shuffle((List<String>) racks);
 +            }
 +
 +            // grab a random member of up to two racks
 +            List<InetAddress> result = new ArrayList<>(2);
 +            for (String rack : Iterables.limit(racks, 2))
 +            {
 +                List<InetAddress> rackMembers = validated.get(rack);
 +                result.add(rackMembers.get(getRandomInt(rackMembers.size())));
 +            }
 +
 +            return result;
 +        }
 +
 +        @VisibleForTesting
 +        protected boolean isValid(InetAddress input)
 +        {
 +            return !input.equals(FBUtilities.getBroadcastAddress()) && FailureDetector.instance.isAlive(input);
 +        }
 +
 +        @VisibleForTesting
 +        protected int getRandomInt(int bound)
 +        {
 +            return ThreadLocalRandom.current().nextInt(bound);
 +        }
 +
 +        @VisibleForTesting
 +        protected void shuffle(List<?> list)
 +        {
 +            Collections.shuffle(list);
 +        }
 +    }
 +}
diff --cc src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index 3997c1a,50cc5a3..e6a0df7
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@@ -115,10 -114,9 +115,10 @@@ public class SharedExecutorPoo
          return executor;
      }
  
-     public synchronized void shutdownAndWait() throws InterruptedException
 -    public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException
++    public synchronized void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException
      {
          shuttingDown = true;
 +        List<SEPExecutor> executors = new ArrayList<>(this.executors);
          for (SEPExecutor executor : executors)
              executor.shutdownNow();
  
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 355d710,01330a6..c5e81f0
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -55,32 -67,29 +55,34 @@@ import org.apache.cassandra.db.rows.Cel
  import org.apache.cassandra.dht.*;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.io.FSReadError;
 -import org.apache.cassandra.io.compress.CompressionParameters;
 +import org.apache.cassandra.index.SecondaryIndexManager;
 +import org.apache.cassandra.index.internal.CassandraIndex;
 +import org.apache.cassandra.index.transactions.UpdateTransaction;
 +import org.apache.cassandra.io.FSError;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.sstable.Component;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
  import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.SSTableMultiWriter;
  import org.apache.cassandra.io.sstable.format.*;
 -import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
 -import org.apache.cassandra.io.sstable.metadata.MetadataType;
 +import org.apache.cassandra.io.sstable.format.big.BigFormat;
 +import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.io.util.FileUtils;
 -import org.apache.cassandra.metrics.ColumnFamilyMetrics;
 -import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
 +import org.apache.cassandra.metrics.TableMetrics;
 +import org.apache.cassandra.metrics.TableMetrics.Sampler;
 +import org.apache.cassandra.schema.*;
  import org.apache.cassandra.service.CacheService;
  import org.apache.cassandra.service.StorageService;
 -import org.apache.cassandra.streaming.StreamLockfile;
 -import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.utils.*;
 -import org.apache.cassandra.utils.concurrent.*;
  import org.apache.cassandra.utils.TopKSampler.SamplerResult;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.Refs;
  import org.apache.cassandra.utils.memory.MemtableAllocator;
 -
 -import com.clearspring.analytics.stream.Counter;
 +import org.json.simple.JSONArray;
 +import org.json.simple.JSONObject;
  
+ import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
+ import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
  import static org.apache.cassandra.utils.Throwables.maybeFail;
  
  public class ColumnFamilyStore implements ColumnFamilyStoreMBean
@@@ -220,14 -194,6 +222,8 @@@
      public volatile long sampleLatencyNanos;
      private final ScheduledFuture<?> latencyCalculator;
  
 +    private volatile boolean compactionSpaceCheck = true;
 +
-     public static void shutdownFlushExecutor() throws InterruptedException
-     {
-         flushExecutor.shutdown();
-         flushExecutor.awaitTermination(60, TimeUnit.SECONDS);
-     }
- 
      public static void shutdownPostFlushExecutor() throws InterruptedException
      {
          postFlushExecutor.shutdown();
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index c39f45a,bd4fe13..e9e0648
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -23,18 -23,14 +23,19 @@@ import java.util.*
  import java.util.Map.Entry;
  import java.util.concurrent.*;
  import java.util.concurrent.locks.ReentrantLock;
 +import java.util.stream.Collectors;
  
  import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Throwables;
  import com.google.common.collect.ImmutableList;
  import com.google.common.collect.ImmutableMap;
 +import com.google.common.util.concurrent.ListenableFutureTask;
  import com.google.common.util.concurrent.Uninterruptibles;
  
 +import io.netty.util.concurrent.FastThreadLocal;
+ import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.MBeanWrapper;
 +import org.apache.cassandra.utils.NoSpamLogger;
  import org.apache.cassandra.utils.Pair;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
diff --cc src/java/org/apache/cassandra/hints/HintsCatalog.java
index 7d5c8e6,0000000..5a92889
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/hints/HintsCatalog.java
+++ b/src/java/org/apache/cassandra/hints/HintsCatalog.java
@@@ -1,174 -1,0 +1,175 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.hints;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.nio.file.Files;
++import java.nio.file.Path;
 +import java.util.*;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.stream.Stream;
 +import javax.annotation.Nullable;
 +
 +import com.google.common.collect.ImmutableMap;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.io.FSError;
 +import org.apache.cassandra.io.FSReadError;
 +import org.apache.cassandra.io.FSWriteError;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.utils.NativeLibrary;
 +import org.apache.cassandra.utils.SyncUtil;
 +
 +import static java.util.stream.Collectors.groupingBy;
 +
 +/**
 + * A simple catalog for easy host id -> {@link HintsStore} lookup and manipulation.
 + */
 +final class HintsCatalog
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(HintsCatalog.class);
 +
 +    private final File hintsDirectory;
 +    private final Map<UUID, HintsStore> stores;
 +    private final ImmutableMap<String, Object> writerParams;
 +
 +    private HintsCatalog(File hintsDirectory, ImmutableMap<String, Object> writerParams, Map<UUID, List<HintsDescriptor>> descriptors)
 +    {
 +        this.hintsDirectory = hintsDirectory;
 +        this.writerParams = writerParams;
 +        this.stores = new ConcurrentHashMap<>();
 +
 +        for (Map.Entry<UUID, List<HintsDescriptor>> entry : descriptors.entrySet())
 +            stores.put(entry.getKey(), HintsStore.create(entry.getKey(), hintsDirectory, writerParams, entry.getValue()));
 +    }
 +
 +    /**
 +     * Loads hints stores from a given directory.
 +     */
 +    static HintsCatalog load(File hintsDirectory, ImmutableMap<String, Object> writerParams)
 +    {
-         try
++        try(Stream<Path> list = Files.list(hintsDirectory.toPath()))
 +        {
 +            Map<UUID, List<HintsDescriptor>> stores =
-                 Files.list(hintsDirectory.toPath())
++                     list
 +                     .filter(HintsDescriptor::isHintFileName)
 +                     .map(HintsDescriptor::readFromFileQuietly)
 +                     .filter(Optional::isPresent)
 +                     .map(Optional::get)
 +                     .collect(groupingBy(h -> h.hostId));
 +            return new HintsCatalog(hintsDirectory, writerParams, stores);
 +        }
 +        catch (IOException e)
 +        {
 +            throw new FSReadError(e, hintsDirectory);
 +        }
 +    }
 +
 +    Stream<HintsStore> stores()
 +    {
 +        return stores.values().stream();
 +    }
 +
 +    void maybeLoadStores(Iterable<UUID> hostIds)
 +    {
 +        for (UUID hostId : hostIds)
 +            get(hostId);
 +    }
 +
 +    HintsStore get(UUID hostId)
 +    {
 +        // we intentionally don't just return stores.computeIfAbsent() because it's expensive compared to simple get(),
 +        // and in this case would also allocate for the capturing lambda; the method is on a really hot path
 +        HintsStore store = stores.get(hostId);
 +        return store == null
 +             ? stores.computeIfAbsent(hostId, (id) -> HintsStore.create(id, hintsDirectory, writerParams, Collections.emptyList()))
 +             : store;
 +    }
 +
 +    @Nullable
 +    HintsStore getNullable(UUID hostId)
 +    {
 +        return stores.get(hostId);
 +    }
 +
 +    /**
 +     * Delete all hints for all host ids.
 +     *
 +     * Will not delete the files that are currently being dispatched, or written to.
 +     */
 +    void deleteAllHints()
 +    {
 +        stores.keySet().forEach(this::deleteAllHints);
 +    }
 +
 +    /**
 +     * Delete all hints for the specified host id.
 +     *
 +     * Will not delete the files that are currently being dispatched, or written to.
 +     */
 +    void deleteAllHints(UUID hostId)
 +    {
 +        HintsStore store = stores.get(hostId);
 +        if (store != null)
 +            store.deleteAllHints();
 +    }
 +
 +    /**
 +     * @return true if at least one of the stores has a file pending dispatch
 +     */
 +    boolean hasFiles()
 +    {
 +        return stores().anyMatch(HintsStore::hasFiles);
 +    }
 +
 +    void exciseStore(UUID hostId)
 +    {
 +        deleteAllHints(hostId);
 +        stores.remove(hostId);
 +    }
 +
 +    void fsyncDirectory()
 +    {
 +        int fd = NativeLibrary.tryOpenDirectory(hintsDirectory.getAbsolutePath());
 +        if (fd != -1)
 +        {
 +            try
 +            {
 +                SyncUtil.trySync(fd);
 +                NativeLibrary.tryCloseFD(fd);
 +            }
 +            catch (FSError e) // trySync failed
 +            {
 +                logger.error("Unable to sync directory {}", hintsDirectory.getAbsolutePath(), e);
 +                FileUtils.handleFSErrorAndPropagate(e);
 +            }
 +        }
 +        else
 +        {
 +            logger.error("Unable to open directory {}", hintsDirectory.getAbsolutePath());
 +            FileUtils.handleFSErrorAndPropagate(new FSWriteError(new IOException(String.format("Unable to open hint directory %s", hintsDirectory.getAbsolutePath())), hintsDirectory.getAbsolutePath()));
 +        }
 +    }
 +
 +    ImmutableMap<String, Object> getWriterParams()
 +    {
 +        return writerParams;
 +    }
 +}
diff --cc src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index e93057b,0000000..d66a18b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@@ -1,1156 -1,0 +1,1158 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.index;
 +
 +import java.lang.reflect.Constructor;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
 +import java.util.stream.Stream;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Joiner;
 +import com.google.common.base.Strings;
 +import com.google.common.collect.ImmutableSet;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Maps;
 +import com.google.common.collect.Sets;
 +import com.google.common.primitives.Longs;
 +import com.google.common.util.concurrent.Futures;
 +import com.google.common.util.concurrent.MoreExecutors;
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.internal.CassandraIndex;
 +import org.apache.cassandra.index.transactions.*;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.Indexes;
 +import org.apache.cassandra.service.pager.SinglePartitionPager;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.transport.Server;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.Refs;
 +
++import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
++import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
++import static org.apache.cassandra.utils.ExecutorUtils.shutdownNow;
++
 +/**
 + * Handles the core maintenance functionality associated with indexes: adding/removing them to or from
 + * a table, (re)building during bootstrap or other streaming operations, flushing, reloading metadata
 + * and so on.
 + *
 + * The Index interface defines a number of methods which return Callable<?>. These are primarily the
 + * management tasks for an index implementation. Most of them are currently executed in a blocking
 + * fashion via submission to SIM's blockingExecutor. This provides the desired behaviour in pretty
 + * much all cases, as tasks like flushing an index need to be executed synchronously to avoid potentially
 + * deadlocking on the FlushWriter or PostFlusher. Several of these Callable<?> returning methods on Index could
 + * then be defined as void and called directly from SIM (rather than being run via the executor service).
 + * Separating the task definition from execution gives us greater flexibility though, so that in future, for example,
 + * if the flush process allows it we leave open the possibility of executing more of these tasks asynchronously.
 + *
 + * The primary exception to the above is the Callable returned from Index#addIndexedColumn. This may
 + * involve a significant effort, building a new index over any existing data. We perform this task asynchronously;
 + * as it is called as part of a schema update, which we do not want to block for a long period. Building non-custom
 + * indexes is performed on the CompactionManager.
 + *
 + * This class also provides instances of processors which listen to updates to the base table and forward to
 + * registered Indexes the info required to keep those indexes up to date.
 + * There are two variants of these processors, each with a factory method provided by SIM:
 + *      IndexTransaction: deals with updates generated on the regular write path.
 + *      CleanupTransaction: used when partitions are modified during compaction or cleanup operations.
 + * Further details on their usage and lifecycles can be found in the interface definitions below.
 + *
 + * Finally, the bestIndexFor method is used at query time to identify the most selective index of those able
 + * to satisfy any search predicates defined by a ReadCommand's RowFilter. It returns a thin IndexAccessor object
 + * which enables the ReadCommand to access the appropriate functions of the Index at various stages in its lifecycle.
 + * e.g. the getEstimatedResultRows is required when StorageProxy calculates the initial concurrency factor for
 + * distributing requests to replicas, whereas a Searcher instance is needed when the ReadCommand is executed locally on
 + * a target replica.
 + */
 +public class SecondaryIndexManager implements IndexRegistry
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
 +
 +    // default page size (in rows) when rebuilding the index for a whole partition
 +    public static final int DEFAULT_PAGE_SIZE = 10000;
 +
 +    private Map<String, Index> indexes = Maps.newConcurrentMap();
 +
 +    /**
 +     * The indexes that are ready to serve requests.
 +     */
 +    private Set<String> builtIndexes = Sets.newConcurrentHashSet();
 +
 +    // executes tasks returned by Indexer#addIndexColumn which may require index(es) to be (re)built
 +    private static final ExecutorService asyncExecutor =
 +        new JMXEnabledThreadPoolExecutor(1,
 +                                         StageManager.KEEPALIVE,
 +                                         TimeUnit.SECONDS,
 +                                         new LinkedBlockingQueue<>(),
 +                                         new NamedThreadFactory("SecondaryIndexManagement"),
 +                                         "internal");
 +
 +    // executes all blocking tasks produced by Indexers e.g. getFlushTask, getMetadataReloadTask etc
 +    private static final ExecutorService blockingExecutor = MoreExecutors.newDirectExecutorService();
 +
 +    /**
 +     * The underlying column family containing the source data for these indexes
 +     */
 +    public final ColumnFamilyStore baseCfs;
 +
 +    public SecondaryIndexManager(ColumnFamilyStore baseCfs)
 +    {
 +        this.baseCfs = baseCfs;
 +    }
 +
 +
 +    /**
 +     * Drops and adds new indexes associated with the underlying CF
 +     */
 +    public void reload()
 +    {
 +        // figure out what needs to be added and dropped.
 +        Indexes tableIndexes = baseCfs.metadata.getIndexes();
 +        indexes.keySet()
 +               .stream()
 +               .filter(indexName -> !tableIndexes.has(indexName))
 +               .forEach(this::removeIndex);
 +
 +        // we call add for every index definition in the collection as
 +        // some may not have been created here yet, only added to schema
 +        for (IndexMetadata tableIndex : tableIndexes)
 +            addIndex(tableIndex);
 +    }
 +
 +    private Future<?> reloadIndex(IndexMetadata indexDef)
 +    {
 +        Index index = indexes.get(indexDef.name);
 +        Callable<?> reloadTask = index.getMetadataReloadTask(indexDef);
 +        return reloadTask == null
 +               ? Futures.immediateFuture(null)
 +               : blockingExecutor.submit(reloadTask);
 +    }
 +
 +    private Future<?> createIndex(IndexMetadata indexDef)
 +    {
 +        Index index = createInstance(indexDef);
 +        index.register(this);
 +
 +        // if the index didn't register itself, we can probably assume that no initialization needs to happen
 +        final Callable<?> initialBuildTask = indexes.containsKey(indexDef.name)
 +                                           ? index.getInitializationTask()
 +                                           : null;
 +        if (initialBuildTask == null)
 +        {
 +            // We need to make sure that the index is marked as built in the case where the initialBuildTask
 +            // does not need to be run (if the index didn't register itself or if the base table was empty).
 +            markIndexBuilt(indexDef.name);
 +            return Futures.immediateFuture(null);
 +        }
 +        return asyncExecutor.submit(index.getInitializationTask());
 +    }
 +
 +    /**
 +     * Adds and builds an index
 +     * @param indexDef the IndexMetadata describing the index
 +     */
 +    public synchronized Future<?> addIndex(IndexMetadata indexDef)
 +    {
 +        if (indexes.containsKey(indexDef.name))
 +            return reloadIndex(indexDef);
 +        else
 +            return createIndex(indexDef);
 +    }
 +
 +    /**
 +     * Checks if the specified index is queryable.
 +     *
 +     * @param index the index
 +     * @return <code>true</code> if the specified index is queryable, <code>false</code> otherwise
 +     */
 +    public boolean isIndexQueryable(Index index)
 +    {
 +        return builtIndexes.contains(index.getIndexMetadata().name);
 +    }
 +
 +    public synchronized void removeIndex(String indexName)
 +    {
 +        Index index = unregisterIndex(indexName);
 +        if (null != index)
 +        {
 +            markIndexRemoved(indexName);
 +            executeBlocking(index.getInvalidateTask());
 +        }
 +    }
 +
 +
 +    public Set<IndexMetadata> getDependentIndexes(ColumnDefinition column)
 +    {
 +        if (indexes.isEmpty())
 +            return Collections.emptySet();
 +
 +        Set<IndexMetadata> dependentIndexes = new HashSet<>();
 +        for (Index index : indexes.values())
 +            if (index.dependsOn(column))
 +                dependentIndexes.add(index.getIndexMetadata());
 +
 +        return dependentIndexes;
 +    }
 +
 +    /**
 +     * Called when dropping a Table
 +     */
 +    public void markAllIndexesRemoved()
 +    {
 +       getBuiltIndexNames().forEach(this::markIndexRemoved);
 +    }
 +
 +    /**
 +    * Does a full, blocking rebuild of the indexes specified by columns from the sstables.
 +    * Caller must acquire and release references to the sstables used here.
 +    * Note also that only this method of (re)building indexes:
 +    *   a) takes a set of index *names* rather than Indexers
 +    *   b) marks existing indexes removed prior to rebuilding
 +    *
 +    * @param sstables the data to build from
 +    * @param indexNames the list of indexes to be rebuilt
 +    */
 +    public void rebuildIndexesBlocking(Collection<SSTableReader> sstables, Set<String> indexNames)
 +    {
 +        Set<Index> toRebuild = indexes.values().stream()
 +                                               .filter(index -> indexNames.contains(index.getIndexMetadata().name))
 +                                               .filter(Index::shouldBuildBlocking)
 +                                               .collect(Collectors.toSet());
 +        if (toRebuild.isEmpty())
 +        {
 +            logger.info("No defined indexes with the supplied names: {}", Joiner.on(',').join(indexNames));
 +            return;
 +        }
 +
 +        toRebuild.forEach(indexer -> markIndexRemoved(indexer.getIndexMetadata().name));
 +
 +        buildIndexesBlocking(sstables, toRebuild);
 +
 +        toRebuild.forEach(indexer -> markIndexBuilt(indexer.getIndexMetadata().name));
 +    }
 +
 +    public void buildAllIndexesBlocking(Collection<SSTableReader> sstables)
 +    {
 +        buildIndexesBlocking(sstables, indexes.values()
 +                                              .stream()
 +                                              .filter(Index::shouldBuildBlocking)
 +                                              .collect(Collectors.toSet()));
 +    }
 +
 +    // For convenience, may be called directly from Index impls
 +    public void buildIndexBlocking(Index index)
 +    {
 +        if (index.shouldBuildBlocking())
 +        {
 +            try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL));
 +                 Refs<SSTableReader> sstables = viewFragment.refs)
 +            {
 +                buildIndexesBlocking(sstables, Collections.singleton(index));
 +                markIndexBuilt(index.getIndexMetadata().name);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Checks if the specified {@link ColumnFamilyStore} is a secondary index.
 +     *
 +     * @param cfs the <code>ColumnFamilyStore</code> to check.
 +     * @return <code>true</code> if the specified <code>ColumnFamilyStore</code> is a secondary index,
 +     * <code>false</code> otherwise.
 +     */
 +    public static boolean isIndexColumnFamilyStore(ColumnFamilyStore cfs)
 +    {
 +        return isIndexColumnFamily(cfs.name);
 +    }
 +
 +    /**
 +     * Checks if the specified {@link ColumnFamilyStore} name is that of a secondary index.
 +     *
 +     * @param cfName the name of the <code>ColumnFamilyStore</code> to check.
 +     * @return <code>true</code> if the specified <code>ColumnFamilyStore</code> is a secondary index,
 +     * <code>false</code> otherwise.
 +     */
 +    public static boolean isIndexColumnFamily(String cfName)
 +    {
 +        return cfName.contains(Directories.SECONDARY_INDEX_NAME_SEPARATOR);
 +    }
 +
 +    /**
 +     * Returns the parent of the specified {@link ColumnFamilyStore}.
 +     *
 +     * @param cfs the <code>ColumnFamilyStore</code>
 +     * @return the parent of the specified <code>ColumnFamilyStore</code>
 +     */
 +    public static ColumnFamilyStore getParentCfs(ColumnFamilyStore cfs)
 +    {
 +        String parentCfs = getParentCfsName(cfs.name);
 +        return cfs.keyspace.getColumnFamilyStore(parentCfs);
 +    }
 +
 +    /**
 +     * Returns the parent name of the specified {@link ColumnFamilyStore}.
 +     *
 +     * @param cfName the <code>ColumnFamilyStore</code> name
 +     * @return the parent name of the specified <code>ColumnFamilyStore</code>
 +     */
 +    public static String getParentCfsName(String cfName)
 +    {
 +        assert isIndexColumnFamily(cfName);
 +        return StringUtils.substringBefore(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR);
 +    }
 +
 +    /**
 +     * Returns the index name
 +     *
 +     * @param cfs the <code>ColumnFamilyStore</code>
 +     * @return the index name
 +     */
 +    public static String getIndexName(ColumnFamilyStore cfs)
 +    {
 +        return getIndexName(cfs.name);
 +    }
 +
 +    /**
 +     * Returns the index name
 +     *
 +     * @param cfName the <code>ColumnFamilyStore</code> name
 +     * @return the index name
 +     */
 +    public static String getIndexName(String cfName)
 +    {
 +        assert isIndexColumnFamily(cfName);
 +        return StringUtils.substringAfter(cfName, Directories.SECONDARY_INDEX_NAME_SEPARATOR);
 +    }
 +
 +    private void buildIndexesBlocking(Collection<SSTableReader> sstables, Set<Index> indexes)
 +    {
 +        if (indexes.isEmpty())
 +            return;
 +
 +        logger.info("Submitting index build of {} for data in {}",
 +                    indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")),
 +                    sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
 +
 +        SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
 +                                                                  indexes,
 +                                                                  new ReducingKeyIterator(sstables));
 +        Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
 +        FBUtilities.waitOnFuture(future);
 +
 +        flushIndexesBlocking(indexes);
 +        logger.info("Index build of {} complete",
 +                    indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")));
 +    }
 +
 +    /**
 +     * Marks the specified index as built.
 +     * <p>This method is public as it needs to be accessible from the {@link Index} implementations</p>
 +     * @param indexName the index name
 +     */
 +    public void markIndexBuilt(String indexName)
 +    {
 +        builtIndexes.add(indexName);
 +        if (DatabaseDescriptor.isDaemonInitialized())
 +            SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), indexName);
 +    }
 +
 +    /**
 +     * Marks the specified index as removed.
 +     * <p>This method is public as it needs to be accessible from the {@link Index} implementations</p>
 +     * @param indexName the index name
 +     */
 +    public void markIndexRemoved(String indexName)
 +    {
 +        SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), indexName);
 +    }
 +
 +    public Index getIndexByName(String indexName)
 +    {
 +        return indexes.get(indexName);
 +    }
 +
 +    private Index createInstance(IndexMetadata indexDef)
 +    {
 +        Index newIndex;
 +        if (indexDef.isCustom())
 +        {
 +            assert indexDef.options != null;
 +            String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
 +            assert ! Strings.isNullOrEmpty(className);
 +            try
 +            {
 +                Class<? extends Index> indexClass = FBUtilities.classForName(className, "Index");
 +                Constructor<? extends Index> ctor = indexClass.getConstructor(ColumnFamilyStore.class, IndexMetadata.class);
 +                newIndex = (Index)ctor.newInstance(baseCfs, indexDef);
 +            }
 +            catch (Exception e)
 +            {
 +                throw new RuntimeException(e);
 +            }
 +        }
 +        else
 +        {
 +            newIndex = CassandraIndex.newIndex(baseCfs, indexDef);
 +        }
 +        return newIndex;
 +    }
 +
 +    /**
 +     * Truncate all indexes
 +     */
 +    public void truncateAllIndexesBlocking(final long truncatedAt)
 +    {
 +        executeAllBlocking(indexes.values().stream(), (index) -> index.getTruncateTask(truncatedAt));
 +    }
 +
 +    /**
 +     * Remove all indexes
 +     */
 +    public void invalidateAllIndexesBlocking()
 +    {
 +        markAllIndexesRemoved();
 +        executeAllBlocking(indexes.values().stream(), Index::getInvalidateTask);
 +    }
 +
 +    /**
 +     * Perform a blocking flush of all indexes
 +     */
 +    public void flushAllIndexesBlocking()
 +    {
 +       flushIndexesBlocking(ImmutableSet.copyOf(indexes.values()));
 +    }
 +
 +    /**
 +     * Perform a blocking flush of selected indexes
 +     */
 +    public void flushIndexesBlocking(Set<Index> indexes)
 +    {
 +        if (indexes.isEmpty())
 +            return;
 +
 +        List<Future<?>> wait = new ArrayList<>();
 +        List<Index> nonCfsIndexes = new ArrayList<>();
 +
 +        // for each CFS backed index, submit a flush task which we'll wait on for completion
 +        // for the non-CFS backed indexes, we'll flush those while we wait.
 +        synchronized (baseCfs.getTracker())
 +        {
 +            indexes.forEach(index ->
 +                index.getBackingTable()
 +                     .map(cfs -> wait.add(cfs.forceFlush()))
 +                     .orElseGet(() -> nonCfsIndexes.add(index)));
 +        }
 +
 +        executeAllBlocking(nonCfsIndexes.stream(), Index::getBlockingFlushTask);
 +        FBUtilities.waitOnFutures(wait);
 +    }
 +
 +    /**
 +     * Performs a blocking flush of all custom indexes
 +     */
 +    public void flushAllNonCFSBackedIndexesBlocking()
 +    {
 +        executeAllBlocking(indexes.values()
 +                                  .stream()
 +                                  .filter(index -> !index.getBackingTable().isPresent()),
 +                           Index::getBlockingFlushTask);
 +    }
 +
 +    /**
 +     * @return all indexes which are marked as built and ready to use
 +     */
 +    public List<String> getBuiltIndexNames()
 +    {
 +        Set<String> allIndexNames = new HashSet<>();
 +        indexes.values().stream()
 +                .map(i -> i.getIndexMetadata().name)
 +                .forEach(allIndexNames::add);
 +        return SystemKeyspace.getBuiltIndexes(baseCfs.keyspace.getName(), allIndexNames);
 +    }
 +
 +    /**
 +     * @return all backing Tables used by registered indexes
 +     */
 +    public Set<ColumnFamilyStore> getAllIndexColumnFamilyStores()
 +    {
 +        Set<ColumnFamilyStore> backingTables = new HashSet<>();
 +        indexes.values().forEach(index -> index.getBackingTable().ifPresent(backingTables::add));
 +        return backingTables;
 +    }
 +
 +    /**
 +     * @return <code>true</code> if there are any indexes registered for this table, <code>false</code> otherwise
 +     */
 +    public boolean hasIndexes()
 +    {
 +        return !indexes.isEmpty();
 +    }
 +
 +    /**
 +     * When building an index against existing data in sstables, add the given partition to the index
 +     */
 +    public void indexPartition(DecoratedKey key, Set<Index> indexes, int pageSize)
 +    {
 +        if (logger.isTraceEnabled())
 +            logger.trace("Indexing partition {}", baseCfs.metadata.getKeyValidator().getString(key.getKey()));
 +
 +        if (!indexes.isEmpty())
 +        {
 +            SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata,
 +                                                                                          FBUtilities.nowInSeconds(),
 +                                                                                          key);
 +            int nowInSec = cmd.nowInSec();
 +            boolean readStatic = false;
 +
 +            SinglePartitionPager pager = new SinglePartitionPager(cmd, null, Server.CURRENT_VERSION);
 +            while (!pager.isExhausted())
 +            {
 +                try (ReadOrderGroup readGroup = cmd.startOrderGroup();
 +                     OpOrder.Group writeGroup = Keyspace.writeOrder.start();
 +                     UnfilteredPartitionIterator page = pager.fetchPageUnfiltered(baseCfs.metadata, pageSize, readGroup))
 +                {
 +                    if (!page.hasNext())
 +                        break;
 +
 +                    try (UnfilteredRowIterator partition = page.next()) {
 +                        Set<Index.Indexer> indexers = indexes.stream()
 +                                                             .map(index -> index.indexerFor(key,
 +                                                                                            partition.columns(),
 +                                                                                            nowInSec,
 +                                                                                            writeGroup,
 +                                                                                            IndexTransaction.Type.UPDATE))
 +                                                             .filter(Objects::nonNull)
 +                                                             .collect(Collectors.toSet());
 +
 +                        // Short-circuit empty partitions if static row is processed or isn't read
 +                        if (!readStatic && partition.isEmpty() && partition.staticRow().isEmpty())
 +                            break;
 +
 +                        indexers.forEach(Index.Indexer::begin);
 +
 +                        if (!readStatic)
 +                        {
 +                            if (!partition.staticRow().isEmpty())
 +                                indexers.forEach(indexer -> indexer.insertRow(partition.staticRow()));
 +                            indexers.forEach((Index.Indexer i) -> i.partitionDelete(partition.partitionLevelDeletion()));
 +                            readStatic = true;
 +                        }
 +
 +                        MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(partition.partitionLevelDeletion(), baseCfs.getComparator(), false);
 +
 +                        while (partition.hasNext())
 +                        {
 +                            Unfiltered unfilteredRow = partition.next();
 +
 +                            if (unfilteredRow.isRow())
 +                            {
 +                                Row row = (Row) unfilteredRow;
 +                                indexers.forEach(indexer -> indexer.insertRow(row));
 +                            }
 +                            else
 +                            {
 +                                assert unfilteredRow.isRangeTombstoneMarker();
 +                                RangeTombstoneMarker marker = (RangeTombstoneMarker) unfilteredRow;
 +                                deletionBuilder.add(marker);
 +                            }
 +                        }
 +
 +                        MutableDeletionInfo deletionInfo = deletionBuilder.build();
 +                        if (deletionInfo.hasRanges())
 +                        {
 +                            Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false);
 +                            while (iter.hasNext())
 +                            {
 +                                RangeTombstone rt = iter.next();
 +                                indexers.forEach(indexer -> indexer.rangeTombstone(rt));
 +                            }
 +                        }
 +
 +                        indexers.forEach(Index.Indexer::finish);
 +                    }
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Return the page size used when indexing an entire partition
 +     */
 +    public int calculateIndexingPageSize()
 +    {
 +        if (Boolean.getBoolean("cassandra.force_default_indexing_page_size"))
 +            return DEFAULT_PAGE_SIZE;
 +
 +        double targetPageSizeInBytes = 32 * 1024 * 1024;
 +        double meanPartitionSize = baseCfs.getMeanPartitionSize();
 +        if (meanPartitionSize <= 0)
 +            return DEFAULT_PAGE_SIZE;
 +
 +        int meanCellsPerPartition = baseCfs.getMeanColumns();
 +        if (meanCellsPerPartition <= 0)
 +            return DEFAULT_PAGE_SIZE;
 +
 +        int columnsPerRow = baseCfs.metadata.partitionColumns().regulars.size();
 +        if (columnsPerRow <= 0)
 +            return DEFAULT_PAGE_SIZE;
 +
 +        int meanRowsPerPartition = meanCellsPerPartition / columnsPerRow;
 +        double meanRowSize = meanPartitionSize / meanRowsPerPartition;
 +
 +        int pageSize = (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, targetPageSizeInBytes / meanRowSize));
 +
 +        logger.trace("Calculated page size {} for indexing {}.{} ({}/{}/{}/{})",
 +                     pageSize,
 +                     baseCfs.metadata.ksName,
 +                     baseCfs.metadata.cfName,
 +                     meanPartitionSize,
 +                     meanCellsPerPartition,
 +                     meanRowsPerPartition,
 +                     meanRowSize);
 +
 +        return pageSize;
 +    }
 +
 +    /**
 +     * Delete all data from all indexes for this partition.
 +     * For when cleanup rips a partition out entirely.
 +     *
 +     * TODO : improve cleanup transaction to batch updates & perform them async
 +     */
 +    public void deletePartition(UnfilteredRowIterator partition, int nowInSec)
 +    {
 +        // we need to acquire memtable lock because secondary index deletion may
 +        // cause a race (see CASSANDRA-3712). This is done internally by the
 +        // index transaction when it commits
 +        CleanupTransaction indexTransaction = newCleanupTransaction(partition.partitionKey(),
 +                                                                    partition.columns(),
 +                                                                    nowInSec);
 +        indexTransaction.start();
 +        indexTransaction.onPartitionDeletion(new DeletionTime(FBUtilities.timestampMicros(), nowInSec));
 +        indexTransaction.commit();
 +
 +        while (partition.hasNext())
 +        {
 +            Unfiltered unfiltered = partition.next();
 +            if (unfiltered.kind() != Unfiltered.Kind.ROW)
 +                continue;
 +
 +            indexTransaction = newCleanupTransaction(partition.partitionKey(),
 +                                                     partition.columns(),
 +                                                     nowInSec);
 +            indexTransaction.start();
 +            indexTransaction.onRowDelete((Row)unfiltered);
 +            indexTransaction.commit();
 +        }
 +    }
 +
 +    /**
 +     * Called at query time to choose which (if any) of the registered index implementations to use for a given query.
 +     *
 +     * This is a two step process: first compiling the set of searchable indexes, then choosing the one which reduces
 +     * the search space the most.
 +     *
 +     * In the first phase, if the command's RowFilter contains any custom index expressions, the indexes that they
 +     * specify are automatically included. Following that, the registered indexes are filtered to include only those
 +     * which support the standard expressions in the RowFilter.
 +     *
 +     * The filtered set is then sorted by selectivity, as reported by the Index implementations' getEstimatedResultRows
 +     * method.
 +     *
 +     * Implementation specific validation of the target expression, either custom or standard, by the selected
 +     * index should be performed in the searcherFor method to ensure that we pick the right index regardless of
 +     * the validity of the expression.
 +     *
 +     * This method is only called once during the lifecycle of a ReadCommand and the result is
 +     * cached for future use when obtaining a Searcher, getting the index's underlying CFS for
 +     * ReadOrderGroup, or an estimate of the result size from an average index query.
 +     *
 +     * @param rowFilter RowFilter of the command to be executed
 +     * @return an Index instance, ready to use during execution of the command, or null if none
 +     * of the registered indexes can support the command.
 +     */
 +    public Index getBestIndexFor(RowFilter rowFilter)
 +    {
 +        if (indexes.isEmpty() || rowFilter.isEmpty())
 +            return null;
 +
 +        Set<Index> searchableIndexes = new HashSet<>();
 +        for (RowFilter.Expression expression : rowFilter)
 +        {
 +            if (expression.isCustom())
 +            {
 +                // Only a single custom expression is allowed per query and, if present,
 +                // we want to always favour the index specified in such an expression
 +                RowFilter.CustomExpression customExpression = (RowFilter.CustomExpression)expression;
 +                logger.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name);
 +                Tracing.trace("Command contains a custom index expression, using target index {}", customExpression.getTargetIndex().name);
 +                return indexes.get(customExpression.getTargetIndex().name);
 +            }
 +            else
 +            {
 +                indexes.values().stream()
 +                       .filter(index -> index.supportsExpression(expression.column(), expression.operator()))
 +                       .forEach(searchableIndexes::add);
 +            }
 +        }
 +
 +        if (searchableIndexes.isEmpty())
 +        {
 +            logger.trace("No applicable indexes found");
 +            Tracing.trace("No applicable indexes found");
 +            return null;
 +        }
 +
 +        Index selected = searchableIndexes.size() == 1
 +                         ? Iterables.getOnlyElement(searchableIndexes)
 +                         : searchableIndexes.stream()
 +                                            .min((a, b) -> Longs.compare(a.getEstimatedResultRows(),
 +                                                                         b.getEstimatedResultRows()))
 +                                            .orElseThrow(() -> new AssertionError("Could not select most selective index"));
 +
 +        // pay for an additional threadlocal get() rather than build the strings unnecessarily
 +        if (Tracing.isTracing())
 +        {
 +            Tracing.trace("Index mean cardinalities are {}. Scanning with {}.",
 +                          searchableIndexes.stream().map(i -> i.getIndexMetadata().name + ':' + i.getEstimatedResultRows())
 +                                           .collect(Collectors.joining(",")),
 +                          selected.getIndexMetadata().name);
 +        }
 +        return selected;
 +    }
 +
 +    /**
 +     * Called at write time to ensure that values present in the update
 +     * are valid according to the rules of all registered indexes which
 +     * will process it. The partition key as well as the clustering and
 +     * cell values for each row in the update may be checked by index
 +     * implementations.
 +     * @param update PartitionUpdate containing the values to be validated by registered Index implementations
 +     * @throws InvalidRequestException if the update is rejected by any of the registered index implementations
 +     */
 +    public void validate(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        for (Index index : indexes.values())
 +            index.validate(update);
 +    }
 +
 +    /**
 +     * IndexRegistry methods
 +     */
 +    public void registerIndex(Index index)
 +    {
 +        String name = index.getIndexMetadata().name;
 +        indexes.put(name, index);
 +        logger.trace("Registered index {}", name);
 +    }
 +
 +    public void unregisterIndex(Index index)
 +    {
 +        unregisterIndex(index.getIndexMetadata().name);
 +    }
 +
 +    private Index unregisterIndex(String name)
 +    {
 +        Index removed = indexes.remove(name);
 +        builtIndexes.remove(name);
 +        logger.trace(removed == null ? "Index {} was not registered" : "Removed index {} from registry",
 +                     name);
 +        return removed;
 +    }
 +
 +    public Index getIndex(IndexMetadata metadata)
 +    {
 +        return indexes.get(metadata.name);
 +    }
 +
 +    public Collection<Index> listIndexes()
 +    {
 +        return ImmutableSet.copyOf(indexes.values());
 +    }
 +
 +    /**
 +     * Handling of index updates.
 +     * Implementations of the various IndexTransaction interfaces, for keeping indexes in sync with base data
 +     * during updates, compaction and cleanup. Plus factory methods for obtaining transaction instances.
 +     */
 +
 +    /**
 +     * Transaction for updates on the write path.
 +     */
 +    public UpdateTransaction newUpdateTransaction(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
 +    {
 +        if (!hasIndexes())
 +            return UpdateTransaction.NO_OP;
 +
 +        Index.Indexer[] indexers = indexes.values().stream()
 +                                          .map(i -> i.indexerFor(update.partitionKey(),
 +                                                                 update.columns(),
 +                                                                 nowInSec,
 +                                                                 opGroup,
 +                                                                 IndexTransaction.Type.UPDATE))
 +                                          .filter(Objects::nonNull)
 +                                          .toArray(Index.Indexer[]::new);
 +
 +        return indexers.length == 0 ? UpdateTransaction.NO_OP : new WriteTimeTransaction(indexers);
 +    }
 +
 +    /**
 +     * Transaction for use when merging rows during compaction
 +     */
 +    public CompactionTransaction newCompactionTransaction(DecoratedKey key,
 +                                                          PartitionColumns partitionColumns,
 +                                                          int versions,
 +                                                          int nowInSec)
 +    {
 +        // the check for whether there are any registered indexes is already done in CompactionIterator
 +        return new IndexGCTransaction(key, partitionColumns, versions, nowInSec, listIndexes());
 +    }
 +
 +    /**
 +     * Transaction for use when removing partitions during cleanup
 +     */
 +    public CleanupTransaction newCleanupTransaction(DecoratedKey key,
 +                                                    PartitionColumns partitionColumns,
 +                                                    int nowInSec)
 +    {
 +        if (!hasIndexes())
 +            return CleanupTransaction.NO_OP;
 +
 +        return new CleanupGCTransaction(key, partitionColumns, nowInSec, listIndexes());
 +    }
 +
 +    /**
 +     * A single use transaction for processing a partition update on the regular write path
 +     */
 +    private static final class WriteTimeTransaction implements UpdateTransaction
 +    {
 +        private final Index.Indexer[] indexers;
 +
 +        private WriteTimeTransaction(Index.Indexer...indexers)
 +        {
 +            // don't allow null indexers, if we don't need any use a NullUpdater object
 +            for (Index.Indexer indexer : indexers) assert indexer != null;
 +            this.indexers = indexers;
 +        }
 +
 +        public void start()
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.begin();
 +        }
 +
 +        public void onPartitionDeletion(DeletionTime deletionTime)
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.partitionDelete(deletionTime);
 +        }
 +
 +        public void onRangeTombstone(RangeTombstone tombstone)
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.rangeTombstone(tombstone);
 +        }
 +
 +        public void onInserted(Row row)
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.insertRow(row);
 +        }
 +
 +        public void onUpdated(Row existing, Row updated)
 +        {
 +            final Row.Builder toRemove = BTreeRow.sortedBuilder();
 +            toRemove.newRow(existing.clustering());
 +            toRemove.addPrimaryKeyLivenessInfo(existing.primaryKeyLivenessInfo());
 +            toRemove.addRowDeletion(existing.deletion());
 +            final Row.Builder toInsert = BTreeRow.sortedBuilder();
 +            toInsert.newRow(updated.clustering());
 +            toInsert.addPrimaryKeyLivenessInfo(updated.primaryKeyLivenessInfo());
 +            toInsert.addRowDeletion(updated.deletion());
 +            // diff listener collates the columns to be added & removed from the indexes
 +            RowDiffListener diffListener = new RowDiffListener()
 +            {
 +                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
 +                {
 +                }
 +
 +                public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
 +                {
 +                }
 +
 +                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
 +                {
 +                }
 +
 +                public void onCell(int i, Clustering clustering, Cell merged, Cell original)
 +                {
 +                    if (merged != null && !merged.equals(original))
 +                        toInsert.addCell(merged);
 +
 +                    if (merged == null || (original != null && shouldCleanupOldValue(original, merged)))
 +                        toRemove.addCell(original);
 +
 +                }
 +            };
 +            Rows.diff(diffListener, updated, existing);
 +            Row oldRow = toRemove.build();
 +            Row newRow = toInsert.build();
 +            for (Index.Indexer indexer : indexers)
 +                indexer.updateRow(oldRow, newRow);
 +        }
 +
 +        public void commit()
 +        {
 +            for (Index.Indexer indexer : indexers)
 +                indexer.finish();
 +        }
 +
 +        private boolean shouldCleanupOldValue(Cell oldCell, Cell newCell)
 +        {
 +            // If either the value or timestamp is different, then we
 +            // should delete from the index. If not, then we can infer that
 +            // at least one of the cells is an ExpiringColumn and that the
 +            // difference is in the expiry time. In this case, we don't want to
 +            // delete the old value from the index as the tombstone we insert
 +            // will just hide the inserted value.
 +            // Completely identical cells (including expiring columns with
 +            // identical ttl & localExpirationTime) will not get this far due
 +            // to the oldCell.equals(newCell) in StandardUpdater.update
 +            return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp();
 +        }
 +    }
 +
 +    /**
 +     * A single-use transaction for updating indexes for a single partition during compaction where the only
 +     * operation is to merge rows
 +     * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
 +     * a single partition
 +     */
 +    private static final class IndexGCTransaction implements CompactionTransaction
 +    {
 +        private final DecoratedKey key;
 +        private final PartitionColumns columns;
 +        private final int versions;
 +        private final int nowInSec;
 +        private final Collection<Index> indexes;
 +
 +        private Row[] rows;
 +
 +        private IndexGCTransaction(DecoratedKey key,
 +                                   PartitionColumns columns,
 +                                   int versions,
 +                                   int nowInSec,
 +                                   Collection<Index> indexes)
 +        {
 +            this.key = key;
 +            this.columns = columns;
 +            this.versions = versions;
 +            this.indexes = indexes;
 +            this.nowInSec = nowInSec;
 +        }
 +
 +        public void start()
 +        {
 +            if (versions > 0)
 +                rows = new Row[versions];
 +        }
 +
 +        public void onRowMerge(Row merged, Row...versions)
 +        {
 +            // Diff listener constructs rows representing deltas between the merged and original versions
 +            // These delta rows are then passed to registered indexes for removal processing
 +            final Row.Builder[] builders = new Row.Builder[versions.length];
 +            RowDiffListener diffListener = new RowDiffListener()
 +            {
 +                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
 +                {
 +                    if (original != null && (merged == null || !merged.isLive(nowInSec)))
 +                        getBuilder(i, clustering).addPrimaryKeyLivenessInfo(original);
 +                }
 +
 +                public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
 +                {
 +                }
 +
 +                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
 +                {
 +                }
 +
 +                public void onCell(int i, Clustering clustering, Cell merged, Cell original)
 +                {
 +                    if (original != null && (merged == null || !merged.isLive(nowInSec)))
 +                        getBuilder(i, clustering).addCell(original);
 +                }
 +
 +                private Row.Builder getBuilder(int index, Clustering clustering)
 +                {
 +                    if (builders[index] == null)
 +                    {
 +                        builders[index] = BTreeRow.sortedBuilder();
 +                        builders[index].newRow(clustering);
 +                    }
 +                    return builders[index];
 +                }
 +            };
 +
 +            Rows.diff(diffListener, merged, versions);
 +
 +            for(int i = 0; i < builders.length; i++)
 +                if (builders[i] != null)
 +                    rows[i] = builders[i].build();
 +        }
 +
 +        public void commit()
 +        {
 +            if (rows == null)
 +                return;
 +
 +            try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
 +            {
 +                for (Index index : indexes)
 +                {
 +                    Index.Indexer indexer = index.indexerFor(key, columns, nowInSec, opGroup, Type.COMPACTION);
 +                    if (indexer == null)
 +                        continue;
 +
 +                    indexer.begin();
 +                    for (Row row : rows)
 +                        if (row != null)
 +                            indexer.removeRow(row);
 +                    indexer.finish();
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
 +     * A single-use transaction for updating indexes for a single partition during cleanup, where
 +     * partitions and rows are only removed
 +     * TODO : make this smarter at batching updates so we can use a single transaction to process multiple rows in
 +     * a single partition
 +     */
 +    private static final class CleanupGCTransaction implements CleanupTransaction
 +    {
 +        private final DecoratedKey key;
 +        private final PartitionColumns columns;
 +        private final int nowInSec;
 +        private final Collection<Index> indexes;
 +
 +        private Row row;
 +        private DeletionTime partitionDelete;
 +
 +        private CleanupGCTransaction(DecoratedKey key,
 +                                     PartitionColumns columns,
 +                                     int nowInSec,
 +                                     Collection<Index> indexes)
 +        {
 +            this.key = key;
 +            this.columns = columns;
 +            this.indexes = indexes;
 +            this.nowInSec = nowInSec;
 +        }
 +
 +        public void start()
 +        {
 +        }
 +
 +        public void onPartitionDeletion(DeletionTime deletionTime)
 +        {
 +            partitionDelete = deletionTime;
 +        }
 +
 +        public void onRowDelete(Row row)
 +        {
 +            this.row = row;
 +        }
 +
 +        public void commit()
 +        {
 +            if (row == null && partitionDelete == null)
 +                return;
 +
 +            try (OpOrder.Group opGroup = Keyspace.writeOrder.start())
 +            {
 +                for (Index index : indexes)
 +                {
 +                    Index.Indexer indexer = index.indexerFor(key, columns, nowInSec, opGroup, Type.CLEANUP);
 +                    if (indexer == null)
 +                        continue;
 +
 +                    indexer.begin();
 +
 +                    if (partitionDelete != null)
 +                        indexer.partitionDelete(partitionDelete);
 +
 +                    if (row != null)
 +                        indexer.removeRow(row);
 +
 +                    indexer.finish();
 +                }
 +            }
 +        }
 +    }
 +
 +    private static void executeBlocking(Callable<?> task)
 +    {
 +        if (null != task)
 +            FBUtilities.waitOnFuture(blockingExecutor.submit(task));
 +    }
 +
 +    private static void executeAllBlocking(Stream<Index> indexers, Function<Index, Callable<?>> function)
 +    {
 +        List<Future<?>> waitFor = new ArrayList<>();
 +        indexers.forEach(indexer -> {
 +            Callable<?> task = function.apply(indexer);
 +            if (null != task)
 +                waitFor.add(blockingExecutor.submit(task));
 +        });
 +        FBUtilities.waitOnFutures(waitFor);
 +    }
 +
 +    @VisibleForTesting
-     public static void shutdownExecutors() throws InterruptedException
++    public static void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
 +    {
 +        ExecutorService[] executors = new ExecutorService[]{ asyncExecutor, blockingExecutor };
-         for (ExecutorService executor : executors)
-             executor.shutdown();
-         for (ExecutorService executor : executors)
-             executor.awaitTermination(60, TimeUnit.SECONDS);
++        shutdown(executors);
++        awaitTermination(timeout, unit, executors);
 +    }
 +}
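
For reference, the shutdownAndWait(timeout, unit) override above delegates to the ExecutorUtils.shutdown and ExecutorUtils.awaitTermination helpers introduced by this patch (see the statically imported calls just before the closing brace). A minimal sketch of that shutdown-then-await pattern, under the assumption of varargs helpers sharing a single deadline, is below; names and the deadline handling are illustrative, not the exact ExecutorUtils implementation.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Illustrative sketch only; the real ExecutorUtils added by this patch may differ
    // in signatures and error handling.
    final class ExecutorShutdownSketch
    {
        private ExecutorShutdownSketch() {}

        // Request shutdown of every executor first, so they can all wind down in parallel.
        static void shutdown(ExecutorService... executors)
        {
            for (ExecutorService executor : executors)
                executor.shutdown();
        }

        // Then await them against one shared deadline, rather than granting each executor
        // a fresh timeout as the previous shutdownExecutors() did.
        static void awaitTermination(long timeout, TimeUnit unit, ExecutorService... executors)
                throws InterruptedException, TimeoutException
        {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            for (ExecutorService executor : executors)
            {
                long remaining = deadline - System.nanoTime();
                if (!executor.awaitTermination(remaining, TimeUnit.NANOSECONDS))
                    throw new TimeoutException(executor + " did not terminate within the timeout");
            }
        }
    }
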
diff --cc src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 507b6fa,9317132..ae79217
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@@ -41,9 -42,9 +42,10 @@@ import org.apache.cassandra.db.compacti
  import org.apache.cassandra.db.compaction.CompactionManager;
  import org.apache.cassandra.db.compaction.OperationType;
  import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
  import org.apache.cassandra.db.lifecycle.View;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.utils.ExecutorUtils;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.MBeanWrapper;
  import org.apache.cassandra.utils.Pair;
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 82b26ea,e42b91b..b817c99
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -1155,22 -1061,15 +1155,24 @@@ public final class MessagingService imp
          }
      }
  
 -    private static void handleIOException(IOException e) throws IOException
 +    private static void handleIOExceptionOnClose(IOException e) throws IOException
      {
          // dirty hack for clean shutdown on OSX w/ Java >= 1.8.0_20
 -        // see https://bugs.openjdk.java.net/browse/JDK-8050499
 -        if ((!"Unknown error: 316".equals(e.getMessage()) || !"Mac OS X".equals(System.getProperty("os.name"))) &&
 -            !"Thread signal failed".equals(e.getMessage()) && // handle shutdown for in-JVM dtests
 -            !"Bad file descriptor".equals(e.getMessage()) &&
 -            !"No such file or directory".equals(e.getMessage()))
 -            throw e;
 +        // see https://bugs.openjdk.java.net/browse/JDK-8050499;
 +        // also CASSANDRA-12513
 +        if ("Mac OS X".equals(System.getProperty("os.name")))
 +        {
 +            switch (e.getMessage())
 +            {
 +                case "Unknown error: 316":
 +                case "No such file or directory":
++                case "Bad file descriptor":
++                case "Thread signal failed":
 +                    return;
 +            }
 +        }
 +
 +        throw e;
      }
  
      public Map<String, Integer> getLargeMessagePendingTasks()
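
The rewritten handleIOExceptionOnClose above swallows a fixed set of benign close-time error messages on macOS (including the two added by this merge) and rethrows anything else. A hypothetical call site, shown only to illustrate how the helper is intended to be used (the method and socket names are assumptions, not the actual MessagingService call sites), might look like:

    // Hypothetical close helper; real call sites inside MessagingService may differ.
    private static void closeOnShutdown(java.net.ServerSocket socket) throws java.io.IOException
    {
        try
        {
            socket.close();
        }
        catch (java.io.IOException e)
        {
            // Ignore the benign macOS shutdown errors enumerated above; rethrow everything else.
            handleIOExceptionOnClose(e);
        }
    }
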
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 4769b22,0a9a8da..d349b4b
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -92,10 -80,7 +93,11 @@@ import org.apache.cassandra.utils.progr
  import org.apache.cassandra.utils.progress.jmx.JMXProgressSupport;
  import org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport;
  
 +import static java.util.Arrays.asList;
+ import static java.util.concurrent.TimeUnit.MINUTES;
 +import static java.util.stream.Collectors.toList;
 +import static org.apache.cassandra.index.SecondaryIndexManager.getIndexName;
 +import static org.apache.cassandra.index.SecondaryIndexManager.isIndexColumnFamily;
  
  /**
   * This abstraction contains the token/identifier of this node
@@@ -651,12 -660,12 +653,17 @@@ public class StorageService extends Not
                  if (FBUtilities.isWindows())
                      WindowsTimer.endTimerPeriod(DatabaseDescriptor.getWindowsTimerInterval());
  
 +                // Cleanup logback
 +                DelayingShutdownHook logbackHook = new DelayingShutdownHook();
 +                logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory());
 +                logbackHook.run();
++
+                 // wait for miscellaneous tasks like sstable and commitlog segment deletion
+                 ScheduledExecutors.nonPeriodicTasks.shutdown();
+                 if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, MINUTES))
+                     logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown");
              }
 -        }, "StorageServiceShutdownHook");
 +        }), "StorageServiceShutdownHook");
          Runtime.getRuntime().addShutdownHook(drainOnShutdown);
  
          replacing = DatabaseDescriptor.isReplacing();
@@@ -1403,9 -1368,9 +1410,9 @@@
          return bgMonitor.getSeverity(endpoint);
      }
  
-     public void shutdownBGMonitor()
 -    public void shutdownBGMonitorAndWait(long timeout, TimeUnit units) throws TimeoutException, InterruptedException
++    public void shutdownBGMonitorAndWait(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException
      {
-         bgMonitor.shutdown();
 -        bgMonitor.shutdownAndWait(timeout, units);
++        bgMonitor.shutdownAndWait(timeout, unit);
      }
  
      /**
@@@ -4211,149 -4020,96 +4218,158 @@@
       */
      public synchronized void drain() throws IOException, InterruptedException, ExecutionException
      {
 -        inShutdownHook = true;
 +        drain(false);
 +    }
  
 +    protected synchronized void drain(boolean isFinalShutdown) throws IOException, InterruptedException, ExecutionException
 +    {
          ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
 +        ExecutorService viewMutationStage = StageManager.getStage(Stage.VIEW_MUTATION);
          ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
 -        if (mutationStage.isTerminated() && counterMutationStage.isTerminated())
 +
 +        if (mutationStage.isTerminated()
 +            && counterMutationStage.isTerminated()
 +            && viewMutationStage.isTerminated())
          {
 -            logger.warn("Cannot drain node (did it already happen?)");
 +            if (!isFinalShutdown)
 +                logger.warn("Cannot drain node (did it already happen?)");
              return;
          }
 -        setMode(Mode.DRAINING, "starting drain process", true);
 -        shutdownClientServers();
 -        ScheduledExecutors.optionalTasks.shutdown();
 -        Gossiper.instance.stop();
  
 -        setMode(Mode.DRAINING, "shutting down MessageService", false);
 -        MessagingService.instance().shutdown();
 +        assert !isShutdown;
 +        isShutdown = true;
  
 -        setMode(Mode.DRAINING, "clearing mutation stage", false);
 -        counterMutationStage.shutdown();
 -        mutationStage.shutdown();
 -        counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 -        mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 +        try
 +        {
 +            setMode(Mode.DRAINING, "starting drain process", !isFinalShutdown);
  
-             BatchlogManager.instance.shutdown();
 -        StorageProxy.instance.verifyNoHintsInProgress();
++            try
++            {
++                /* not clear this is reasonable time, but propagated from prior embedded behaviour */
++                BatchlogManager.instance.shutdownAndWait(1L, MINUTES);
++            }
++            catch (TimeoutException t)
++            {
++                logger.error("Batchlog manager timed out shutting down", t);
++            }
+ 
 -        setMode(Mode.DRAINING, "flushing column families", false);
 -        // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty
 -        totalCFs = 0;
 -        for (Keyspace keyspace : Keyspace.nonSystem())
 -            totalCFs += keyspace.getColumnFamilyStores().size();
 -        remainingCFs = totalCFs;
 -        // flush
 -        List<Future<?>> flushes = new ArrayList<>();
 -        for (Keyspace keyspace : Keyspace.nonSystem())
 -        {
 -            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 -                flushes.add(cfs.forceFlush());
 -        }
 -        // wait for the flushes.
 -        // TODO this is a godawful way to track progress, since they flush in parallel.  a long one could
 -        // thus make several short ones "instant" if we wait for them later.
 -        for (Future f : flushes)
 -        {
 -            FBUtilities.waitOnFuture(f);
 -            remainingCFs--;
 -        }
 +            HintsService.instance.pauseDispatch();
  
 -        try
 -        {
 -            /* not clear this is reasonable time, but propagated from prior embedded behaviour */
 -            BatchlogManager.shutdownAndWait(1L, MINUTES);
 -        }
 -        catch (TimeoutException t)
 -        {
 -            logger.error("Batchlog manager timed out shutting down", t);
 -        }
 +            if (daemon != null)
 +                shutdownClientServers();
 +            ScheduledExecutors.optionalTasks.shutdown();
 +            Gossiper.instance.stop();
 +
 +            if (!isFinalShutdown)
 +                setMode(Mode.DRAINING, "shutting down MessageService", false);
 +
 +            // In-progress writes originating here could generate hints to be written, so shut down MessagingService
 +            // before mutation stage, so we can get all the hints saved before shutting down
 +            MessagingService.instance().shutdown();
 +
 +            if (!isFinalShutdown)
 +                setMode(Mode.DRAINING, "clearing mutation stage", false);
 +            viewMutationStage.shutdown();
 +            counterMutationStage.shutdown();
 +            mutationStage.shutdown();
 +            viewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 +            counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 +            mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 +
 +            StorageProxy.instance.verifyNoHintsInProgress();
 +
 +            if (!isFinalShutdown)
 +                setMode(Mode.DRAINING, "flushing column families", false);
 +
 +            // disable autocompaction - we don't want to start any new compactions while we are draining
 +            for (Keyspace keyspace : Keyspace.all())
 +                for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 +                    cfs.disableAutoCompaction();
 +
 +            // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty
 +            totalCFs = 0;
 +            for (Keyspace keyspace : Keyspace.nonSystem())
 +                totalCFs += keyspace.getColumnFamilyStores().size();
 +            remainingCFs = totalCFs;
 +            // flush
 +            List<Future<?>> flushes = new ArrayList<>();
 +            for (Keyspace keyspace : Keyspace.nonSystem())
 +            {
 +                for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 +                    flushes.add(cfs.forceFlush());
 +            }
 +            // wait for the flushes.
 +            // TODO this is a godawful way to track progress, since they flush in parallel.  a long one could
 +            // thus make several short ones "instant" if we wait for them later.
 +            for (Future f : flushes)
 +            {
 +                try
 +                {
 +                    FBUtilities.waitOnFuture(f);
 +                }
 +                catch (Throwable t)
 +                {
 +                    JVMStabilityInspector.inspectThrowable(t);
 +                    // don't let this stop us from shutting down the commitlog and other thread pools
 +                    logger.warn("Caught exception while waiting for memtable flushes during shutdown hook", t);
 +                }
  
 -        // Interrupt on going compaction and shutdown to prevent further compaction
 -        CompactionManager.instance.forceShutdown();
 +                remainingCFs--;
 +            }
  
 -        // Flush the system tables after all other tables are flushed, just in case flushing modifies any system state
 -        // like CASSANDRA-5151. Don't bother with progress tracking since system data is tiny.
 -        // Flush system tables after stopping the batchlog manager and compactions since they both modify
 -        // system tables (for example compactions can obsolete sstables and the tidiers in SSTableReader update
 -        // system tables, see SSTableReader.GlobalTidy)
 -        flushes.clear();
 -        for (Keyspace keyspace : Keyspace.system())
 -        {
 -            for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 -                flushes.add(cfs.forceFlush());
 -        }
 -        FBUtilities.waitOnFutures(flushes);
 +            // Interrupt ongoing compactions and shutdown CM to prevent further compactions.
 +            CompactionManager.instance.forceShutdown();
 +            // Flush the system tables after all other tables are flushed, just in case flushing modifies any system state
 +            // like CASSANDRA-5151. Don't bother with progress tracking since system data is tiny.
 +            // Flush system tables after stopping compactions since they modify
 +            // system tables (for example compactions can obsolete sstables and the tidiers in SSTableReader update
 +            // system tables, see SSTableReader.GlobalTidy)
 +            flushes.clear();
 +            for (Keyspace keyspace : Keyspace.system())
 +            {
 +                for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
 +                    flushes.add(cfs.forceFlush());
 +            }
 +            FBUtilities.waitOnFutures(flushes);
 +
 +            HintsService.instance.shutdownBlocking();
  
 -        // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
 -        // there are no segments to replay, so we force the recycling of any remaining (should be at most one)
 -        CommitLog.instance.forceRecycleAllSegments();
 +            // Interrupt ongoing compactions and shutdown CM to prevent further compactions.
 +            CompactionManager.instance.forceShutdown();
  
 -        CommitLog.instance.shutdownBlocking();
 +            // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
 +            // there are no segments to replay, so we force the recycling of any remaining (should be at most one)
 +            CommitLog.instance.forceRecycleAllSegments();
  
 -        // wait for miscellaneous tasks like sstable and commitlog segment deletion
 -        ScheduledExecutors.nonPeriodicTasks.shutdown();
 -        if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, MINUTES))
 -            logger.warn("Failed to wait for non periodic tasks to shutdown");
 +            CommitLog.instance.shutdownBlocking();
  
 -        ColumnFamilyStore.shutdownPostFlushExecutor();
 +            // wait for miscellaneous tasks like sstable and commitlog segment deletion
 +            ScheduledExecutors.nonPeriodicTasks.shutdown();
-             if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
++            if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, MINUTES))
 +                logger.warn("Failed to wait for non periodic tasks to shutdown");
  
 -        setMode(Mode.DRAINED, true);
 +            ColumnFamilyStore.shutdownPostFlushExecutor();
 +            setMode(Mode.DRAINED, !isFinalShutdown);
 +        }
 +        catch (Throwable t)
 +        {
 +            logger.error("Caught an exception while draining ", t);
 +        }
 +    }
 +
 +    /**
 +     * Some services are shutdown during draining and we should not attempt to start them again.
 +     *
 +     * @param service - the name of the service we are trying to start.
 +     * @throws IllegalStateException - an exception that nodetool is able to convert into a message to display to the user
 +     */
 +    synchronized void checkServiceAllowedToStart(String service)
 +    {
 +        if (isDraining()) // when draining isShutdown is also true, so we check first to return a more accurate message
 +            throw new IllegalStateException(String.format("Unable to start %s because the node is draining.", service));
 +
 +        if (isShutdown()) // do not rely on operationMode in case it gets changed to decommissioned or other
 +            throw new IllegalStateException(String.format("Unable to start %s because the node was drained.", service));
      }
  
      // Never ever do this at home. Used by tests.
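
The new checkServiceAllowedToStart guard above is intended to be invoked by service start paths once a drain has begun, so nodetool receives an IllegalStateException it can render as a readable message instead of silently restarting a service on a drained node. A hypothetical caller (the method and service name are illustrative assumptions, not code from this patch) might be:

    // Hypothetical start path; the real callers elsewhere in StorageService may differ.
    public synchronized void startNativeTransportSketch()
    {
        // Refuse to bring the service back up once drain() has run or is in progress.
        checkServiceAllowedToStart("native transport");

        // ... start the service here ...
    }
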
diff --cc src/java/org/apache/cassandra/utils/concurrent/Ref.java
index 153a5b3,c009032..933c498
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@@ -35,22 -35,9 +35,26 @@@ import org.apache.cassandra.concurrent.
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.concurrent.InfiniteLoopExecutor.InterruptibleRunnable;
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.Memory;
 +import org.apache.cassandra.io.util.SafeMemory;
+ import org.apache.cassandra.utils.ExecutorUtils;
 +import org.apache.cassandra.utils.NoSpamLogger;
 +import org.apache.cassandra.utils.Pair;
 +import org.cliffc.high_scale_lib.NonBlockingHashMap;
  
 +import static java.util.Collections.emptyList;
++import org.apache.cassandra.concurrent.InfiniteLoopExecutor.InterruptibleRunnable;
 +
++import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
++import static org.apache.cassandra.utils.ExecutorUtils.shutdownNow;
  import static org.apache.cassandra.utils.Throwables.maybeFail;
  import static org.apache.cassandra.utils.Throwables.merge;
  
@@@ -374,343 -343,9 +378,337 @@@ public final class Ref<T> implements Re
          }
      }
  
 +    static final Deque<InProgressVisit> inProgressVisitPool = new ArrayDeque<InProgressVisit>();
 +
 +    @SuppressWarnings({ "rawtypes", "unchecked" })
 +    static InProgressVisit newInProgressVisit(Object o, List<Field> fields, Field field, String name)
 +    {
 +        Preconditions.checkNotNull(o);
 +        InProgressVisit ipv = inProgressVisitPool.pollLast();
 +        if (ipv == null)
 +            ipv = new InProgressVisit();
 +
 +        ipv.o = o;
 +        if (o instanceof Object[])
 +            ipv.collectionIterator = Arrays.asList((Object[])o).iterator();
 +        else if (o instanceof ConcurrentMap)
 +        {
 +            ipv.isMapIterator = true;
 +            ipv.collectionIterator = ((Map)o).entrySet().iterator();
 +        }
 +        else if (concurrentIterables.contains(o.getClass()) | o instanceof BlockingQueue)
 +            ipv.collectionIterator = ((Iterable)o).iterator();
 +
 +        ipv.fields = fields;
 +        ipv.field = field;
 +        ipv.name = name;
 +        return ipv;
 +    }
 +
 +    static void returnInProgressVisit(InProgressVisit ipv)
 +    {
 +        if (inProgressVisitPool.size() > 1024)
 +            return;
 +        ipv.name = null;
 +        ipv.fields = null;
 +        ipv.o = null;
 +        ipv.fieldIndex = 0;
 +        ipv.field = null;
 +        ipv.collectionIterator = null;
 +        ipv.mapEntryValue = null;
 +        ipv.isMapIterator = false;
 +        inProgressVisitPool.offer(ipv);
 +    }
 +
 +    /*
 +     * Stack state for walking an object graph.
 +     * Field index is the index of the current field being fetched.
 +     */
 +    @SuppressWarnings({ "rawtypes"})
 +    static class InProgressVisit
 +    {
 +        String name;
 +        List<Field> fields;
 +        Object o;
 +        int fieldIndex = 0;
 +        Field field;
 +
 +        //Need to know if Map.Entry should be returned or traversed as an object
 +        boolean isMapIterator;
 +        //If o is a ConcurrentMap, BlockingQueue, or Object[], this is populated with an iterator over the contents
 +        Iterator<Object> collectionIterator;
 +        //If o is a ConcurrentMap the entry set contains keys and values. The key is returned as the first child
 +        //And the associated value is stashed here and returned next
 +        Object mapEntryValue;
 +
 +        private Field nextField()
 +        {
 +            if (fields.isEmpty())
 +                return null;
 +
 +            if (fieldIndex >= fields.size())
 +                return null;
 +
 +            Field retval = fields.get(fieldIndex);
 +            fieldIndex++;
 +            return retval;
 +        }
 +
 +        Pair<Object, Field> nextChild() throws IllegalAccessException
 +        {
 +            //If the last child returned was a key from a map, the value from that entry is stashed
 +            //so it can be returned next
 +            if (mapEntryValue != null)
 +            {
 +                Pair<Object, Field> retval = Pair.create(mapEntryValue, field);
 +                mapEntryValue = null;
 +                return retval;
 +            }
 +
 +            //If o is a ConcurrentMap, BlockingQueue, or Object[], then an iterator will be stored to return the elements
 +            if (collectionIterator != null)
 +            {
 +                if (!collectionIterator.hasNext())
 +                    return null;
 +                Object nextItem = null;
 +                //Find the next non-null element to traverse since returning null will cause the visitor to stop
 +                while (collectionIterator.hasNext() && (nextItem = collectionIterator.next()) == null){}
 +                if (nextItem != null)
 +                {
 +                    if (isMapIterator & nextItem instanceof Map.Entry)
 +                    {
 +                        Map.Entry entry = (Map.Entry)nextItem;
 +                        mapEntryValue = entry.getValue();
 +                        return Pair.create(entry.getKey(), field);
 +                    }
 +                    return Pair.create(nextItem, field);
 +                }
 +                else
 +                {
 +                    return null;
 +                }
 +            }
 +
 +            //Basic traversal of an object by its member fields
 +            //Don't return null values as that indicates no more objects
 +            while (true)
 +            {
 +                Field nextField = nextField();
 +                if (nextField == null)
 +                    return null;
 +
 +                //A weak reference isn't strongly reachable
 +                //subclasses of WeakReference contain strong references in their fields, so those need to be traversed
 +                //The weak reference fields are in the common Reference class base so filter those out
 +                if (o instanceof WeakReference & nextField.getDeclaringClass() == Reference.class)
 +                    continue;
 +
 +                Object nextObject = nextField.get(o);
 +                if (nextObject != null)
 +                    return Pair.create(nextField.get(o), nextField);
 +            }
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return field == null ? name : field.toString() + "-" + o.getClass().getName();
 +        }
 +    }
 +
 +    static class Visitor implements Runnable
 +    {
 +        final Deque<InProgressVisit> path = new ArrayDeque<>();
 +        final Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<>());
 +        @VisibleForTesting
 +        int lastVisitedCount;
 +        @VisibleForTesting
 +        long iterations = 0;
 +        GlobalState visiting;
 +        Set<GlobalState> haveLoops;
 +
 +        public void run()
 +        {
 +            try
 +            {
 +                for (GlobalState globalState : globallyExtant)
 +                {
 +                    if (globalState.tidy == null)
 +                        continue;
 +
 +                    // do a graph exploration of the GlobalState, since it should be shallow; if it references itself, we have a problem
 +                    path.clear();
 +                    visited.clear();
 +                    lastVisitedCount = 0;
 +                    iterations = 0;
 +                    visited.add(globalState);
 +                    visiting = globalState;
 +                    traverse(globalState.tidy);
 +                }
 +            }
 +            catch (Throwable t)
 +            {
 +                t.printStackTrace();
 +            }
 +            finally
 +            {
 +                lastVisitedCount = visited.size();
 +                path.clear();
 +                visited.clear();
 +            }
 +        }
 +
 +        /*
 +         * Searches for an indirect strong reference between rootObject and visiting.
 +         */
 +        void traverse(final RefCounted.Tidy rootObject)
 +        {
 +            path.offer(newInProgressVisit(rootObject, getFields(rootObject.getClass()), null, rootObject.name()));
 +
 +            InProgressVisit inProgress = null;
 +            while (inProgress != null || !path.isEmpty())
 +            {
 +                //If necessary fetch the next object to start tracing
 +                if (inProgress == null)
 +                    inProgress = path.pollLast();
 +
 +                try
 +                {
 +                    Pair<Object, Field> p = inProgress.nextChild();
 +                    Object child = null;
 +                    Field field = null;
 +
 +                    if (p != null)
 +                    {
 +                        iterations++;
 +                        child = p.left;
 +                        field = p.right;
 +                    }
 +
 +                    if (child != null && visited.add(child))
 +                    {
 +                        path.offer(inProgress);
 +                        inProgress = newInProgressVisit(child, getFields(child.getClass()), field, null);
 +                        continue;
 +                    }
 +                    else if (visiting == child)
 +                    {
 +                        if (haveLoops != null)
 +                            haveLoops.add(visiting);
 +                        NoSpamLogger.log(logger,
 +                                NoSpamLogger.Level.ERROR,
 +                                rootObject.getClass().getName(),
 +                                1,
 +                                TimeUnit.SECONDS,
 +                                "Strong self-ref loop detected {}",
 +                                path);
 +                    }
 +                    else if (child == null)
 +                    {
 +                        returnInProgressVisit(inProgress);
 +                        inProgress = null;
 +                        continue;
 +                    }
 +                }
 +                catch (IllegalAccessException e)
 +                {
 +                    NoSpamLogger.log(logger, NoSpamLogger.Level.ERROR, 5, TimeUnit.MINUTES, "Could not fully check for self-referential leaks", e);
 +                }
 +            }
 +        }
 +    }
 +
 +    static final Map<Class<?>, List<Field>> fieldMap = new HashMap<>();
 +    static List<Field> getFields(Class<?> clazz)
 +    {
 +        if (clazz == null || clazz == PhantomReference.class || clazz == Class.class || java.lang.reflect.Member.class.isAssignableFrom(clazz))
 +            return emptyList();
 +        List<Field> fields = fieldMap.get(clazz);
 +        if (fields != null)
 +            return fields;
 +        fieldMap.put(clazz, fields = new ArrayList<>());
 +        for (Field field : clazz.getDeclaredFields())
 +        {
 +            if (field.getType().isPrimitive() || Modifier.isStatic(field.getModifiers()))
 +                continue;
 +            field.setAccessible(true);
 +            fields.add(field);
 +        }
 +        fields.addAll(getFields(clazz.getSuperclass()));
 +        return fields;
 +    }
 +
 +    public static class IdentityCollection
 +    {
 +        final Set<Tidy> candidates;
 +        public IdentityCollection(Set<Tidy> candidates)
 +        {
 +            this.candidates = candidates;
 +        }
 +
 +        public void add(Ref<?> ref)
 +        {
 +            candidates.remove(ref.state.globalState.tidy);
 +        }
 +        public void add(SelfRefCounted<?> ref)
 +        {
 +            add(ref.selfRef());
 +        }
 +        public void add(SharedCloseable ref)
 +        {
 +            if (ref instanceof SharedCloseableImpl)
 +                add((SharedCloseableImpl)ref);
 +        }
 +        public void add(SharedCloseableImpl ref)
 +        {
 +            add(ref.ref);
 +        }
 +        public void add(Memory memory)
 +        {
 +            if (memory instanceof SafeMemory)
 +                ((SafeMemory) memory).addTo(this);
 +        }
 +    }
 +
 +    private static class StrongLeakDetector implements Runnable
 +    {
 +        Set<Tidy> candidates = new HashSet<>();
 +
 +        public void run()
 +        {
 +            final Set<Tidy> candidates = Collections.newSetFromMap(new IdentityHashMap<>());
 +            for (GlobalState state : globallyExtant)
 +                candidates.add(state.tidy);
 +            removeExpected(candidates);
 +            this.candidates.retainAll(candidates);
 +            if (!this.candidates.isEmpty())
 +            {
 +                List<String> names = new ArrayList<>();
 +                for (Tidy tidy : this.candidates)
 +                    names.add(tidy.name());
 +                logger.warn("Strong reference leak candidates detected: {}", names);
 +            }
 +            this.candidates = candidates;
 +        }
 +
 +        private void removeExpected(Set<Tidy> candidates)
 +        {
 +            final Ref.IdentityCollection expected = new Ref.IdentityCollection(candidates);
 +            for (Keyspace ks : Keyspace.all())
 +            {
 +                for (ColumnFamilyStore cfs : ks.getColumnFamilyStores())
 +                {
 +                    View view = cfs.getTracker().getView();
 +                    for (SSTableReader reader : view.allKnownSSTables())
 +                        reader.addTo(expected);
 +                }
 +            }
 +        }
 +    }
 +
      @VisibleForTesting
-     public static void shutdownReferenceReaper() throws InterruptedException
+     public static void shutdownReferenceReaper(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      {
-         EXEC.shutdown();
-         EXEC.awaitTermination(60, TimeUnit.SECONDS);
-         if (STRONG_LEAK_DETECTOR != null)
-         {
-             STRONG_LEAK_DETECTOR.shutdownNow();
-             STRONG_LEAK_DETECTOR.awaitTermination(60, TimeUnit.SECONDS);
-         }
 -        ExecutorUtils.shutdownNowAndWait(timeout, unit, EXEC);
++        ExecutorUtils.shutdownNowAndWait(timeout, unit, EXEC, STRONG_LEAK_DETECTOR);
      }
  }
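
    The ExecutorUtils helper used by the merged shutdownReferenceReaper above is added
    elsewhere in this commit and is not shown in this excerpt. Inferred purely from the
    call sites (shutdownNow, awaitTermination, shutdownNowAndWait), a minimal sketch could
    look like the following; it is restricted to ExecutorService, whereas the real utility
    presumably also accepts other shutdownable objects such as InfiniteLoopExecutor:

        import java.util.Arrays;
        import java.util.Collection;
        import java.util.List;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.TimeoutException;

        public final class ExecutorUtilsSketch
        {
            // Shut all executors down immediately, then wait for them against one shared deadline.
            public static void shutdownNowAndWait(long timeout, TimeUnit unit, ExecutorService... executors)
                throws InterruptedException, TimeoutException
            {
                List<ExecutorService> list = Arrays.asList(executors);
                shutdownNow(list);
                awaitTermination(timeout, unit, list);
            }

            public static void shutdownNow(Collection<? extends ExecutorService> executors)
            {
                for (ExecutorService executor : executors)
                    if (executor != null)
                        executor.shutdownNow();
            }

            public static void awaitTermination(long timeout, TimeUnit unit, Collection<? extends ExecutorService> executors)
                throws InterruptedException, TimeoutException
            {
                long deadline = System.nanoTime() + unit.toNanos(timeout);
                for (ExecutorService executor : executors)
                {
                    if (executor == null)
                        continue;
                    long remaining = deadline - System.nanoTime();
                    if (remaining <= 0 || !executor.awaitTermination(remaining, TimeUnit.NANOSECONDS))
                        throw new TimeoutException(executor + " did not terminate within the timeout");
                }
            }

            private ExecutorUtilsSketch() {}
        }
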
diff --cc src/java/org/apache/cassandra/utils/memory/BufferPool.java
index 339228c,0000000..d0cea0f
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@@ -1,847 -1,0 +1,851 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.utils.memory;
 +
 +import java.lang.ref.PhantomReference;
 +import java.lang.ref.ReferenceQueue;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 +
 +import org.apache.cassandra.io.compress.BufferType;
 +import org.apache.cassandra.io.util.FileUtils;
++import org.apache.cassandra.utils.ExecutorUtils;
 +import org.apache.cassandra.utils.NoSpamLogger;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.metrics.BufferPoolMetrics;
 +import org.apache.cassandra.utils.concurrent.Ref;
 +
++import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
++import static org.apache.cassandra.utils.ExecutorUtils.shutdownNow;
++
 +/**
 + * A pool of ByteBuffers that can be recycled.
 + */
 +public class BufferPool
 +{
 +    /** The size of a page aligned buffer, 64KiB */
 +    static final int CHUNK_SIZE = 64 << 10;
 +
 +    @VisibleForTesting
 +    public static long MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L;
 +
 +    @VisibleForTesting
 +    public static boolean ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = DatabaseDescriptor.getBufferPoolUseHeapIfExhausted();
 +
 +    @VisibleForTesting
 +    public static boolean DISABLED = Boolean.parseBoolean(System.getProperty("cassandra.test.disable_buffer_pool", "false"));
 +
 +    @VisibleForTesting
 +    public static boolean DEBUG = false;
 +
 +    private static final Logger logger = LoggerFactory.getLogger(BufferPool.class);
 +    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 15L, TimeUnit.MINUTES);
 +    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
 +
 +    /** A global pool of chunks (page aligned buffers) */
 +    private static final GlobalPool globalPool = new GlobalPool();
 +
 +    /** A thread local pool of chunks, where chunks come from the global pool */
 +    private static final ThreadLocal<LocalPool> localPool = new ThreadLocal<LocalPool>() {
 +        @Override
 +        protected LocalPool initialValue()
 +        {
 +            return new LocalPool();
 +        }
 +    };
 +
 +    public static ByteBuffer get(int size)
 +    {
 +        if (DISABLED)
 +            return allocate(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED);
 +        else
 +            return takeFromPool(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED);
 +    }
 +
 +    public static ByteBuffer get(int size, BufferType bufferType)
 +    {
 +        boolean direct = bufferType == BufferType.OFF_HEAP;
 +        if (DISABLED || !direct)
 +            return allocate(size, !direct);
 +        else
 +            return takeFromPool(size, !direct);
 +    }
 +
 +    /** Unlike the get methods, this will return null if the pool is exhausted */
 +    public static ByteBuffer tryGet(int size)
 +    {
 +        if (DISABLED)
 +            return allocate(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED);
 +        else
 +            return maybeTakeFromPool(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED);
 +    }
 +
 +    private static ByteBuffer allocate(int size, boolean onHeap)
 +    {
 +        return onHeap
 +               ? ByteBuffer.allocate(size)
 +               : ByteBuffer.allocateDirect(size);
 +    }
 +
 +    private static ByteBuffer takeFromPool(int size, boolean allocateOnHeapWhenExhausted)
 +    {
 +        ByteBuffer ret = maybeTakeFromPool(size, allocateOnHeapWhenExhausted);
 +        if (ret != null)
 +            return ret;
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("Requested buffer size {} has been allocated directly due to lack of capacity", size);
 +
 +        return localPool.get().allocate(size, allocateOnHeapWhenExhausted);
 +    }
 +
 +    private static ByteBuffer maybeTakeFromPool(int size, boolean allocateOnHeapWhenExhausted)
 +    {
 +        if (size < 0)
 +            throw new IllegalArgumentException("Size must be positive (" + size + ")");
 +
 +        if (size == 0)
 +            return EMPTY_BUFFER;
 +
 +        if (size > CHUNK_SIZE)
 +        {
 +            if (logger.isTraceEnabled())
 +                logger.trace("Requested buffer size {} is bigger than {}, allocating directly", size, CHUNK_SIZE);
 +
 +            return localPool.get().allocate(size, allocateOnHeapWhenExhausted);
 +        }
 +
 +        return localPool.get().get(size);
 +    }
 +
 +    public static void put(ByteBuffer buffer)
 +    {
 +        if (!(DISABLED || buffer.hasArray()))
 +            localPool.get().put(buffer);
 +    }
 +
 +    /** This is not thread safe and should only be used for unit testing. */
 +    @VisibleForTesting
 +    static void reset()
 +    {
 +        localPool.get().reset();
 +        globalPool.reset();
 +    }
 +
 +    @VisibleForTesting
 +    static Chunk currentChunk()
 +    {
 +        return localPool.get().chunks[0];
 +    }
 +
 +    @VisibleForTesting
 +    static int numChunks()
 +    {
 +        int ret = 0;
 +        for (Chunk chunk : localPool.get().chunks)
 +        {
 +            if (chunk != null)
 +                ret++;
 +        }
 +        return ret;
 +    }
 +
 +    @VisibleForTesting
 +    static void assertAllRecycled()
 +    {
 +        globalPool.debug.check();
 +    }
 +
 +    public static long sizeInBytes()
 +    {
 +        return globalPool.sizeInBytes();
 +    }
 +
 +    static final class Debug
 +    {
 +        long recycleRound = 1;
 +        final Queue<Chunk> allChunks = new ConcurrentLinkedQueue<>();
 +        void register(Chunk chunk)
 +        {
 +            allChunks.add(chunk);
 +        }
 +        void recycle(Chunk chunk)
 +        {
 +            chunk.lastRecycled = recycleRound;
 +        }
 +        void check()
 +        {
 +            for (Chunk chunk : allChunks)
 +                assert chunk.lastRecycled == recycleRound;
 +            recycleRound++;
 +        }
 +    }
 +
 +    /**
 +     * A queue of page aligned buffers, the chunks, which have been sliced from bigger chunks,
 +     * the macro-chunks, also page aligned. Macro-chunks are allocated as long as we have not exceeded the
 +     * maximum memory threshold, MEMORY_USAGE_THRESHOLD, and are never released.
 +     *
 +     * This class is shared by multiple thread local pools and must be thread-safe.
 +     */
 +    static final class GlobalPool
 +    {
 +        /** The size of a bigger chunk, 1 MiB; must be a multiple of CHUNK_SIZE */
 +        static final int MACRO_CHUNK_SIZE = 1 << 20;
 +
 +        static
 +        {
 +            assert Integer.bitCount(CHUNK_SIZE) == 1; // must be a power of 2
 +            assert Integer.bitCount(MACRO_CHUNK_SIZE) == 1; // must be a power of 2
 +            assert MACRO_CHUNK_SIZE % CHUNK_SIZE == 0; // must be a multiple
 +
 +            if (DISABLED)
 +                logger.info("Global buffer pool is disabled, allocating {}", ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap");
 +            else
 +                logger.info("Global buffer pool is enabled, when pool is exahusted (max is {} mb) it will allocate {}",
 +                            MEMORY_USAGE_THRESHOLD / (1024L * 1024L),
 +                            ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap");
 +        }
 +
 +        private final Debug debug = new Debug();
 +        private final Queue<Chunk> macroChunks = new ConcurrentLinkedQueue<>();
 +        // TODO (future): it would be preferable to use a CLStack to improve cache occupancy; it would also be preferable to use "CoreLocal" storage
 +        private final Queue<Chunk> chunks = new ConcurrentLinkedQueue<>();
 +        private final AtomicLong memoryUsage = new AtomicLong();
 +
 +        /** Return a chunk; the caller will take ownership of the parent chunk. */
 +        public Chunk get()
 +        {
 +            while (true)
 +            {
 +                Chunk chunk = chunks.poll();
 +                if (chunk != null)
 +                    return chunk;
 +
 +                if (!allocateMoreChunks())
 +                    // give it one last attempt, in case someone else allocated before us
 +                    return chunks.poll();
 +            }
 +        }
 +
 +        /**
 +         * This method might be called by multiple threads and that's fine if we add more
 +         * than one chunk at the same time as long as we don't exceed the MEMORY_USAGE_THRESHOLD.
 +         */
 +        private boolean allocateMoreChunks()
 +        {
 +            while (true)
 +            {
 +                long cur = memoryUsage.get();
 +                if (cur + MACRO_CHUNK_SIZE > MEMORY_USAGE_THRESHOLD)
 +                {
 +                    noSpamLogger.info("Maximum memory usage reached ({} bytes), cannot allocate chunk of {} bytes",
 +                                      MEMORY_USAGE_THRESHOLD, MACRO_CHUNK_SIZE);
 +                    return false;
 +                }
 +                if (memoryUsage.compareAndSet(cur, cur + MACRO_CHUNK_SIZE))
 +                    break;
 +            }
 +
 +            // allocate a large chunk
 +            Chunk chunk = new Chunk(allocateDirectAligned(MACRO_CHUNK_SIZE));
 +            chunk.acquire(null);
 +            macroChunks.add(chunk);
 +            for (int i = 0 ; i < MACRO_CHUNK_SIZE ; i += CHUNK_SIZE)
 +            {
 +                Chunk add = new Chunk(chunk.get(CHUNK_SIZE));
 +                chunks.add(add);
 +                if (DEBUG)
 +                    debug.register(add);
 +            }
 +
 +            return true;
 +        }
 +
 +        public void recycle(Chunk chunk)
 +        {
 +            chunks.add(chunk);
 +        }
 +
 +        public long sizeInBytes()
 +        {
 +            return memoryUsage.get();
 +        }
 +
 +        /** This is not thread safe and should only be used for unit testing. */
 +        @VisibleForTesting
 +        void reset()
 +        {
 +            while (!chunks.isEmpty())
 +                chunks.poll().reset();
 +
 +            while (!macroChunks.isEmpty())
 +                macroChunks.poll().reset();
 +
 +            memoryUsage.set(0);
 +        }
 +    }
 +
 +    /**
 +     * A thread local class that grabs chunks from the global pool for this thread's allocations.
 +     * Only one thread can do the allocations but multiple threads can release the allocations.
 +     */
 +    static final class LocalPool
 +    {
 +        private final static BufferPoolMetrics metrics = new BufferPoolMetrics();
 +        // a microqueue of Chunks:
 +        //  * if any are null, they are at the end;
 +        //  * new Chunks are added to the last null index
 +        //  * if no null indexes are available, the smallest chunk is released, the last chunk moves into its slot, and the new chunk takes the last index
 +        //  * this results in a queue that will typically be visited in ascending order of available space, so that
 +        //    small allocations preferentially slice from the Chunks with the smallest space available to furnish them
 +        // WARNING: if we ever change the size of this, we must update removeFromLocalQueue, and addChunk
 +        private final Chunk[] chunks = new Chunk[3];
 +        private byte chunkCount = 0;
 +
 +        public LocalPool()
 +        {
 +            localPoolReferences.add(new LocalPoolRef(this, localPoolRefQueue));
 +        }
 +
 +        private Chunk addChunkFromGlobalPool()
 +        {
 +            Chunk chunk = globalPool.get();
 +            if (chunk == null)
 +                return null;
 +
 +            addChunk(chunk);
 +            return chunk;
 +        }
 +
 +        private void addChunk(Chunk chunk)
 +        {
 +            chunk.acquire(this);
 +
 +            if (chunkCount < 3)
 +            {
 +                chunks[chunkCount++] = chunk;
 +                return;
 +            }
 +
 +            int smallestChunkIdx = 0;
 +            if (chunks[1].free() < chunks[0].free())
 +                smallestChunkIdx = 1;
 +            if (chunks[2].free() < chunks[smallestChunkIdx].free())
 +                smallestChunkIdx = 2;
 +
 +            chunks[smallestChunkIdx].release();
 +            if (smallestChunkIdx != 2)
 +                chunks[smallestChunkIdx] = chunks[2];
 +            chunks[2] = chunk;
 +        }
 +
 +        public ByteBuffer get(int size)
 +        {
 +            for (Chunk chunk : chunks)
 +            { // first see if our own chunks can serve this buffer
 +                if (chunk == null)
 +                    break;
 +
 +                ByteBuffer buffer = chunk.get(size);
 +                if (buffer != null)
 +                    return buffer;
 +            }
 +
 +            // else ask the global pool
 +            Chunk chunk = addChunkFromGlobalPool();
 +            if (chunk != null)
 +                return chunk.get(size);
 +
 +            return null;
 +        }
 +
 +        private ByteBuffer allocate(int size, boolean onHeap)
 +        {
 +            metrics.misses.mark();
 +            return BufferPool.allocate(size, onHeap);
 +        }
 +
 +        public void put(ByteBuffer buffer)
 +        {
 +            Chunk chunk = Chunk.getParentChunk(buffer);
 +            if (chunk == null)
 +            {
 +                FileUtils.clean(buffer);
 +                return;
 +            }
 +
 +            LocalPool owner = chunk.owner;
 +            // ask the free method to take exclusive ownership of the act of recycling
 +            // if we are either: already not owned by anyone, or owned by ourselves
 +            long free = chunk.free(buffer, owner == null | owner == this);
 +            if (free == 0L)
 +            {
 +                // 0L => we own recycling responsibility, so must recycle;
 +                chunk.recycle();
 +                // if we are also the owner, we must remove the Chunk from our local queue
 +                if (owner == this)
 +                    removeFromLocalQueue(chunk);
 +            }
 +            else if (((free == -1L) && owner != this) && chunk.owner == null)
 +            {
 +                // although we try to take recycle ownership cheaply, it is not always possible to do so if the owner is racing to unset.
 +                // we must also check after completely freeing if the owner has since been unset, and try to recycle
 +                chunk.tryRecycle();
 +            }
 +        }
 +
 +        private void removeFromLocalQueue(Chunk chunk)
 +        {
 +            // since we only have three elements in the queue, it is clearer, easier and faster to just hard code the options
 +            if (chunks[0] == chunk)
 +            {   // remove first by shifting back second two
 +                chunks[0] = chunks[1];
 +                chunks[1] = chunks[2];
 +            }
 +            else if (chunks[1] == chunk)
 +            {   // remove second by shifting back last
 +                chunks[1] = chunks[2];
 +            }
 +            else assert chunks[2] == chunk;
 +            // whatever we do, the last element must be null
 +            chunks[2] = null;
 +            chunkCount--;
 +        }
 +
 +        @VisibleForTesting
 +        void reset()
 +        {
 +            chunkCount = 0;
 +            for (int i = 0; i < chunks.length; i++)
 +            {
 +                if (chunks[i] != null)
 +                {
 +                    chunks[i].owner = null;
 +                    chunks[i].freeSlots = 0L;
 +                    chunks[i].recycle();
 +                    chunks[i] = null;
 +                }
 +            }
 +        }
 +    }
 +
 +    private static final class LocalPoolRef extends PhantomReference<LocalPool>
 +    {
 +        private final Chunk[] chunks;
 +        public LocalPoolRef(LocalPool localPool, ReferenceQueue<? super LocalPool> q)
 +        {
 +            super(localPool, q);
 +            chunks = localPool.chunks;
 +        }
 +
 +        public void release()
 +        {
 +            for (int i = 0 ; i < chunks.length ; i++)
 +            {
 +                if (chunks[i] != null)
 +                {
 +                    chunks[i].release();
 +                    chunks[i] = null;
 +                }
 +            }
 +        }
 +    }
 +
 +    private static final ConcurrentLinkedQueue<LocalPoolRef> localPoolReferences = new ConcurrentLinkedQueue<>();
 +
 +    private static final ReferenceQueue<Object> localPoolRefQueue = new ReferenceQueue<>();
 +    private static final InfiniteLoopExecutor EXEC = new InfiniteLoopExecutor("LocalPool-Cleaner", BufferPool::cleanupOneReference).start();
 +
 +    private static void cleanupOneReference() throws InterruptedException
 +    {
 +        Object obj = localPoolRefQueue.remove(100);
 +        if (obj instanceof LocalPoolRef)
 +        {
 +            ((LocalPoolRef) obj).release();
 +            localPoolReferences.remove(obj);
 +        }
 +    }
 +
 +    private static ByteBuffer allocateDirectAligned(int capacity)
 +    {
 +        int align = MemoryUtil.pageSize();
 +        if (Integer.bitCount(align) != 1)
 +            throw new IllegalArgumentException("Alignment must be a power of 2");
 +
 +        ByteBuffer buffer = ByteBuffer.allocateDirect(capacity + align);
 +        long address = MemoryUtil.getAddress(buffer);
 +        long offset = address & (align -1); // (address % align)
 +
 +        if (offset == 0)
 +        { // already aligned
 +            buffer.limit(capacity);
 +        }
 +        else
 +        { // shift by offset
 +            int pos = (int)(align - offset);
 +            buffer.position(pos);
 +            buffer.limit(pos + capacity);
 +        }
 +
 +        return buffer.slice();
 +    }
 +
 +    /**
 +     * A memory chunk: it takes a buffer (the slab) and slices it
 +     * into smaller buffers when requested.
 +     *
 +     * It divides the slab into 64 units and keeps a long mask, freeSlots,
 +     * indicating if a unit is in use or not. Each bit in freeSlots corresponds
 +     * to a unit, if the bit is set then the unit is free (available for allocation)
 +     * whilst if it is not set then the unit is in use.
 +     *
 +     * When we receive a request of a given size we round up the size to the nearest
 +     * multiple of allocation units required. Then we search for n consecutive free units,
 +     * where n is the number of units required. We also align to page boundaries.
 +     *
 +     * When we receive a release request we work out the position by comparing the buffer
 +     * address to our base address and we simply release the units.
 +     */
 +    final static class Chunk
 +    {
 +        private final ByteBuffer slab;
 +        private final long baseAddress;
 +        private final int shift;
 +
 +        private volatile long freeSlots;
 +        private static final AtomicLongFieldUpdater<Chunk> freeSlotsUpdater = AtomicLongFieldUpdater.newUpdater(Chunk.class, "freeSlots");
 +
 +        // the pool that is _currently allocating_ from this Chunk
 +        // if this is set, it means the chunk may not be recycled because we may still allocate from it;
 +        // if it has been unset the local pool has finished with it, and it may be recycled
 +        private volatile LocalPool owner;
 +        private long lastRecycled;
 +        private final Chunk original;
 +
 +        Chunk(Chunk recycle)
 +        {
 +            assert recycle.freeSlots == 0L;
 +            this.slab = recycle.slab;
 +            this.baseAddress = recycle.baseAddress;
 +            this.shift = recycle.shift;
 +            this.freeSlots = -1L;
 +            this.original = recycle.original;
 +            if (DEBUG)
 +                globalPool.debug.recycle(original);
 +        }
 +
 +        Chunk(ByteBuffer slab)
 +        {
 +            assert !slab.hasArray();
 +            this.slab = slab;
 +            this.baseAddress = MemoryUtil.getAddress(slab);
 +
 +            // The number of bits by which we need to shift to obtain a unit
 +            // "31 &" is because numberOfTrailingZeros returns 32 when the capacity is zero
 +            this.shift = 31 & (Integer.numberOfTrailingZeros(slab.capacity() / 64));
 +            // -1 means all free whilst 0 means all in use
 +            this.freeSlots = slab.capacity() == 0 ? 0L : -1L;
 +            this.original = DEBUG ? this : null;
 +        }
 +
 +        /**
 +         * Acquire the chunk for future allocations: set the owner and prep
 +         * the free slots mask.
 +         */
 +        void acquire(LocalPool owner)
 +        {
 +            assert this.owner == null;
 +            this.owner = owner;
 +        }
 +
 +        /**
 +         * Set the owner to null and return the chunk to the global pool if the chunk is fully free.
 +         * This method must be called by the LocalPool when it is certain that
 +         * the local pool shall never try to allocate any more buffers from this chunk.
 +         */
 +        void release()
 +        {
 +            this.owner = null;
 +            tryRecycle();
 +        }
 +
 +        void tryRecycle()
 +        {
 +            assert owner == null;
 +            if (isFree() && freeSlotsUpdater.compareAndSet(this, -1L, 0L))
 +                recycle();
 +        }
 +
 +        void recycle()
 +        {
 +            assert freeSlots == 0L;
 +            globalPool.recycle(new Chunk(this));
 +        }
 +
 +        /**
 +         * We stash the chunk in the attachment of a buffer
 +         * that was returned by get(); this method simply
 +         * retrieves the chunk that sliced a buffer, if any.
 +         */
 +        static Chunk getParentChunk(ByteBuffer buffer)
 +        {
 +            Object attachment = MemoryUtil.getAttachment(buffer);
 +
 +            if (attachment instanceof Chunk)
 +                return (Chunk) attachment;
 +
 +            if (attachment instanceof Ref)
 +                return ((Ref<Chunk>) attachment).get();
 +
 +            return null;
 +        }
 +
 +        ByteBuffer setAttachment(ByteBuffer buffer)
 +        {
 +            if (Ref.DEBUG_ENABLED)
 +                MemoryUtil.setAttachment(buffer, new Ref<>(this, null));
 +            else
 +                MemoryUtil.setAttachment(buffer, this);
 +
 +            return buffer;
 +        }
 +
 +        boolean releaseAttachment(ByteBuffer buffer)
 +        {
 +            Object attachment = MemoryUtil.getAttachment(buffer);
 +            if (attachment == null)
 +                return false;
 +
 +            if (attachment instanceof Ref)
 +                ((Ref<Chunk>) attachment).release();
 +
 +            return true;
 +        }
 +
 +        @VisibleForTesting
 +        void reset()
 +        {
 +            Chunk parent = getParentChunk(slab);
 +            if (parent != null)
 +                parent.free(slab, false);
 +            else
 +                FileUtils.clean(slab);
 +        }
 +
 +        @VisibleForTesting
 +        long setFreeSlots(long val)
 +        {
 +            long ret = freeSlots;
 +            freeSlots = val;
 +            return ret;
 +        }
 +
 +        int capacity()
 +        {
 +            return 64 << shift;
 +        }
 +
 +        final int unit()
 +        {
 +            return 1 << shift;
 +        }
 +
 +        final boolean isFree()
 +        {
 +            return freeSlots == -1L;
 +        }
 +
 +        /** The total free size */
 +        int free()
 +        {
 +            return Long.bitCount(freeSlots) * unit();
 +        }
 +
 +        /**
 +         * Return the next available slice of this size. If
 +         * we have exceeded the capacity we return null.
 +         */
 +        ByteBuffer get(int size)
 +        {
 +            // how many multiples of our units is the size?
 +            // we add (unit - 1), so that when we divide by unit (>>> shift), we effectively round up
 +            int slotCount = (size - 1 + unit()) >>> shift;
 +
 +            // if we require more than 64 slots, we cannot possibly accommodate the allocation
 +            if (slotCount > 64)
 +                return null;
 +
 +            // convert the slotCount into the bits needed in the bitmap, but at the bottom of the register
 +            long slotBits = -1L >>> (64 - slotCount);
 +
 +            // in order that we always allocate page aligned results, we require that any allocation is "somewhat" aligned
 +            // i.e. any single unit allocation can go anywhere; any 2 unit allocation must begin in one of the first 3 slots
 +            // of a page; a 3 unit allocation must go in the first two slots; and any four unit allocation must be fully page-aligned
 +
 +            // to achieve this, we construct a searchMask that constrains the bits we find to those we permit starting
 +            // a match from. as we find bits, we remove them from the mask to continue our search.
 +            // this has an odd property when it comes to concurrent alloc/free, as we can safely skip backwards if
 +            // a new slot is freed up, but we always make forward progress (i.e. never check the same bits twice),
 +            // so running time is bounded
 +            long searchMask = 0x1111111111111111L;
 +            searchMask *= 15L >>> ((slotCount - 1) & 3);
 +            // i.e. switch (slotCount & 3)
 +            // case 1: searchMask = 0xFFFFFFFFFFFFFFFFL
 +            // case 2: searchMask = 0x7777777777777777L
 +            // case 3: searchMask = 0x3333333333333333L
 +            // case 0: searchMask = 0x1111111111111111L
 +
 +            // truncate the mask, removing bits that have too few slots following them
 +            searchMask &= -1L >>> (slotCount - 1);
 +
 +            // this loop is very unroll friendly, and would achieve high ILP, but not clear if the compiler will exploit this.
 +            // right now, not worth manually exploiting, but worth noting for future
 +            while (true)
 +            {
 +                long cur = freeSlots;
 +                // find the index of the lowest set bit that also occurs in our mask (i.e. is permitted alignment, and not yet searched)
 +                // we take the index, rather than finding the lowest bit, since we must obtain it anyway, and shifting is more efficient
 +                // than multiplication
 +                int index = Long.numberOfTrailingZeros(cur & searchMask);
 +
 +                // if no bit was actually found, we cannot serve this request, so return null.
 +                // due to truncating the searchMask this immediately terminates any search when we run out of indexes
 +                // that could accommodate the allocation, i.e. is equivalent to checking (64 - index) < slotCount
 +                if (index == 64)
 +                    return null;
 +
 +                // remove this bit from our searchMask, so we don't return here next round
 +                searchMask ^= 1L << index;
 +                // if our bits occur starting at the index, remove ourselves from the bitmask and return
 +                long candidate = slotBits << index;
 +                if ((candidate & cur) == candidate)
 +                {
 +                    // here we are sure we will manage to CAS successfully without changing candidate because
 +                    // there is only one thread allocating at the moment, the concurrency is with the release
 +                    // operations only
 +                    while (true)
 +                    {
 +                        // clear the candidate bits (freeSlots &= ~candidate)
 +                        if (freeSlotsUpdater.compareAndSet(this, cur, cur & ~candidate))
 +                            break;
 +
 +                        cur = freeSlots;
 +                        // make sure no other thread has cleared the candidate bits
 +                        assert ((candidate & cur) == candidate);
 +                    }
 +                    return get(index << shift, size);
 +                }
 +            }
 +        }
 +
 +        private ByteBuffer get(int offset, int size)
 +        {
 +            slab.limit(offset + size);
 +            slab.position(offset);
 +
 +            return setAttachment(slab.slice());
 +        }
 +
 +        /**
 +         * Round the size to the next unit multiple.
 +         */
 +        int roundUp(int v)
 +        {
 +            return BufferPool.roundUp(v, unit());
 +        }
 +
 +        /**
 +         * Release a buffer. Return:
 +         *    0L if the buffer must be recycled after the call;
 +         *   -1L if it is free (and so we should tryRecycle if owner is now null)
 +         *    some other value otherwise
 +         **/
 +        long free(ByteBuffer buffer, boolean tryRelease)
 +        {
 +            if (!releaseAttachment(buffer))
 +                return 1L;
 +
 +            long address = MemoryUtil.getAddress(buffer);
 +            assert (address >= baseAddress) & (address <= baseAddress + capacity());
 +
 +            int position = (int)(address - baseAddress);
 +            int size = roundUp(buffer.capacity());
 +
 +            position >>= shift;
 +            int slotCount = size >> shift;
 +
 +            long slotBits = (1L << slotCount) - 1;
 +            long shiftedSlotBits = (slotBits << position);
 +
 +            if (slotCount == 64)
 +            {
 +                assert size == capacity();
 +                assert position == 0;
 +                shiftedSlotBits = -1L;
 +            }
 +
 +            long next;
 +            while (true)
 +            {
 +                long cur = freeSlots;
 +                next = cur | shiftedSlotBits;
 +                assert next == (cur ^ shiftedSlotBits); // ensure no double free
 +                if (tryRelease && (next == -1L))
 +                    next = 0L;
 +                if (freeSlotsUpdater.compareAndSet(this, cur, next))
 +                    return next;
 +            }
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            return String.format("[slab %s, slots bitmap %s, capacity %d, free %d]", slab, Long.toBinaryString(freeSlots), capacity(), free());
 +        }
 +    }
 +
 +    @VisibleForTesting
 +    public static int roundUpNormal(int size)
 +    {
 +        return roundUp(size, CHUNK_SIZE / 64);
 +    }
 +
 +    private static int roundUp(int size, int unit)
 +    {
 +        int mask = unit - 1;
 +        return (size + mask) & ~mask;
 +    }
 +
 +    @VisibleForTesting
-     public static void shutdownLocalCleaner() throws InterruptedException
++    public static void shutdownLocalCleaner(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
 +    {
-         EXEC.shutdown();
-         EXEC.awaitTermination(60, TimeUnit.SECONDS);
++        shutdownNow(Arrays.asList(EXEC));
++        awaitTermination(timeout, unit, Arrays.asList(EXEC));
 +    }
 +}
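
    To make the slot arithmetic in Chunk.get(int) above concrete, here is a worked example
    under the default sizes (CHUNK_SIZE = 64 KiB, so unit() = 1 KiB and shift = 10); the
    values follow directly from the expressions in the hunk:

        // Worked example: a request for 4000 bytes against a fresh 64 KiB chunk (shift = 10).
        int  slotCount  = (4000 - 1 + 1024) >>> 10;                 // 5023 >>> 10 = 4 slots (4 KiB rounded up)
        long slotBits   = -1L >>> (64 - slotCount);                 // 0xF: four consecutive slots
        long searchMask = 0x1111111111111111L * (15L >>> ((slotCount - 1) & 3));
        // (slotCount - 1) & 3 == 3 and 15 >>> 3 == 1, so searchMask stays 0x1111111111111111L:
        // a 4-slot allocation may only start on a 4-slot (page-aligned) boundary.
        searchMask &= -1L >>> (slotCount - 1);                      // drop start indexes 61-63, which cannot fit 4 slots
        // With freeSlots == -1L (all free), the lowest permitted index is 0, so the first
        // 4 KiB region of the chunk is handed out and bits 0-3 are cleared from freeSlots.
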
diff --cc src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index 83ccad6,9c4824a..8061566
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@@ -23,10 -24,8 +24,11 @@@ import java.util.concurrent.atomic.Atom
  
  import com.google.common.annotations.VisibleForTesting;
  
 -import org.apache.cassandra.utils.ExecutorUtils;
 +import com.codahale.metrics.Timer;
 +import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 +import org.apache.cassandra.metrics.DefaultNameFactory;
  import org.apache.cassandra.utils.concurrent.WaitQueue;
++import org.apache.cassandra.utils.ExecutorUtils;
  
  
  /**
@@@ -69,12 -64,12 +71,11 @@@ public abstract class MemtablePoo
      public abstract boolean needToCopyOnHeap();
  
      @VisibleForTesting
-     public void shutdown() throws InterruptedException
+     public void shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
      {
-         cleaner.shutdown();
-         cleaner.awaitTermination(60, TimeUnit.SECONDS);
+         ExecutorUtils.shutdownNowAndWait(timeout, unit, cleaner);
      }
  
 -
      public abstract MemtableAllocator newAllocator();
  
      /**
diff --cc test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
index d0613b1,232ef0b..c77d725
--- a/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/UpgradeableCluster.java
@@@ -19,12 -19,10 +19,8 @@@
  package org.apache.cassandra.distributed;
  
  import java.io.File;
--import java.io.IOException;
--import java.nio.file.Files;
  import java.util.List;
- import java.util.Set;
  
- import org.apache.cassandra.distributed.api.Feature;
  import org.apache.cassandra.distributed.api.ICluster;
  import org.apache.cassandra.distributed.impl.AbstractCluster;
  import org.apache.cassandra.distributed.impl.IUpgradeableInstance;
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 1b385fb,29426cb..aea21e2
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -34,10 -35,6 +34,7 @@@ import java.util.concurrent.Future
  import java.util.function.BiConsumer;
  import java.util.function.Function;
  
- import org.slf4j.LoggerFactory;
- 
- import ch.qos.logback.classic.LoggerContext;
 +import org.apache.cassandra.batchlog.BatchlogManager;
  import org.apache.cassandra.concurrent.ScheduledExecutors;
  import org.apache.cassandra.concurrent.SharedExecutorPool;
  import org.apache.cassandra.concurrent.StageManager;
@@@ -65,10 -64,8 +62,11 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.gms.ApplicationState;
  import org.apache.cassandra.gms.Gossiper;
  import org.apache.cassandra.gms.VersionedValue;
 +import org.apache.cassandra.hints.HintsService;
 +import org.apache.cassandra.index.SecondaryIndexManager;
+ import org.apache.cassandra.io.sstable.IndexSummaryManager;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.DataInputBuffer;
  import org.apache.cassandra.io.util.DataOutputBuffer;
  import org.apache.cassandra.locator.InetAddressAndPort;
  import org.apache.cassandra.net.IMessageSink;
@@@ -81,12 -77,17 +79,18 @@@ import org.apache.cassandra.service.Cli
  import org.apache.cassandra.service.PendingRangeCalculatorService;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.streaming.StreamCoordinator;
  import org.apache.cassandra.transport.messages.ResultMessage;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.NanoTimeToCurrentTimeMillis;
  import org.apache.cassandra.utils.Throwables;
  import org.apache.cassandra.utils.concurrent.Ref;
 +import org.apache.cassandra.utils.memory.BufferPool;
  
+ import static java.util.concurrent.TimeUnit.MINUTES;
+ import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+ import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+ 
  public class Instance extends IsolatedExecutor implements IInvokableInstance
  {
      public final IInstanceConfig config;
@@@ -232,14 -261,45 +264,43 @@@
          }
      }
  
-     public void receiveMessage(IMessage message)
+     public void receiveMessage(IMessage imessage)
      {
          sync(() -> {
-             try (DataInputBuffer in = new DataInputBuffer(message.bytes()))
+             // Based on org.apache.cassandra.net.IncomingTcpConnection.receiveMessage
 -            try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(imessage.bytes())))
++            try (DataInputBuffer input = new DataInputBuffer(imessage.bytes()))
              {
-                 MessageIn<?> messageIn = MessageIn.read(in, message.version(), message.id());
-                 Runnable deliver = new MessageDeliveryTask(messageIn, message.id(), System.currentTimeMillis(), false);
-                 deliver.run();
+                 int version = imessage.version();
+ 
+                 MessagingService.validateMagic(input.readInt());
+                 int id;
+                 if (version < MessagingService.VERSION_20)
+                     id = Integer.parseInt(input.readUTF());
+                 else
+                     id = input.readInt();
 -                assert imessage.id() == id;
+ 
+                 long timestamp = System.currentTimeMillis();
+                 boolean isCrossNodeTimestamp = false;
 -
+                 // make sure to readInt, even if cross_node_to is not enabled
+                 int partial = input.readInt();
+                 if (DatabaseDescriptor.hasCrossNodeTimeout())
+                 {
+                     long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
+                     isCrossNodeTimestamp = (timestamp != crossNodeTimestamp);
+                     timestamp = crossNodeTimestamp;
+                 }
+ 
+                 MessageIn message = MessageIn.read(input, version, id);
+                 if (message == null)
+                 {
+                     // callback expired; nothing to do
+                     return;
+                 }
+                 if (version <= MessagingService.current_version)
+                 {
+                     MessagingService.instance().receive(message, id, timestamp, isCrossNodeTimestamp);
+                 }
+                 // else ignore message
              }
              catch (Throwable t)
              {
@@@ -265,12 -325,13 +326,14 @@@
              try
              {
                  mkdirs();
+ 
+                 DatabaseDescriptor.setDaemonInitialized();
                  DatabaseDescriptor.createAllDirectories();
  
 -                // We need to persist this as soon as possible after startup checks.
 +                // We need to  persist this as soon as possible after startup checks.
                  // This should be the first write to SystemKeyspace (CASSANDRA-11742)
                  SystemKeyspace.persistLocalMetadata();
 +                LegacySchemaMigrator.migrate();
  
                  try
                  {
@@@ -293,12 -354,24 +356,23 @@@
                  {
                      throw new RuntimeException(e);
                  }
 -
+                 if (config.has(NETWORK))
+                 {
+                     registerFilter(cluster);
+                     MessagingService.instance().listen();
+                 }
+                 else
+                 {
+                     // Even though we don't use MessagingService, access the static SocketFactory
+                     // instance here so that we start the static event loop state
+ //                    -- not sure what that means?  SocketFactory.instance.getClass();
+                     registerMockMessaging(cluster);
+                 }
  
-                 // TODO: support each separately
-                 if (with.contains(Feature.GOSSIP) || with.contains(Feature.NETWORK))
+                 // TODO: this is more than just gossip
+                 if (config.has(GOSSIP))
                  {
-                     StorageService.instance.prepareToJoin();
-                     StorageService.instance.joinTokenRing(1000);
+                     StorageService.instance.initServer();
                  }
                  else
                  {
@@@ -397,41 -465,42 +470,49 @@@
  
      public Future<Void> shutdown(boolean graceful)
      {
 +        if (!graceful)
 +            MessagingService.instance().shutdown(false);
 +
          Future<?> future = async((ExecutorService executor) -> {
              Throwable error = null;
+ 
+             if (config.has(GOSSIP) || config.has(NETWORK))
+             {
+                 StorageService.instance.shutdownServer();
+ 
+                 error = parallelRun(error, executor,
 -                    () -> NanoTimeToCurrentTimeMillis.shutdown(MINUTES.toMillis(1L))
++                                    () -> NanoTimeToCurrentTimeMillis.shutdown(MINUTES.toMillis(1L))
+                 );
+             }
+ 
              error = parallelRun(error, executor,
-                     Gossiper.instance::stop,
-                     CompactionManager.instance::forceShutdown,
-                     BatchlogManager.instance::shutdown,
-                     HintsService.instance::shutdownBlocking,
-                     SecondaryIndexManager::shutdownExecutors,
-                     ColumnFamilyStore::shutdownFlushExecutor,
-                     ColumnFamilyStore::shutdownPostFlushExecutor,
-                     ColumnFamilyStore::shutdownReclaimExecutor,
-                     PendingRangeCalculatorService.instance::shutdownExecutor,
-                     BufferPool::shutdownLocalCleaner,
-                     StorageService.instance::shutdownBGMonitor,
-                     Ref::shutdownReferenceReaper,
-                     Memtable.MEMORY_POOL::shutdown,
-                     ScheduledExecutors::shutdownAndWait,
-                     SSTableReader::shutdownBlocking
+                                 () -> Gossiper.instance.stopShutdownAndWait(1L, MINUTES),
+                                 CompactionManager.instance::forceShutdown,
 -                                () -> BatchlogManager.shutdownAndWait(1L, MINUTES),
 -                                () -> HintedHandOffManager.instance.shutdownAndWait(1L, MINUTES),
++                                () -> BatchlogManager.instance.shutdownAndWait(1L, MINUTES),
++                                HintsService.instance::shutdownBlocking,
+                                 () -> StreamCoordinator.shutdownAndWait(1L, MINUTES),
++                                () -> SecondaryIndexManager.shutdownAndWait(1L, MINUTES),
+                                 () -> IndexSummaryManager.instance.shutdownAndWait(1L, MINUTES),
+                                 () -> ColumnFamilyStore.shutdownExecutorsAndWait(1L, MINUTES),
+                                 () -> PendingRangeCalculatorService.instance.shutdownExecutor(1L, MINUTES),
++                                () -> BufferPool.shutdownLocalCleaner(1L, MINUTES),
+                                 () -> StorageService.instance.shutdownBGMonitorAndWait(1L, MINUTES),
+                                 () -> Ref.shutdownReferenceReaper(1L, MINUTES),
+                                 () -> Memtable.MEMORY_POOL.shutdownAndWait(1L, MINUTES),
 -                                () -> SSTableReader.shutdownBlocking(1L, MINUTES),
 -                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES)
++                                () -> SSTableReader.shutdownBlocking(1L, MINUTES)
              );
              error = parallelRun(error, executor,
 -                                CommitLog.instance::shutdownBlocking,
++                                () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES),
                                  MessagingService.instance()::shutdown
              );
              error = parallelRun(error, executor,
-                                 StageManager::shutdownAndWait,
-                                 SharedExecutorPool.SHARED::shutdownAndWait
+                                 () -> StageManager.shutdownAndWait(1L, MINUTES),
+                                 () -> SharedExecutorPool.SHARED.shutdownAndWait(1L, MINUTES)
              );
 +            error = parallelRun(error, executor,
 +                                CommitLog.instance::shutdownBlocking
 +            );
  
-             LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
-             loggerContext.stop();
              Throwables.maybeFail(error);
          }).apply(isolatedExecutor);
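
The hunk above replaces fire-and-forget shutdown method references with lambdas that bound each shutdown by an explicit timeout, so a stuck executor fails the instance shutdown instead of hanging it. Below is a minimal sketch of that shutdown-and-wait pattern for a plain java.util.concurrent ExecutorService; the class and method names are illustrative assumptions, not the helper this patch actually introduces.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    final class TimedShutdownSketch
    {
        // Request an orderly shutdown, then block up to the given timeout for
        // already-submitted tasks to drain; fail loudly rather than hang forever.
        static void shutdownAndWait(ExecutorService executor, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException
        {
            executor.shutdown(); // stop accepting new tasks
            if (!executor.awaitTermination(timeout, unit))
                throw new TimeoutException("executor did not terminate within " + timeout + " " + unit);
        }
    }

Each entry in a shutdown chain would then take the shape () -> shutdownAndWait(someExecutor, 1L, MINUTES), matching the lambda form used in the hunk above.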
  
diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
index 57530e0,363a1df..0ef5a69
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
@@@ -46,8 -48,9 +46,9 @@@ public class InstanceClassLoader extend
                 name.startsWith("org.apache.cassandra.distributed.api.")
              || name.startsWith("sun.")
              || name.startsWith("oracle.")
+             || name.startsWith("com.intellij.")
              || name.startsWith("com.sun.")
 -            || name.startsWith("com.oracle.")
 +            || name.startsWith("com.sun.")
              || name.startsWith("java.")
              || name.startsWith("javax.")
              || name.startsWith("jdk.")
diff --cc test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index 3945ec5,757c17f..7b361bc
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@@ -41,6 -42,26 +42,23 @@@ public class DistributedTestBas
  
      public static String KEYSPACE = "distributed_test_keyspace";
  
+     public static void nativeLibraryWorkaround()
+     {
 -        // Disable the C library for in-JVM dtests otherwise it holds a gcroot against the InstanceClassLoader
 -        System.setProperty("cassandra.disable_clibrary", "true");
 -
+         // Disable the Netty tcnative library; otherwise the io.netty.internal.tcnative.CertificateCallbackTask,
+         // CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask,
+         // SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader.
+         System.setProperty("cassandra.disable_tcactive_openssl", "true");
+         System.setProperty("io.netty.transport.noNative", "true");
+     }
+ 
+     public static void processReaperWorkaround()
+     {
+         // Make sure the 'process reaper' thread is initially created under the main classloader,
+         // otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader
+         // which prevents that classloader from being garbage collected.
+         IsolatedExecutor.ThrowingRunnable.toRunnable(() -> new ProcessBuilder().command("true").start().waitFor()).run();
+     }
+ 
      @BeforeClass
      public static void setup()
      {
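
Both workarounds only help if they run before any in-JVM instance is started, so the natural place to invoke them is the @BeforeClass setup() whose body is truncated in the hunk above. A sketch of that wiring follows; whether setup() actually calls them, and in this order, is an assumption here rather than something shown in the diff.

    @BeforeClass
    public static void setup()
    {
        // Assumed wiring: apply both workarounds while the main classloader is still
        // the context classloader, before any InstanceClassLoader exists.
        nativeLibraryWorkaround();
        processReaperWorkaround();
        // ... remainder of the original setup() ...
    }
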
diff --cc test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
index 11e9985,0000000..c2e9e4f
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/GossipTest.java
@@@ -1,113 -1,0 +1,115 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.net.InetAddress;
 +import java.util.Collection;
- import java.util.Collections;
- import java.util.EnumSet;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.locks.LockSupport;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.collect.Iterables;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
- import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.Cluster;
- import org.apache.cassandra.distributed.api.Feature;
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.EndpointState;
 +import org.apache.cassandra.gms.Gossiper;
- import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.FBUtilities;
 +
++import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
++import static org.apache.cassandra.distributed.api.Feature.NETWORK;
++
 +public class GossipTest extends DistributedTestBase
 +{
 +
 +    @Test
 +    public void nodeDownDuringMove() throws Throwable
 +    {
 +        int liveCount = 1;
++        System.setProperty("cassandra.ring_delay_ms", "5000"); // down from 30s default
 +        System.setProperty("cassandra.consistent.rangemovement", "false");
-         try (Cluster cluster = Cluster.create(2 + liveCount, EnumSet.of(Feature.GOSSIP)))
++        System.setProperty("cassandra.consistent.simultaneousmoves.allow", "true");
++        try (Cluster cluster = Cluster.build(2 + liveCount)
++                                      .withConfig(config -> config.with(NETWORK).with(GOSSIP))
++                                      .createWithoutStarting())
 +        {
 +            int fail = liveCount + 1;
 +            int late = fail + 1;
 +            for (int i = 1 ; i <= liveCount ; ++i)
 +                cluster.get(i).startup();
 +            cluster.get(fail).startup();
 +            Collection<String> expectTokens = cluster.get(fail).callsOnInstance(() ->
 +                StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress())
 +                                       .stream().map(Object::toString).collect(Collectors.toList())
 +            ).call();
 +
 +            InetAddress failAddress = cluster.get(fail).broadcastAddressAndPort().address;
 +            // wait for NORMAL state
 +            for (int i = 1 ; i <= liveCount ; ++i)
 +            {
 +                cluster.get(i).acceptsOnInstance((InetAddress endpoint) -> {
 +                    EndpointState ep;
 +                    while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
 +                           || ep.getApplicationState(ApplicationState.STATUS) == null
 +                           || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("NORMAL"))
 +                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 +                }).accept(failAddress);
 +            }
 +
 +            // set ourselves to MOVING, and wait for it to propagate
 +            cluster.get(fail).runOnInstance(() -> {
 +
 +                Token token = Iterables.getFirst(StorageService.instance.getTokenMetadata().getTokens(FBUtilities.getBroadcastAddress()), null);
 +                Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.moving(token));
 +            });
 +
 +            for (int i = 1 ; i <= liveCount ; ++i)
 +            {
 +                cluster.get(i).acceptsOnInstance((InetAddress endpoint) -> {
 +                    EndpointState ep;
 +                    while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
 +                           || (ep.getApplicationState(ApplicationState.STATUS) == null
 +                               || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("MOVING")))
 +                        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 +                }).accept(failAddress);
 +            }
 +
 +            cluster.get(fail).shutdown(false).get();
 +            cluster.get(late).startup();
 +            cluster.get(late).acceptsOnInstance((InetAddress endpoint) -> {
 +                EndpointState ep;
 +                while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
 +                       || ep.getApplicationState(ApplicationState.STATUS) == null
 +                       || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith("MOVING"))
 +                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
 +            }).accept(failAddress);
 +
 +            Collection<String> tokens = cluster.get(late).appliesOnInstance((InetAddress endpoint) ->
 +                StorageService.instance.getTokenMetadata().getTokens(failAddress)
 +                                       .stream().map(Object::toString).collect(Collectors.toList())
 +            ).apply(failAddress);
 +
 +            Assert.assertEquals(expectTokens, tokens);
 +        }
 +    }
 +
 +}
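
The test above repeats the same gossip-status polling loop three times with slightly different guards. As an illustrative refactor only (not part of this patch), the loops could share a helper like the sketch below, which also null-checks the STATUS entry before dereferencing it. The helper uses only identifiers the test already imports, and it would still need to be invoked via acceptsOnInstance so it executes inside the target instance's classloader.

    // Hypothetical helper for GossipTest: spin until the endpoint's gossiped STATUS
    // carries the expected prefix, tolerating a not-yet-present STATUS entry.
    static void awaitGossipStatusPrefix(InetAddress endpoint, String statusPrefix)
    {
        EndpointState ep;
        while (null == (ep = Gossiper.instance.getEndpointStateForEndpoint(endpoint))
               || ep.getApplicationState(ApplicationState.STATUS) == null
               || !ep.getApplicationState(ApplicationState.STATUS).value.startsWith(statusPrefix))
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(10L));
    }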

