Posted to commits@cassandra.apache.org by sn...@apache.org on 2016/12/12 19:49:00 UTC

[07/10] cassandra git commit: Merge branch cassandra-3.0 into cassandra-3.11

Merge branch cassandra-3.0 into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/50a9b1ab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/50a9b1ab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/50a9b1ab

Branch: refs/heads/trunk
Commit: 50a9b1abb1a46d264343058837f334d5a73b9bda
Parents: c80b9fb 7f668c6
Author: Robert Stupp <sn...@snazy.de>
Authored: Mon Dec 12 20:39:37 2016 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Mon Dec 12 20:39:41 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../concurrent/NamedThreadFactory.java          | 22 +++++++++-
 .../AbstractCommitLogSegmentManager.java        |  3 +-
 .../db/commitlog/AbstractCommitLogService.java  |  3 +-
 .../cassandra/index/sasi/TermIterator.java      |  3 +-
 .../cassandra/net/OutboundTcpConnection.java    | 42 ++++++++++----------
 .../apache/cassandra/repair/RepairRunnable.java |  4 +-
 .../scheduler/RoundRobinScheduler.java          | 12 +++---
 .../cassandra/service/StorageService.java       |  7 ++--
 9 files changed, 58 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
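
For context on the change that recurs in every hunk below: each hand-rolled thread's Runnable is wrapped in NamedThreadFactory.threadLocalDeallocator(..) so that per-thread buffer pools are released when the thread exits (CASSANDRA-13033, "Thread local pools never cleaned up"). The wrapper itself lives in NamedThreadFactory.java, whose diff is in another part of this series; a minimal sketch of what such a wrapper can look like, assuming Netty's FastThreadLocal is the pool being cleaned:

    import io.netty.util.concurrent.FastThreadLocal;

    public final class ThreadLocalCleanup
    {
        // Sketch only: wraps a task so that all FastThreadLocal values bound to
        // the executing thread are removed once the task finishes, even on error.
        public static Runnable threadLocalDeallocator(Runnable r)
        {
            return () ->
            {
                try
                {
                    r.run();
                }
                finally
                {
                    FastThreadLocal.removeAll();
                }
            };
        }
    }

Call sites then change from new Thread(runnable, name) to new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name), as the hunks below show.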


http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d056492,5bc30be..7413086
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,114 -1,5 +1,115 @@@
 -3.0.11
 +3.10
 + * Remove outboundBindAny configuration property (CASSANDRA-12673)
 + * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
 + * Remove timing window in test case (CASSANDRA-12875)
 + * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
 + * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
 + * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
 + * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
 + * Fix Murmur3PartitionerTest (CASSANDRA-12858)
 + * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
 + * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
 + * Fix cassandra-stress truncate option (CASSANDRA-12695)
 + * Fix crossNode value when receiving messages (CASSANDRA-12791)
 + * Don't load MX4J beans twice (CASSANDRA-12869)
 + * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
 + * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
 + * Remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
 + * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
 + * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
 + * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
 + * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
 + * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
 + * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
 + * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
 + * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
 + * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
 + * Add duration data type (CASSANDRA-11873)
 + * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
 + * Improve sum aggregate functions (CASSANDRA-12417)
 + * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
 + * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
 + * Check for hash conflicts in prepared statements (CASSANDRA-12733)
 + * Exit query parsing upon first error (CASSANDRA-12598)
 + * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
 + * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
 + * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
 + * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
 + * Add hint delivery metrics (CASSANDRA-12693)
 + * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
 + * ColumnIndex does not reuse buffer (CASSANDRA-12502)
 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
 + * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
 + * Tune compaction thread count via nodetool (CASSANDRA-12248)
 + * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
 + * Include repair session IDs in repair start message (CASSANDRA-12532)
 + * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
 + * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
 + * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
 + * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
 + * Fix cassandra-stress graphing (CASSANDRA-12237)
 + * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
 + * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
 + * Add JMH benchmarks.jar (CASSANDRA-12586)
 + * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
 + * Add keep-alive to streaming (CASSANDRA-11841)
 + * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
 + * Avoid deleting non-existing sstable files and improve related log messages (CASSANDRA-12261)
 + * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
 + * Retry all internode messages once after a connection is
 +   closed and reopened (CASSANDRA-12192)
 + * Add support to rebuild from targeted replica (CASSANDRA-9875)
 + * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
 + * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
 + * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
 + * Extend read/write failure messages with a map of replica addresses
 +   to error codes in the v5 native protocol (CASSANDRA-12311)
 + * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
 + * Let DatabaseDescriptor not implicitly start up services (CASSANDRA-9054, 12550)
 + * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
 + * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
 + * Added slow query log (CASSANDRA-12403)
 + * Count full coordinated request against timeout (CASSANDRA-12256)
 + * Allow TTL with null value on insert and update (CASSANDRA-12216)
 + * Make decommission operation resumable (CASSANDRA-12008)
 + * Add support to one-way targeted repair (CASSANDRA-9876)
 + * Remove clientutil jar (CASSANDRA-11635)
 + * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
 + * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
 + * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
 + * Make it possible to compact a given token range (CASSANDRA-10643)
 + * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
 + * Collect metrics on queries by consistency level (CASSANDRA-7384)
 + * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
 + * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
 + * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
 + * Add version command to cassandra-stress (CASSANDRA-12258)
 + * Create compaction-stress tool (CASSANDRA-11844)
 + * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
 + * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
 + * Support filtering on non-PRIMARY KEY columns in the CREATE
 +   MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
 + * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
 + * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
 + * Faster write path (CASSANDRA-12269)
 + * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
 + * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
 + * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
 + * Prepend snapshot name with "truncated" or "dropped" when a snapshot
 +   is taken before truncating or dropping a table (CASSANDRA-12178)
 + * Optimize RestrictionSet (CASSANDRA-12153)
 + * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
 + * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
 + * Create a system table to expose prepared statements (CASSANDRA-8831)
 + * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
 + * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
 + * Add supplied username to authentication error messages (CASSANDRA-12076)
 + * Remove pre-startup check for open JMX port (CASSANDRA-12074)
 + * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
 + * Restore resumable hints delivery (CASSANDRA-11960)
 + * Properly report LWT contention (CASSANDRA-12626)
 +Merged from 3.0:
+  * Thread local pools never cleaned up (CASSANDRA-13033)
   * Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
   * CQL often queries static columns unnecessarily (CASSANDRA-12768)
   * Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 00ddf44,0000000..eff35f4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@@ -1,550 -1,0 +1,551 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.locks.LockSupport;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.util.concurrent.*;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import net.nicoulaj.compilecommand.annotations.DontInline;
++import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.WaitQueue;
 +
 +import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 +
 +/**
 + * Performs eager creation of commit log segments in a background thread. All
 + * public methods are thread-safe.
 + */
 +public abstract class AbstractCommitLogSegmentManager
 +{
 +    static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
 +
 +    /**
 +     * Segment that is ready to be used. The management thread fills this and blocks until consumed.
 +     *
 +     * A single management thread produces this, and consumers are already synchronizing to make sure other work is
 +     * performed atomically with consuming this. Volatile to make sure writes by the management thread become
 +     * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must
 +     * synchronize on 'this'.
 +     */
 +    private volatile CommitLogSegment availableSegment = null;
 +
 +    private final WaitQueue segmentPrepared = new WaitQueue();
 +
 +    /** Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to */
 +    private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
 +
 +    /**
 +     * The segment we are currently allocating commit log records to.
 +     *
 +     * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value.
 +     */
 +    private volatile CommitLogSegment allocatingFrom = null;
 +
 +    final String storageDirectory;
 +
 +    /**
 +     * Tracks commitlog size, in multiples of the segment size.  We need to do this so we can "promise" size
 +     * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
 +     * can see the effect of recycling segments immediately (even though they're really happening asynchronously
 +     * on the manager thread, which will take a ms or two).
 +     */
 +    private final AtomicLong size = new AtomicLong();
 +
 +    private Thread managerThread;
 +    protected final CommitLog commitLog;
 +    private volatile boolean shutdown;
 +
 +    private static final SimpleCachedBufferPool bufferPool =
 +        new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize());
 +
 +    AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory)
 +    {
 +        this.commitLog = commitLog;
 +        this.storageDirectory = storageDirectory;
 +    }
 +
 +    void start()
 +    {
 +        // The run loop for the manager thread
 +        Runnable runnable = new WrappedRunnable()
 +        {
 +            public void runMayThrow() throws Exception
 +            {
 +                while (!shutdown)
 +                {
 +                    try
 +                    {
 +                        assert availableSegment == null;
 +                        logger.debug("No segments in reserve; creating a fresh one");
 +                        availableSegment = createSegment();
 +                        if (shutdown)
 +                        {
 +                            // If shutdown() started and finished during segment creation, we are now left with a
 +                            // segment that no one will consume. Discard it.
 +                            discardAvailableSegment();
 +                            return;
 +                        }
 +
 +                        segmentPrepared.signalAll();
 +                        Thread.yield();
 +
 +                        if (availableSegment == null && !atSegmentBufferLimit())
 +                            // Writing threads need another segment now.
 +                            continue;
 +
 +                        // Writing threads are not waiting for new segments, we can spend time on other tasks.
 +                        // flush old Cfs if we're full
 +                        maybeFlushToReclaim();
 +
 +                        LockSupport.park();
 +                    }
 +                    catch (Throwable t)
 +                    {
 +                        JVMStabilityInspector.inspectThrowable(t);
 +                        if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
 +                            return;
 +                        // sleep some arbitrary period to avoid spamming CL
 +                        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
 +
 +                        // If we offered a segment, wait for it to be taken before reentering the loop.
 +                        // A new segment may have been created but never offered, but only if we failed to discard
 +                        // it while shutting down; nothing more can or needs to be done in that case.
 +                    }
 +
 +                    while (availableSegment != null || (atSegmentBufferLimit() && !shutdown))
 +                        LockSupport.park();
 +                }
 +            }
 +        };
 +
 +        shutdown = false;
-         managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
++        managerThread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "COMMIT-LOG-ALLOCATOR");
 +        managerThread.start();
 +
 +        // for simplicity, ensure the first segment is allocated before continuing
 +        advanceAllocatingFrom(null);
 +    }
 +
 +    private boolean atSegmentBufferLimit()
 +    {
 +        return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
 +    }
 +
 +    private void maybeFlushToReclaim()
 +    {
 +        long unused = unusedCapacity();
 +        if (unused < 0)
 +        {
 +            long flushingSize = 0;
 +            List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
 +            for (CommitLogSegment segment : activeSegments)
 +            {
 +                if (segment == allocatingFrom)
 +                    break;
 +                flushingSize += segment.onDiskSize();
 +                segmentsToRecycle.add(segment);
 +                if (flushingSize + unused >= 0)
 +                    break;
 +            }
 +            flushDataFrom(segmentsToRecycle, false);
 +        }
 +    }
 +
 +
 +    /**
 +     * Allocate a segment within this CLSM. Should either succeed or throw.
 +     */
 +    public abstract Allocation allocate(Mutation mutation, int size);
 +
 +    /**
 +     * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM
 +     * decide what to do with those segments on disk after they've been replayed.
 +     */
 +    abstract void handleReplayedSegment(final File file);
 +
 +    /**
 +     * Hook to allow segment managers to track state surrounding creation of new segments. Only perform as a task
 +     * submitted to the segment manager, so it runs on the segment management thread.
 +     */
 +    abstract CommitLogSegment createSegment();
 +
 +    /**
 +     * Indicates that a segment file has been flushed and is no longer needed. Only perform as a task submitted to
 +     * the segment manager so it runs on the segment management thread, or perform while the segment management
 +     * thread is shut down during testing resets.
 +     *
 +     * @param segment segment to be discarded
 +     * @param delete  whether or not the segment is safe to be deleted.
 +     */
 +    abstract void discard(CommitLogSegment segment, boolean delete);
 +
 +    /**
 +     * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided.
 +     *
 +     * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
 +     */
 +    @DontInline
 +    void advanceAllocatingFrom(CommitLogSegment old)
 +    {
 +        while (true)
 +        {
 +            synchronized (this)
 +            {
 +                // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments
 +                if (allocatingFrom != old)
 +                    return;
 +
 +                // If a segment is ready, take it now, otherwise wait for the management thread to construct it.
 +                if (availableSegment != null)
 +                {
 +                    // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving
 +                    // the critical section.
 +                    activeSegments.add(allocatingFrom = availableSegment);
 +                    availableSegment = null;
 +                    break;
 +                }
 +            }
 +
 +            awaitAvailableSegment(old);
 +        }
 +
 +        // Signal the management thread to prepare a new segment.
 +        wakeManager();
 +
 +        if (old != null)
 +        {
 +            // Now we can run the user defined command just after switching to the new commit log.
 +            // (Do this here instead of in the recycle call so we can get a head start on the archive.)
 +            commitLog.archiver.maybeArchive(old);
 +
 +            // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
 +            old.discardUnusedTail();
 +        }
 +
 +        // request that the CL be synced out-of-band, as we've finished a segment
 +        commitLog.requestExtraSync();
 +    }
 +
 +    void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom)
 +    {
 +        do
 +        {
 +            WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time());
 +            if (availableSegment == null && allocatingFrom == currentAllocatingFrom)
 +                prepared.awaitUninterruptibly();
 +            else
 +                prepared.cancel();
 +        }
 +        while (availableSegment == null && allocatingFrom == currentAllocatingFrom);
 +    }
 +
 +    /**
 +     * Switch to a new segment, regardless of how much is left in the current one.
 +     *
 +     * Flushes any dirty CFs for this segment and any older segments, and then discards the segments
 +     */
 +    void forceRecycleAll(Iterable<UUID> droppedCfs)
 +    {
 +        List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
 +        CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
 +        advanceAllocatingFrom(last);
 +
 +        // wait for the commit log modifications
 +        last.waitForModifications();
 +
 +        // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
 +        // to complete
 +        Keyspace.writeOrder.awaitNewBarrier();
 +
 +        // flush and wait for all CFs that are dirty in segments up-to and including 'last'
 +        Future<?> future = flushDataFrom(segmentsToRecycle, true);
 +        try
 +        {
 +            future.get();
 +
 +            for (CommitLogSegment segment : activeSegments)
 +                for (UUID cfId : droppedCfs)
 +                    segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
 +
 +            // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
 +            // if the previous active segment was the only one to recycle (since an active segment isn't
 +            // necessarily dirty, and we only call dCS after a flush).
 +            for (CommitLogSegment segment : activeSegments)
 +            {
 +                if (segment.isUnused())
 +                    archiveAndDiscard(segment);
 +            }
 +
 +            CommitLogSegment first;
 +            if ((first = activeSegments.peek()) != null && first.id <= last.id)
 +                logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
 +        }
 +        catch (Throwable t)
 +        {
 +            // for now just log the error
 +            logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
 +        }
 +    }
 +
 +    /**
 +     * Indicates that a segment is no longer in use and that it should be discarded.
 +     *
 +     * @param segment segment that is no longer in use
 +     */
 +    void archiveAndDiscard(final CommitLogSegment segment)
 +    {
 +        boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
 +        if (!activeSegments.remove(segment))
 +            return; // already discarded
 +        // If archiving (via the archive command) was not successful, leave the file alone; don't delete or recycle.
 +        logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script");
 +        discard(segment, archiveSuccess);
 +    }
 +
 +    /**
 +     * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
 +     * @param addedSize the number of bytes to add to the tracked size; may be negative when segments are discarded
 +     */
 +    void addSize(long addedSize)
 +    {
 +        size.addAndGet(addedSize);
 +    }
 +
 +    /**
 +     * @return the space (in bytes) used by all segment files.
 +     */
 +    public long onDiskSize()
 +    {
 +        return size.get();
 +    }
 +
 +    private long unusedCapacity()
 +    {
 +        long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
 +        long currentSize = size.get();
 +        logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
 +        return total - currentSize;
 +    }
 +
 +    /**
 +     * Force a flush on all CFs that are still dirty in the given segments.
 +     *
 +     * @return a Future that will finish when all the flushes are complete.
 +     */
 +    private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
 +    {
 +        if (segments.isEmpty())
 +            return Futures.immediateFuture(null);
 +        final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
 +
 +        // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
 +        final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
 +
 +        for (CommitLogSegment segment : segments)
 +        {
 +            for (UUID dirtyCFId : segment.getDirtyCFIDs())
 +            {
 +                Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
 +                if (pair == null)
 +                {
 +                    // even though we remove the schema entry before a final flush when dropping a CF,
 +                    // it's still possible for a writer to race and finish its append after the flush.
 +                    logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
 +                    segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
 +                }
 +                else if (!flushes.containsKey(dirtyCFId))
 +                {
 +                    String keyspace = pair.left;
 +                    final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
 +                    // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
 +                    // no deadlock possibility since switchLock removal
 +                    flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
 +                }
 +            }
 +        }
 +
 +        return Futures.allAsList(flushes.values());
 +    }
 +
 +    /**
 +     * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
 +     * Only call this after the AbstractCommitLogService is shut down.
 +     */
 +    public void stopUnsafe(boolean deleteSegments)
 +    {
 +        logger.debug("CLSM closing and clearing existing commit log segments...");
 +
 +        shutdown();
 +        try
 +        {
 +            awaitTermination();
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new RuntimeException(e);
 +        }
 +
 +        for (CommitLogSegment segment : activeSegments)
 +            closeAndDeleteSegmentUnsafe(segment, deleteSegments);
 +        activeSegments.clear();
 +
 +        size.set(0L);
 +
 +        logger.trace("CLSM done with closing and clearing existing commit log segments.");
 +    }
 +
 +    /**
 +     * To be used by tests only. Not safe if mutation slots are being allocated concurrently.
 +     */
 +    void awaitManagementTasksCompletion()
 +    {
 +        if (availableSegment == null && !atSegmentBufferLimit())
 +        {
 +            awaitAvailableSegment(allocatingFrom);
 +        }
 +    }
 +
 +    /**
 +     * Explicitly for use only during resets in unit testing.
 +     */
 +    private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
 +    {
 +        try
 +        {
 +            discard(segment, delete);
 +        }
 +        catch (AssertionError ignored)
 +        {
 +            // segment file does not exist
 +        }
 +    }
 +
 +    /**
 +     * Initiates the shutdown process for the management thread.
 +     */
 +    public void shutdown()
 +    {
 +        assert !shutdown;
 +        shutdown = true;
 +
 +        // Release the management thread and delete prepared segment.
 +        // Do not block as another thread may claim the segment (this can happen during unit test initialization).
 +        discardAvailableSegment();
 +        wakeManager();
 +    }
 +
 +    private void discardAvailableSegment()
 +    {
 +        CommitLogSegment next = null;
 +        synchronized (this)
 +        {
 +            next = availableSegment;
 +            availableSegment = null;
 +        }
 +        if (next != null)
 +            next.discard(true);
 +    }
 +
 +    /**
 +     * Returns when the management thread terminates.
 +     */
 +    public void awaitTermination() throws InterruptedException
 +    {
 +        managerThread.join();
 +        managerThread = null;
 +
 +        for (CommitLogSegment segment : activeSegments)
 +            segment.close();
 +
 +        bufferPool.shutdown();
 +    }
 +
 +    /**
 +     * @return a read-only collection of the active commit log segments
 +     */
 +    @VisibleForTesting
 +    public Collection<CommitLogSegment> getActiveSegments()
 +    {
 +        return Collections.unmodifiableCollection(activeSegments);
 +    }
 +
 +    /**
 +     * @return the current CommitLogPosition of the active segment we're allocating from
 +     */
 +    CommitLogPosition getCurrentPosition()
 +    {
 +        return allocatingFrom.getCurrentCommitLogPosition();
 +    }
 +
 +    /**
 +     * Forces a disk flush on the commit log files that need it.  Blocking.
 +     */
 +    public void sync() throws IOException
 +    {
 +        CommitLogSegment current = allocatingFrom;
 +        for (CommitLogSegment segment : getActiveSegments())
 +        {
 +            // Do not sync segments that became active after sync started.
 +            if (segment.id > current.id)
 +                return;
 +            segment.sync();
 +        }
 +    }
 +
 +    /**
 +     * Used by compressed and encrypted segments to share a buffer pool across the CLSM.
 +     */
 +    SimpleCachedBufferPool getBufferPool()
 +    {
 +        return bufferPool;
 +    }
 +
 +    void wakeManager()
 +    {
 +        LockSupport.unpark(managerThread);
 +    }
 +
 +    /**
 +     * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for
 +     * a buffer to become available.
 +     */
 +    void notifyBufferFreed()
 +    {
 +        wakeManager();
 +    }
 +
 +    /** Read-only access to current segment for subclasses. */
 +    CommitLogSegment allocatingFrom()
 +    {
 +        return allocatingFrom;
 +    }
 +}
 +
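
Aside from the wrapped manager thread, the run loop above is a single-slot handoff: the management thread publishes one prepared segment through the volatile availableSegment field, signals segmentPrepared, and parks; consumers take the slot while synchronized on the manager and unpark it. A stripped-down sketch of that pattern, with illustrative names (Supplier stands in for createSegment(), plain wait/notify for the WaitQueue):

    import java.util.concurrent.locks.LockSupport;
    import java.util.function.Supplier;

    final class SingleSlotHandoff<T>
    {
        private volatile T available;        // the one eagerly prepared item
        private volatile boolean shutdown;
        private Thread producer;

        void start(Supplier<T> factory)
        {
            producer = new Thread(() -> {
                while (!shutdown)
                {
                    T next = factory.get();  // eagerly prepare the next item
                    synchronized (this)
                    {
                        available = next;
                        notifyAll();         // wake any consumer blocked in take()
                    }
                    while (available != null && !shutdown)
                        LockSupport.park();  // sleep until a consumer empties the slot
                }
            }, "HANDOFF-PRODUCER");
            producer.start();
        }

        synchronized T take() throws InterruptedException
        {
            while (available == null)
                wait();
            T t = available;
            available = null;
            LockSupport.unpark(producer);    // ask the producer for the next item
            return t;
        }
    }

The unpark-before-park race is benign: unpark grants a permit, so a producer that parks afterwards returns immediately, which is why the real code can get away with a bare volatile field plus park/unpark.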

http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 7b56da3,e5a5887..834aa0d
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@@ -17,18 -17,16 +17,19 @@@
   */
  package org.apache.cassandra.db.commitlog;
  
 -import org.apache.cassandra.concurrent.NamedThreadFactory;
 -import org.apache.cassandra.utils.NoSpamLogger;
 -import org.apache.cassandra.utils.concurrent.WaitQueue;
 -import org.slf4j.*;
 -
 -import java.util.concurrent.Semaphore;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.locks.LockSupport;
  
 -import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.codahale.metrics.Timer.Context;
 +
++import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 +import org.apache.cassandra.utils.NoSpamLogger;
 +import org.apache.cassandra.utils.concurrent.WaitQueue;
  
  public abstract class AbstractCommitLogService
  {
@@@ -148,8 -160,7 +149,8 @@@
              }
          };
  
 +        shutdown = false;
-         thread = new Thread(runnable, name);
+         thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
          thread.start();
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/index/sasi/TermIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/sasi/TermIterator.java
index 5b08a56,0000000..1ddfcb9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/sasi/TermIterator.java
+++ b/src/java/org/apache/cassandra/index/sasi/TermIterator.java
@@@ -1,218 -1,0 +1,219 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.index.sasi;
 +
 +import java.util.List;
 +import java.util.Set;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +
++import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
 +import org.apache.cassandra.index.sasi.disk.Token;
 +import org.apache.cassandra.index.sasi.plan.Expression;
 +import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
 +import org.apache.cassandra.index.sasi.utils.RangeIterator;
 +import org.apache.cassandra.io.util.FileUtils;
 +
 +import com.google.common.util.concurrent.MoreExecutors;
 +import com.google.common.util.concurrent.Uninterruptibles;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +public class TermIterator extends RangeIterator<Long, Token>
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(TermIterator.class);
 +
 +    private static final ThreadLocal<ExecutorService> SEARCH_EXECUTOR = new ThreadLocal<ExecutorService>()
 +    {
 +        public ExecutorService initialValue()
 +        {
 +            final String currentThread = Thread.currentThread().getName();
 +            final int concurrencyFactor = DatabaseDescriptor.searchConcurrencyFactor();
 +
 +            logger.info("Search Concurrency Factor is set to {} for {}", concurrencyFactor, currentThread);
 +
 +            return (concurrencyFactor <= 1)
 +                    ? MoreExecutors.newDirectExecutorService()
 +                    : Executors.newFixedThreadPool(concurrencyFactor, new ThreadFactory()
 +            {
 +                public final AtomicInteger count = new AtomicInteger();
 +
 +                public Thread newThread(Runnable task)
 +                {
-                     return new Thread(task, currentThread + "-SEARCH-" + count.incrementAndGet()) {{ setDaemon(true); }};
++                    return new Thread(NamedThreadFactory.threadLocalDeallocator(task), currentThread + "-SEARCH-" + count.incrementAndGet()) {{ setDaemon(true); }};
 +                }
 +            });
 +        }
 +    };
 +
 +    private final Expression expression;
 +
 +    private final RangeIterator<Long, Token> union;
 +    private final Set<SSTableIndex> referencedIndexes;
 +
 +    private TermIterator(Expression e,
 +                         RangeIterator<Long, Token> union,
 +                         Set<SSTableIndex> referencedIndexes)
 +    {
 +        super(union.getMinimum(), union.getMaximum(), union.getCount());
 +
 +        this.expression = e;
 +        this.union = union;
 +        this.referencedIndexes = referencedIndexes;
 +    }
 +
 +    @SuppressWarnings("resource")
 +    public static TermIterator build(final Expression e, Set<SSTableIndex> perSSTableIndexes)
 +    {
 +        final List<RangeIterator<Long, Token>> tokens = new CopyOnWriteArrayList<>();
 +        final AtomicLong tokenCount = new AtomicLong(0);
 +
 +        RangeIterator<Long, Token> memtableIterator = e.index.searchMemtable(e);
 +        if (memtableIterator != null)
 +        {
 +            tokens.add(memtableIterator);
 +            tokenCount.addAndGet(memtableIterator.getCount());
 +        }
 +
 +        final Set<SSTableIndex> referencedIndexes = new CopyOnWriteArraySet<>();
 +
 +        try
 +        {
 +            final CountDownLatch latch = new CountDownLatch(perSSTableIndexes.size());
 +            final ExecutorService searchExecutor = SEARCH_EXECUTOR.get();
 +
 +            for (final SSTableIndex index : perSSTableIndexes)
 +            {
 +                if (e.getOp() == Expression.Op.PREFIX &&
 +                    index.mode() == OnDiskIndexBuilder.Mode.CONTAINS && !index.hasMarkedPartials())
 +                    throw new UnsupportedOperationException(String.format("The index %s has not yet been upgraded " +
 +                                                                          "to support prefix queries in CONTAINS mode. " +
 +                                                                          "Wait for compaction or rebuild the index.",
 +                                                                          index.getPath()));
 +
 +
 +                if (!index.reference())
 +                {
 +                    latch.countDown();
 +                    continue;
 +                }
 +
 +                // add to referencedIndexes right after the reference is acquired; that way the index
 +                // can still be released if something goes wrong inside the search
 +                referencedIndexes.add(index);
 +
 +                searchExecutor.submit((Runnable) () -> {
 +                    try
 +                    {
 +                        e.checkpoint();
 +
 +                        RangeIterator<Long, Token> keyIterator = index.search(e);
 +                        if (keyIterator == null)
 +                        {
 +                            releaseIndex(referencedIndexes, index);
 +                            return;
 +                        }
 +
 +                        tokens.add(keyIterator);
 +                        tokenCount.getAndAdd(keyIterator.getCount());
 +                    }
 +                    catch (Throwable e1)
 +                    {
 +                        releaseIndex(referencedIndexes, index);
 +
 +                        if (logger.isDebugEnabled())
 +                            logger.debug(String.format("Failed to search index %s, skipping.", index.getPath()), e1);
 +                    }
 +                    finally
 +                    {
 +                        latch.countDown();
 +                    }
 +                });
 +            }
 +
 +            Uninterruptibles.awaitUninterruptibly(latch);
 +
 +            // checkpoint right away after all indexes complete search because we might have crossed the quota
 +            e.checkpoint();
 +
 +            RangeIterator<Long, Token> ranges = RangeUnionIterator.build(tokens);
 +            return ranges == null ? null : new TermIterator(e, ranges, referencedIndexes);
 +        }
 +        catch (Throwable ex)
 +        {
 +            // if execution quota was exceeded while opening indexes or something else happened
 +            // local (yet to be tracked) indexes should be released first before re-throwing exception
 +            referencedIndexes.forEach(TermIterator::releaseQuietly);
 +
 +            throw ex;
 +        }
 +    }
 +
 +    protected Token computeNext()
 +    {
 +        try
 +        {
 +            return union.hasNext() ? union.next() : endOfData();
 +        }
 +        finally
 +        {
 +            expression.checkpoint();
 +        }
 +    }
 +
 +    protected void performSkipTo(Long nextToken)
 +    {
 +        try
 +        {
 +            union.skipTo(nextToken);
 +        }
 +        finally
 +        {
 +            expression.checkpoint();
 +        }
 +    }
 +
 +    public void close()
 +    {
 +        FileUtils.closeQuietly(union);
 +        referencedIndexes.forEach(TermIterator::releaseQuietly);
 +        referencedIndexes.clear();
 +    }
 +
 +    private static void releaseIndex(Set<SSTableIndex> indexes, SSTableIndex index)
 +    {
 +        indexes.remove(index);
 +        releaseQuietly(index);
 +    }
 +
 +    private static void releaseQuietly(SSTableIndex index)
 +    {
 +        try
 +        {
 +            index.release();
 +        }
 +        catch (Throwable e)
 +        {
 +            logger.error(String.format("Failed to release index %s", index.getPath()), e);
 +        }
 +    }
 +}
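
The SEARCH_EXECUTOR above is a per-reading-thread pool: each thread lazily builds a fixed pool named after itself, and the worker threads that pool spawns are in turn wrapped with the deallocator so their thread-local buffers are freed when the pool retires them. The idiom, reduced to its essentials (the concurrency value is an assumed stand-in for DatabaseDescriptor.searchConcurrencyFactor()):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;

    final class PerThreadSearchPool
    {
        static final ThreadLocal<ExecutorService> POOL = ThreadLocal.withInitial(() -> {
            String owner = Thread.currentThread().getName();
            AtomicInteger count = new AtomicInteger();
            int concurrency = 2;                       // assumed; configured in practice
            return Executors.newFixedThreadPool(concurrency, task -> {
                Thread t = new Thread(task, owner + "-SEARCH-" + count.incrementAndGet());
                t.setDaemon(true);                     // daemon so the pool never blocks JVM exit
                return t;
            });
        });
    }

Each pool lives as long as its owning thread's ThreadLocal entry, which is the class of leak CASSANDRA-13033 is chipping away at.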

http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1f47334,a9dfcdc..1843e7b
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@@ -502,31 -506,27 +503,28 @@@ public class OutboundTcpConnection exte
      {
          final AtomicInteger version = new AtomicInteger(NO_VERSION);
          final CountDownLatch versionLatch = new CountDownLatch(1);
-         new Thread("HANDSHAKE-" + poolReference.endPoint())
 -        new Thread(NamedThreadFactory.threadLocalDeallocator(() ->
++        Runnable r = () ->
          {
-             @Override
-             public void run()
+             try
              {
-                 try
-                 {
-                     logger.info("Handshaking version with {}", poolReference.endPoint());
-                     version.set(inputStream.readInt());
-                 }
-                 catch (IOException ex)
-                 {
-                     final String msg = "Cannot handshake version with " + poolReference.endPoint();
-                     if (logger.isTraceEnabled())
-                         logger.trace(msg, ex);
-                     else
-                         logger.info(msg);
-                 }
-                 finally
-                 {
-                     //unblock the waiting thread on either success or fail
-                     versionLatch.countDown();
-                 }
+                 logger.info("Handshaking version with {}", poolReference.endPoint());
+                 version.set(inputStream.readInt());
+             }
+             catch (IOException ex)
+             {
+                 final String msg = "Cannot handshake version with " + poolReference.endPoint();
+                 if (logger.isTraceEnabled())
+                     logger.trace(msg, ex);
+                 else
+                     logger.info(msg);
+             }
+             finally
+             {
+                 //unblock the waiting thread on either success or failure
+                 versionLatch.countDown();
              }
-         }.start();
 -        }),"HANDSHAKE-" + poolReference.endPoint()).start();
++        };
++        new Thread(NamedThreadFactory.threadLocalDeallocator(r), "HANDSHAKE-" + poolReference.endPoint()).start();
  
          try
          {
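
The hunk above rewrites the handshake from an anonymous Thread subclass into a plain Runnable, which is what makes it wrappable. The underlying pattern is worth noting: a helper thread publishes the peer's version through an AtomicInteger and releases a latch whether the read succeeds or fails, while the caller awaits with a timeout rather than blocking on the socket itself. A self-contained sketch, with NO_VERSION and the timeout as illustrative stand-ins:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    final class VersionHandshake
    {
        static final int NO_VERSION = Integer.MIN_VALUE;   // assumed sentinel

        static int handshake(DataInputStream in, long timeoutMillis) throws InterruptedException
        {
            AtomicInteger version = new AtomicInteger(NO_VERSION);
            CountDownLatch latch = new CountDownLatch(1);
            Runnable r = () -> {
                try
                {
                    version.set(in.readInt());   // blocking read of the peer's version
                }
                catch (IOException ignored)
                {
                    // leave NO_VERSION; the caller treats it as a failed handshake
                }
                finally
                {
                    latch.countDown();           // unblock the waiter on success or failure
                }
            };
            new Thread(r, "HANDSHAKE").start();
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return version.get();
        }
    }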

http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
index c98c0fe,61dfa50..904deb3
--- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
+++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
@@@ -59,17 -60,17 +60,14 @@@ public class RoundRobinScheduler implem
          taskCount = new Semaphore(options.throttle_limit - 1);
  
          queues = new NonBlockingHashMap<String, WeightedQueue>();
--        Runnable runnable = new Runnable()
++        Runnable runnable = () ->
          {
--            public void run()
++            while (true)
              {
--                while (true)
--                {
--                    schedule();
--                }
++                schedule();
              }
          };
-         Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
+         Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER");
          scheduler.start();
          logger.info("Started the RoundRobin Request Scheduler");
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 3f999c2,71cbc35..1247e03
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -631,8 -592,29 +632,8 @@@ public class StorageService extends Not
              throw new AssertionError(e);
          }
  
 -        if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
 -        {
 -            logger.info("Loading persisted ring state");
 -            Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens();
 -            Map<InetAddress, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
 -            for (InetAddress ep : loadedTokens.keySet())
 -            {
 -                if (ep.equals(FBUtilities.getBroadcastAddress()))
 -                {
 -                    // entry has been mistakenly added, delete it
 -                    SystemKeyspace.removeEndpoint(ep);
 -                }
 -                else
 -                {
 -                    if (loadedHostIds.containsKey(ep))
 -                        tokenMetadata.updateHostId(loadedHostIds.get(ep), ep);
 -                    Gossiper.instance.addSavedEndpoint(ep);
 -                }
 -            }
 -        }
 -
          // daemon threads, like our executors', continue to run while shutdown hooks are invoked
-         drainOnShutdown = new Thread(new WrappedRunnable()
+         drainOnShutdown = new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
          {
              @Override
              public void runMayThrow() throws InterruptedException, ExecutionException, IOException
@@@ -647,10 -629,10 +648,10 @@@
                  logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory());
                  logbackHook.run();
              }
-         }, "StorageServiceShutdownHook");
+         }), "StorageServiceShutdownHook");
          Runtime.getRuntime().addShutdownHook(drainOnShutdown);
  
 -        replacing = DatabaseDescriptor.isReplacing();
 +        replacing = isReplacing();
  
          if (!Boolean.parseBoolean(System.getProperty("cassandra.start_gossip", "true")))
          {
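
The StorageService change applies the same wrapping to the JVM shutdown hook: the drain logic runs on a hook thread registered via Runtime.getRuntime().addShutdownHook, so it too can leave thread-local buffers behind unless wrapped. A minimal illustration (the drain body and wrap(..) are stand-ins, not the Cassandra API):

    final class ShutdownHookSketch
    {
        public static void main(String[] args)
        {
            Runnable drain = () -> System.out.println("draining...");   // stands in for the drain logic
            Thread hook = new Thread(wrap(drain), "StorageServiceShutdownHook");
            Runtime.getRuntime().addShutdownHook(hook);
        }

        // stand-in for NamedThreadFactory.threadLocalDeallocator
        static Runnable wrap(Runnable r)
        {
            return () -> { try { r.run(); } finally { /* e.g. FastThreadLocal.removeAll() */ } };
        }
    }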