You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/02/05 17:09:05 UTC

[1/8] cassandra git commit: Move to FastThreadLocalThread and FastThreadLocal

Repository: cassandra
Updated Branches:
  refs/heads/trunk ded6b7010 -> 45c92ba91


Move to FastThreadLocalThread and FastThreadLocal

patch by Robert Stupp; reviewed by Jason Brown for CASSANDRA-13034


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cecbe17e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cecbe17e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cecbe17e

Branch: refs/heads/trunk
Commit: cecbe17e3eafc052acc13950494f7dddf026aa54
Parents: f71e7e1
Author: Robert Stupp <sn...@snazy.de>
Authored: Tue Dec 13 17:37:09 2016 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 16:54:19 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../concurrent/NamedThreadFactory.java          | 42 ++++++++-
 .../functions/ThreadAwareSecurityManager.java   |  3 +-
 .../AbstractCommitLogSegmentManager.java        |  2 +-
 .../db/commitlog/AbstractCommitLogService.java  |  2 +-
 .../db/compaction/CompactionManager.java        |  3 +-
 .../apache/cassandra/db/marshal/AsciiType.java  |  3 +-
 .../hints/EncryptedChecksummedDataInput.java    |  3 +-
 .../cassandra/index/sasi/TermIterator.java      |  5 +-
 .../io/compress/DeflateCompressor.java          | 11 +--
 .../cassandra/net/OutboundTcpConnection.java    |  5 +-
 .../apache/cassandra/repair/RepairRunnable.java |  6 +-
 .../scheduler/RoundRobinScheduler.java          |  2 +-
 .../cassandra/security/CipherFactory.java       |  3 +-
 .../cassandra/security/EncryptionUtils.java     |  3 +-
 .../serializers/TimestampSerializer.java        |  7 +-
 .../cassandra/service/StorageService.java       |  8 +-
 .../cassandra/thrift/ThriftSessionManager.java  |  4 +-
 .../cassandra/utils/CoalescingStrategies.java   | 25 +++---
 .../org/apache/cassandra/utils/FBUtilities.java | 15 ++--
 .../org/apache/cassandra/cql3/ViewLongTest.java |  3 +-
 .../test/microbench/FastThreadExecutor.java     | 65 +-------------
 .../test/microbench/FastThreadLocalBench.java   | 92 ++++++++++++++++++++
 .../cassandra/cache/CacheProviderTest.java      |  3 +-
 .../cassandra/concurrent/WaitQueueTest.java     |  4 +-
 .../cassandra/db/RecoveryManagerTest.java       | 22 +++--
 .../apache/cassandra/hints/HintsBufferTest.java |  3 +-
 .../io/sstable/IndexSummaryManagerTest.java     |  3 +-
 .../io/sstable/SSTableRewriterTest.java         |  5 +-
 .../apache/cassandra/service/RemoveTest.java    | 26 +++---
 .../apache/cassandra/utils/TopKSamplerTest.java |  5 +-
 .../apache/cassandra/stress/StressServer.java   |  6 +-
 .../operations/userdefined/TokenRangeQuery.java |  3 +-
 33 files changed, 239 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0836475..7c1cd82 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,10 +1,12 @@
 3.11.0
+ * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
 Merged from 3.0:
  * Fix handling of partition with partition-level deletion plus
    live rows in sstabledump (CASSANDRA-13177)
  * Provide user workaround when system_schema.columns does not contain entries
    for a table that's in system_schema.tables (CASSANDRA-13180)
 
+
 3.10
  * Fix secondary index queries regression (CASSANDRA-13013)
  * Add duration type to the protocol V5 (CASSANDRA-12850)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 22193c4..5d89f6c 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.concurrent;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import io.netty.util.concurrent.FastThreadLocal;
 import io.netty.util.concurrent.FastThreadLocalThread;
 
@@ -58,9 +60,8 @@ public class NamedThreadFactory implements ThreadFactory
     public Thread newThread(Runnable runnable)
     {
         String name = id + ':' + n.getAndIncrement();
-        Thread thread = new FastThreadLocalThread(threadGroup, threadLocalDeallocator(runnable), name);
+        Thread thread = createThread(threadGroup, runnable, name, true);
         thread.setPriority(priority);
-        thread.setDaemon(true);
         if (contextClassLoader != null)
             thread.setContextClassLoader(contextClassLoader);
         return thread;
@@ -75,11 +76,44 @@ public class NamedThreadFactory implements ThreadFactory
     {
         return () ->
         {
-            try {
+            try
+            {
                 r.run();
-            } finally {
+            }
+            finally
+            {
                 FastThreadLocal.removeAll();
             }
         };
     }
+
+    private static final AtomicInteger threadCounter = new AtomicInteger();
+
+    @VisibleForTesting
+    public static Thread createThread(Runnable runnable)
+    {
+        return createThread(null, runnable, "anonymous-" + threadCounter.incrementAndGet());
+    }
+
+    public static Thread createThread(Runnable runnable, String name)
+    {
+        return createThread(null, runnable, name);
+    }
+
+    public static Thread createThread(Runnable runnable, String name, boolean daemon)
+    {
+        return createThread(null, runnable, name, daemon);
+    }
+
+    public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, String name)
+    {
+        return createThread(threadGroup, runnable, name, false);
+    }
+
+    public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, String name, boolean daemon)
+    {
+        Thread thread = new FastThreadLocalThread(threadGroup, threadLocalDeallocator(runnable), name);
+        thread.setDaemon(daemon);
+        return thread;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java b/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java
index 676117d..2e4bb4d 100644
--- a/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java
@@ -36,6 +36,7 @@ import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.spi.TurboFilterList;
 import ch.qos.logback.classic.turbo.ReconfigureOnChangeFilter;
 import ch.qos.logback.classic.turbo.TurboFilter;
+import io.netty.util.concurrent.FastThreadLocal;
 
 /**
  * Custom {@link SecurityManager} and {@link Policy} implementation that only performs access checks
@@ -175,7 +176,7 @@ public final class ThreadAwareSecurityManager extends SecurityManager
         });
     }
 
-    private static final ThreadLocal<Boolean> initializedThread = new ThreadLocal<>();
+    private static final FastThreadLocal<Boolean> initializedThread = new FastThreadLocal<>();
 
     private ThreadAwareSecurityManager()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index eff35f4..0ab941b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@ -147,7 +147,7 @@ public abstract class AbstractCommitLogSegmentManager
         };
 
         shutdown = false;
-        managerThread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "COMMIT-LOG-ALLOCATOR");
+        managerThread = NamedThreadFactory.createThread(runnable, "COMMIT-LOG-ALLOCATOR");
         managerThread.start();
 
         // for simplicity, ensure the first segment is allocated before continuing

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 834aa0d..71100a3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -150,7 +150,7 @@ public abstract class AbstractCommitLogService
         };
 
         shutdown = false;
-        thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
+        thread = NamedThreadFactory.createThread(runnable, name);
         thread.start();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index f34b8a9..f16c1de 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -36,6 +36,7 @@ import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
@@ -93,7 +94,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     // A thread local that tells us if the current thread is owned by the compaction manager. Used
     // by CounterContext to figure out if it should log a warning for invalid counter shards.
-    public static final ThreadLocal<Boolean> isCompactionManager = new ThreadLocal<Boolean>()
+    public static final FastThreadLocal<Boolean> isCompactionManager = new FastThreadLocal<Boolean>()
     {
         @Override
         protected Boolean initialValue()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/marshal/AsciiType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AsciiType.java b/src/java/org/apache/cassandra/db/marshal/AsciiType.java
index 089e388..3cd45de 100644
--- a/src/java/org/apache/cassandra/db/marshal/AsciiType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AsciiType.java
@@ -23,6 +23,7 @@ import java.nio.charset.Charset;
 import java.nio.charset.CharsetEncoder;
 import java.nio.charset.CharacterCodingException;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.cql3.Constants;
 import org.apache.cassandra.cql3.Json;
 
@@ -40,7 +41,7 @@ public class AsciiType extends AbstractType<String>
 
     AsciiType() {super(ComparisonType.BYTE_ORDER);} // singleton
 
-    private final ThreadLocal<CharsetEncoder> encoder = new ThreadLocal<CharsetEncoder>()
+    private final FastThreadLocal<CharsetEncoder> encoder = new FastThreadLocal<CharsetEncoder>()
     {
         @Override
         protected CharsetEncoder initialValue()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
index b335226..b01161d 100644
--- a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
@@ -23,6 +23,7 @@ import javax.crypto.Cipher;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.security.EncryptionUtils;
 import org.apache.cassandra.hints.CompressedChecksummedDataInput.Position;
 import org.apache.cassandra.io.FSReadError;
@@ -31,7 +32,7 @@ import org.apache.cassandra.io.util.ChannelProxy;
 
 public class EncryptedChecksummedDataInput extends ChecksummedDataInput
 {
-    private static final ThreadLocal<ByteBuffer> reusableBuffers = new ThreadLocal<ByteBuffer>()
+    private static final FastThreadLocal<ByteBuffer> reusableBuffers = new FastThreadLocal<ByteBuffer>()
     {
         protected ByteBuffer initialValue()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/index/sasi/TermIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/TermIterator.java b/src/java/org/apache/cassandra/index/sasi/TermIterator.java
index 1ddfcb9..03dea18 100644
--- a/src/java/org/apache/cassandra/index/sasi/TermIterator.java
+++ b/src/java/org/apache/cassandra/index/sasi/TermIterator.java
@@ -23,6 +23,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
@@ -42,7 +43,7 @@ public class TermIterator extends RangeIterator<Long, Token>
 {
     private static final Logger logger = LoggerFactory.getLogger(TermIterator.class);
 
-    private static final ThreadLocal<ExecutorService> SEARCH_EXECUTOR = new ThreadLocal<ExecutorService>()
+    private static final FastThreadLocal<ExecutorService> SEARCH_EXECUTOR = new FastThreadLocal<ExecutorService>()
     {
         public ExecutorService initialValue()
         {
@@ -59,7 +60,7 @@ public class TermIterator extends RangeIterator<Long, Token>
 
                 public Thread newThread(Runnable task)
                 {
-                    return new Thread(NamedThreadFactory.threadLocalDeallocator(task), currentThread + "-SEARCH-" + count.incrementAndGet()) {{ setDaemon(true); }};
+                    return NamedThreadFactory.createThread(task, currentThread + "-SEARCH-" + count.incrementAndGet(), true);
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index a2ed65c..8557f5f 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.io.compress;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.schema.CompressionParams;
 
 import java.io.IOException;
@@ -32,7 +33,7 @@ public class DeflateCompressor implements ICompressor
 {
     public static final DeflateCompressor instance = new DeflateCompressor();
 
-    private static final ThreadLocal<byte[]> threadLocalScratchBuffer = new ThreadLocal<byte[]>()
+    private static final FastThreadLocal<byte[]> threadLocalScratchBuffer = new FastThreadLocal<byte[]>()
     {
         @Override
         protected byte[] initialValue()
@@ -46,8 +47,8 @@ public class DeflateCompressor implements ICompressor
         return threadLocalScratchBuffer.get();
     }
 
-    private final ThreadLocal<Deflater> deflater;
-    private final ThreadLocal<Inflater> inflater;
+    private final FastThreadLocal<Deflater> deflater;
+    private final FastThreadLocal<Inflater> inflater;
 
     public static DeflateCompressor create(Map<String, String> compressionOptions)
     {
@@ -57,7 +58,7 @@ public class DeflateCompressor implements ICompressor
 
     private DeflateCompressor()
     {
-        deflater = new ThreadLocal<Deflater>()
+        deflater = new FastThreadLocal<Deflater>()
         {
             @Override
             protected Deflater initialValue()
@@ -65,7 +66,7 @@ public class DeflateCompressor implements ICompressor
                 return new Deflater();
             }
         };
-        inflater = new ThreadLocal<Inflater>()
+        inflater = new FastThreadLocal<Inflater>()
         {
             @Override
             protected Inflater initialValue()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1843e7b..0693ac3 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -503,7 +503,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
     {
         final AtomicInteger version = new AtomicInteger(NO_VERSION);
         final CountDownLatch versionLatch = new CountDownLatch(1);
-        Runnable r = () ->
+        NamedThreadFactory.createThread(() ->
         {
             try
             {
@@ -523,8 +523,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
                 //unblock the waiting thread on either success or fail
                 versionLatch.countDown();
             }
-        };
-        new Thread(NamedThreadFactory.threadLocalDeallocator(r), "HANDSHAKE-" + poolReference.endPoint()).start();
+        }, "HANDSHAKE-" + poolReference.endPoint()).start();
 
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 013dcec..c9eed54 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -73,6 +73,8 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
 
     private final List<ProgressListener> listeners = new ArrayList<>();
 
+    private static final AtomicInteger threadCounter = new AtomicInteger(1);
+
     public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace)
     {
         this.storageService = storageService;
@@ -376,7 +378,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
 
     private Thread createQueryThread(final int cmd, final UUID sessionId)
     {
-        return new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
+        return NamedThreadFactory.createThread(new WrappedRunnable()
         {
             // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
             // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
@@ -443,6 +445,6 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                     seen[si].clear();
                 }
             }
-        }));
+        }, "Repair-Runnable-" + threadCounter.incrementAndGet());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
index 904deb3..fd967f3 100644
--- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
+++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
@@ -67,7 +67,7 @@ public class RoundRobinScheduler implements IRequestScheduler
                 schedule();
             }
         };
-        Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER");
+        Thread scheduler = NamedThreadFactory.createThread(runnable, "REQUEST-SCHEDULER");
         scheduler.start();
         logger.info("Started the RoundRobin Request Scheduler");
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/security/CipherFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/CipherFactory.java b/src/java/org/apache/cassandra/security/CipherFactory.java
index 7c1495a..3f5c5f3 100644
--- a/src/java/org/apache/cassandra/security/CipherFactory.java
+++ b/src/java/org/apache/cassandra/security/CipherFactory.java
@@ -39,6 +39,7 @@ import com.google.common.cache.RemovalNotification;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.config.TransparentDataEncryptionOptions;
 
 /**
@@ -54,7 +55,7 @@ public class CipherFactory
      * Bonus points if you can avoid calling (@code Cipher#init); hence, the point of the supporting struct
      * for caching Cipher instances.
      */
-    private static final ThreadLocal<CachedCipher> cipherThreadLocal = new ThreadLocal<>();
+    private static final FastThreadLocal<CachedCipher> cipherThreadLocal = new FastThreadLocal<>();
 
     private final SecureRandom secureRandom;
     private final LoadingCache<String, Key> cache;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/security/EncryptionUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/EncryptionUtils.java b/src/java/org/apache/cassandra/security/EncryptionUtils.java
index bb61260..855e2a9 100644
--- a/src/java/org/apache/cassandra/security/EncryptionUtils.java
+++ b/src/java/org/apache/cassandra/security/EncryptionUtils.java
@@ -28,6 +28,7 @@ import javax.crypto.ShortBufferException;
 
 import com.google.common.base.Preconditions;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.db.commitlog.EncryptedSegment;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.ChannelProxy;
@@ -45,7 +46,7 @@ public class EncryptionUtils
     public static final int COMPRESSED_BLOCK_HEADER_SIZE = 4;
     public static final int ENCRYPTED_BLOCK_HEADER_SIZE = 8;
 
-    private static final ThreadLocal<ByteBuffer> reusableBuffers = new ThreadLocal<ByteBuffer>()
+    private static final FastThreadLocal<ByteBuffer> reusableBuffers = new FastThreadLocal<ByteBuffer>()
     {
         protected ByteBuffer initialValue()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/TimestampSerializer.java b/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
index a4a6f80..ac75d4b 100644
--- a/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.serializers;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import java.nio.ByteBuffer;
@@ -90,7 +91,7 @@ public class TimestampSerializer implements TypeSerializer<Date>
     private static final String DEFAULT_FORMAT = dateStringPatterns[6];
     private static final Pattern timestampPattern = Pattern.compile("^-?\\d+$");
 
-    private static final ThreadLocal<SimpleDateFormat> FORMATTER = new ThreadLocal<SimpleDateFormat>()
+    private static final FastThreadLocal<SimpleDateFormat> FORMATTER = new FastThreadLocal<SimpleDateFormat>()
     {
         protected SimpleDateFormat initialValue()
         {
@@ -99,7 +100,7 @@ public class TimestampSerializer implements TypeSerializer<Date>
     };
 
     private static final String UTC_FORMAT = dateStringPatterns[40];
-    private static final ThreadLocal<SimpleDateFormat> FORMATTER_UTC = new ThreadLocal<SimpleDateFormat>()
+    private static final FastThreadLocal<SimpleDateFormat> FORMATTER_UTC = new FastThreadLocal<SimpleDateFormat>()
     {
         protected SimpleDateFormat initialValue()
         {
@@ -110,7 +111,7 @@ public class TimestampSerializer implements TypeSerializer<Date>
     };
 
     private static final String TO_JSON_FORMAT = dateStringPatterns[19];
-    private static final ThreadLocal<SimpleDateFormat> FORMATTER_TO_JSON = new ThreadLocal<SimpleDateFormat>()
+    private static final FastThreadLocal<SimpleDateFormat> FORMATTER_TO_JSON = new FastThreadLocal<SimpleDateFormat>()
     {
         protected SimpleDateFormat initialValue()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 720269f..b64cf13 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -126,6 +126,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     @Deprecated
     private final LegacyJMXProgressSupport legacyProgressSupport;
 
+    private static final AtomicInteger threadCounter = new AtomicInteger(1);
+
     private static int getRingDelay()
     {
         String newdelay = System.getProperty("cassandra.ring_delay_ms");
@@ -634,7 +636,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
 
         // daemon threads, like our executors', continue to run while shutdown hooks are invoked
-        drainOnShutdown = new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
+        drainOnShutdown = NamedThreadFactory.createThread(new WrappedRunnable()
         {
             @Override
             public void runMayThrow() throws InterruptedException, ExecutionException, IOException
@@ -649,7 +651,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory());
                 logbackHook.run();
             }
-        }), "StorageServiceShutdownHook");
+        }, "StorageServiceShutdownHook");
         Runtime.getRuntime().addShutdownHook(drainOnShutdown);
 
         replacing = isReplacing();
@@ -3552,7 +3554,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             return 0;
 
         int cmd = nextRepairCommand.incrementAndGet();
-        new Thread(NamedThreadFactory.threadLocalDeallocator(createRepairTask(cmd, keyspace, options, legacy))).start();
+        NamedThreadFactory.createThread(createRepairTask(cmd, keyspace, options, legacy), "Repair-Task-" + threadCounter.incrementAndGet()).start();
         return cmd;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
index 7d22507..60da3b4 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocal;
+
 /**
  * Encapsulates the current client state (session).
  *
@@ -35,7 +37,7 @@ public class ThriftSessionManager
     private static final Logger logger = LoggerFactory.getLogger(ThriftSessionManager.class);
     public final static ThriftSessionManager instance = new ThriftSessionManager();
 
-    private final ThreadLocal<SocketAddress> remoteSocket = new ThreadLocal<>();
+    private final FastThreadLocal<SocketAddress> remoteSocket = new FastThreadLocal<>();
     private final ConcurrentHashMap<SocketAddress, ThriftClientState> activeSocketSessions = new ConcurrentHashMap<>();
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 16a64df..0aa980f 100644
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.utils;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.io.util.FileUtils;
 import org.slf4j.Logger;
@@ -127,25 +128,21 @@ public class CoalescingStrategies
             this.displayName = displayName;
             if (DEBUG_COALESCING)
             {
-                new Thread(displayName + " debug thread")
+                NamedThreadFactory.createThread(() ->
                 {
-                    @Override
-                    public void run()
+                    while (true)
                     {
-                        while (true)
+                        try
                         {
-                            try
-                            {
-                                Thread.sleep(5000);
-                            }
-                            catch (InterruptedException e)
-                            {
-                                throw new AssertionError();
-                            }
-                            shouldLogAverage = true;
+                            Thread.sleep(5000);
                         }
+                        catch (InterruptedException e)
+                        {
+                            throw new AssertionError();
+                        }
+                        shouldLogAverage = true;
                     }
-                }.start();
+                }, displayName + " debug thread").start();
             }
             RandomAccessFile rasTemp = null;
             ByteBuffer logBufferTemp = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index a925c0e..2138ea5 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -38,6 +38,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.auth.IAuthorizer;
 import org.apache.cassandra.auth.IRoleManager;
@@ -87,28 +88,22 @@ public class FBUtilities
             return Runtime.getRuntime().availableProcessors();
     }
 
-    private static final ThreadLocal<MessageDigest> localMD5Digest = new ThreadLocal<MessageDigest>()
+    private static final FastThreadLocal<MessageDigest> localMD5Digest = new FastThreadLocal<MessageDigest>()
     {
         @Override
         protected MessageDigest initialValue()
         {
             return newMessageDigest("MD5");
         }
-
-        @Override
-        public MessageDigest get()
-        {
-            MessageDigest digest = super.get();
-            digest.reset();
-            return digest;
-        }
     };
 
     public static final int MAX_UNSIGNED_SHORT = 0xFFFF;
 
     public static MessageDigest threadLocalMD5Digest()
     {
-        return localMD5Digest.get();
+        MessageDigest md = localMD5Digest.get();
+        md.reset();
+        return md;
     }
 
     public static MessageDigest newMessageDigest(String algorithm)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/long/org/apache/cassandra/cql3/ViewLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/ViewLongTest.java b/test/long/org/apache/cassandra/cql3/ViewLongTest.java
index a5d17ea..590f148 100644
--- a/test/long/org/apache/cassandra/cql3/ViewLongTest.java
+++ b/test/long/org/apache/cassandra/cql3/ViewLongTest.java
@@ -33,6 +33,7 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.core.exceptions.WriteTimeoutException;
 import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -90,7 +91,7 @@ public class ViewLongTest extends CQLTester
         for (int i = 0; i < writers; i++)
         {
             final int writer = i;
-            Thread t = new Thread(new WrappedRunnable()
+            Thread t = NamedThreadFactory.createThread(new WrappedRunnable()
             {
                 public void runMayThrow()
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
index d0b4442..5644e4f 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
@@ -18,14 +18,11 @@
 
 package org.apache.cassandra.test.microbench;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.AbstractExecutorService;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import io.netty.util.concurrent.FastThreadLocalThread;
+import io.netty.util.concurrent.DefaultThreadFactory;
 
 /**
  * Created to test perf of FastThreadLocal
@@ -33,64 +30,10 @@ import io.netty.util.concurrent.FastThreadLocalThread;
  * Used in MutationBench via:
  * jvmArgsAppend = {"-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"}
  */
-public class FastThreadExecutor extends AbstractExecutorService
+public class FastThreadExecutor extends ThreadPoolExecutor
 {
-    final FastThreadLocalThread thread;
-    final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
-    final CountDownLatch shutdown = new CountDownLatch(1);
-
     public FastThreadExecutor(int size, String name)
     {
-        assert size == 1;
-
-        thread = new FastThreadLocalThread(() -> {
-            Runnable work = null;
-            try
-            {
-                while ((work = queue.take()) != null)
-                    work.run();
-            }
-            catch (InterruptedException e)
-            {
-                shutdown.countDown();
-            }
-        });
-
-        thread.setName(name + "-1");
-        thread.setDaemon(true);
-
-        thread.start();
-    }
-
-
-    public void shutdown()
-    {
-        thread.interrupt();
-    }
-
-    public List<Runnable> shutdownNow()
-    {
-        thread.interrupt();
-        return Collections.emptyList();
-    }
-
-    public boolean isShutdown()
-    {
-        return shutdown.getCount() == 0;
-    }
-
-    public boolean isTerminated()
-    {
-        return shutdown.getCount() == 0;
-    }
-
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
-    {
-        return shutdown.await(timeout, unit);
-    }
-
-    public void execute(Runnable command)
-    {
-        while(!queue.add(command));
+        super(size, size, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new DefaultThreadFactory(name, true));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java b/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java
new file mode 100644
index 0000000..491dc44
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.util.concurrent.TimeUnit;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1,jvmArgsAppend = {"-Xmx512M", "-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"})
+@Threads(4) // make sure this matches the number of _physical_cores_
+@State(Scope.Benchmark)
+public class FastThreadLocalBench
+{
+    @Param({"2", "4", "8", "12"})
+    private int variables = 2;
+
+    static final int max = 20;
+    static final ThreadLocal[] threadLocals = new ThreadLocal[max];
+    static final FastThreadLocal[] fastThreadLocals = new FastThreadLocal[max];
+    static
+    {
+        for (int i = 0; i < max; i++)
+        {
+            threadLocals[i] = ThreadLocal.withInitial(Object::new);
+            fastThreadLocals[i] = new FastThreadLocal() {
+                protected Object initialValue() throws Exception
+                {
+                    return new Object();
+                }
+            };
+        }
+    }
+
+    @State(Scope.Thread)
+    public static class FastThreadLocalBenchState
+    {
+        public int index;
+    }
+
+    @Benchmark
+    public void baseline(FastThreadLocalBenchState state, Blackhole bh)
+    {
+        if (variables != 2)
+            throw new IllegalArgumentException("skipped");
+
+        bh.consume("foo");
+    }
+
+    @Benchmark
+    public void threadLocal(FastThreadLocalBenchState state, Blackhole bh)
+    {
+        bh.consume(threadLocals[state.index % max].get());
+    }
+
+    @Benchmark
+    public void fastThreadLocal(FastThreadLocalBenchState state, Blackhole bh)
+    {
+        bh.consume(fastThreadLocals[state.index % max].get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index a4173d6..eca124f 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -32,6 +32,7 @@ import java.util.List;
 
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.Pair;
 
@@ -135,7 +136,7 @@ public class CacheProviderTest
         List<Thread> threads = new ArrayList<>(100);
         for (int i = 0; i < 100; i++)
         {
-            Thread thread = new Thread(runnable);
+            Thread thread = NamedThreadFactory.createThread(runnable);
             threads.add(thread);
             thread.start();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
index fdc6880..ac2a9c0 100644
--- a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
@@ -44,7 +44,7 @@ public class WaitQueueTest
         final AtomicInteger ready = new AtomicInteger();
         Thread[] ts = new Thread[4];
         for (int i = 0 ; i < ts.length ; i++)
-            ts[i] = new Thread(new Runnable()
+            ts[i] = NamedThreadFactory.createThread(new Runnable()
         {
             @Override
             public void run()
@@ -84,7 +84,7 @@ public class WaitQueueTest
         final AtomicBoolean ready = new AtomicBoolean(false);
         final AtomicBoolean condition = new AtomicBoolean(false);
         final AtomicBoolean fail = new AtomicBoolean(false);
-        Thread t = new Thread(new Runnable()
+        Thread t = NamedThreadFactory.createThread(new Runnable()
         {
             @Override
             public void run()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index cbc412d..f5bda4f 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
@@ -156,20 +157,17 @@ public class RecoveryManagerTest
             Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).isEmpty());
 
             final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
-            Thread t = new Thread() {
-                @Override
-                public void run()
+            Thread t = NamedThreadFactory.createThread(() ->
+            {
+                try
                 {
-                    try
-                    {
-                        CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL
-                    }
-                    catch (Throwable t)
-                    {
-                        err.set(t);
-                    }
+                    CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL
                 }
-            };
+                catch (Throwable x)
+                {
+                    err.set(x);
+                }
+            });
             t.start();
             Assert.assertTrue(mockInitiator.blocked.tryAcquire(1, 20, TimeUnit.SECONDS));
             Thread.sleep(100);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
index 78ea4f4..08f7ec0 100644
--- a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
@@ -28,6 +28,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Mutation;
@@ -114,7 +115,7 @@ public class HintsBufferTest
         // create HINT_THREADS_COUNT, start them, and wait for them to finish
         List<Thread> threads = new ArrayList<>(HINT_THREADS_COUNT);
         for (int i = 0; i < HINT_THREADS_COUNT; i ++)
-            threads.add(new Thread(new Writer(buffer, load, hintSize, i, baseTimestamp)));
+            threads.add(NamedThreadFactory.createThread(new Writer(buffer, load, hintSize, i, baseTimestamp)));
         threads.forEach(java.lang.Thread::start);
         for (Thread thread : threads)
             thread.join();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 9737281..f287912 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowUpdateBuilder;
@@ -619,7 +620,7 @@ public class IndexSummaryManagerTest
         // barrier to control when redistribution runs
         final CountDownLatch barrier = new CountDownLatch(1);
 
-        Thread t = new Thread(new Runnable()
+        Thread t = NamedThreadFactory.createThread(new Runnable()
         {
             public void run()
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 942ebe9..d1b4092 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -33,6 +33,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.Util;
 import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
@@ -813,7 +814,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
                 }
             }
         };
-        Thread t = new Thread(r);
+        Thread t = NamedThreadFactory.createThread(r);
         try
         {
             t.start();
@@ -895,7 +896,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
                 }
             }
         };
-        Thread t = new Thread(r);
+        Thread t = NamedThreadFactory.createThread(r);
         try
         {
             t.start();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 0ef9b9c..701ea0f 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -32,6 +32,7 @@ import org.junit.*;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
@@ -141,23 +142,20 @@ public class RemoveTest
     {
         // start removal in background and send replication confirmations
         final AtomicBoolean success = new AtomicBoolean(false);
-        Thread remover = new Thread()
+        Thread remover = NamedThreadFactory.createThread(() ->
         {
-            public void run()
+            try
             {
-                try
-                {
-                    ss.removeNode(removalId.toString());
-                }
-                catch (Exception e)
-                {
-                    System.err.println(e);
-                    e.printStackTrace();
-                    return;
-                }
-                success.set(true);
+                ss.removeNode(removalId.toString());
             }
-        };
+            catch (Exception e)
+            {
+                System.err.println(e);
+                e.printStackTrace();
+                return;
+            }
+            success.set(true);
+        });
         remover.start();
 
         Thread.sleep(1000); // make sure removal is waiting for confirmation

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java b/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
index bb6e3a8..42aef0c 100644
--- a/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
+++ b/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
@@ -34,6 +34,7 @@ import org.junit.Test;
 import com.clearspring.analytics.hash.MurmurHash;
 import com.clearspring.analytics.stream.Counter;
 import junit.framework.Assert;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.utils.TopKSampler.SamplerResult;
 
 public class TopKSamplerTest
@@ -82,7 +83,7 @@ public class TopKSamplerTest
         final CountDownLatch latch = new CountDownLatch(1);
         final TopKSampler<String> sampler = new TopKSampler<String>();
 
-        new Thread(new Runnable()
+        NamedThreadFactory.createThread(new Runnable()
         {
             public void run()
             {
@@ -99,7 +100,7 @@ public class TopKSamplerTest
             }
 
         }
-        ,"inserter").start();
+        , "inserter").start();
         try
         {
             // start/stop in fast iterations

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/tools/stress/src/org/apache/cassandra/stress/StressServer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressServer.java b/tools/stress/src/org/apache/cassandra/stress/StressServer.java
index 793f8f0..c00fb54 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressServer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressServer.java
@@ -23,9 +23,11 @@ import java.io.PrintStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.cli.*;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.MultiResultLogger;
 import org.apache.cassandra.stress.util.ResultLogger;
@@ -39,6 +41,8 @@ public class StressServer
         availableOptions.addOption("h", "host", true, "Host to listen for connections.");
     }
 
+    private static final AtomicInteger threadCounter = new AtomicInteger(1);
+
     public static void main(String[] args) throws Exception
     {
         ServerSocket serverSocket = null;
@@ -93,7 +97,7 @@ public class StressServer
                 ResultLogger log = new MultiResultLogger(out);
 
                 StressAction action = new StressAction((StressSettings) in.readObject(), log);
-                Thread actionThread = new Thread(action);
+                Thread actionThread = NamedThreadFactory.createThread(action, "stress-" + threadCounter.incrementAndGet());
                 actionThread.start();
 
                 while (actionThread.isAlive())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
index f561f61..ff8b27f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
@@ -35,6 +35,7 @@ import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.TableMetadata;
 import com.datastax.driver.core.Token;
 import com.datastax.driver.core.TokenRange;
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.StressYaml;
 import org.apache.cassandra.stress.WorkManager;
@@ -46,7 +47,7 @@ import org.apache.cassandra.stress.util.ThriftClient;
 
 public class TokenRangeQuery extends Operation
 {
-    private final ThreadLocal<State> currentState = new ThreadLocal<>();
+    private final FastThreadLocal<State> currentState = new FastThreadLocal<>();
 
     private final TableMetadata tableMetadata;
     private final TokenRangeIterator tokenRangeIterator;


[3/8] cassandra git commit: system_distributed should not use gcgs of 0

Posted by al...@apache.org.
system_distributed should not use gcgs of 0

Patch by Jeff Jirsa; Reviewed by  Aleksey Yeschenko for CASSANDRA-12954


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1fd4e870
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1fd4e870
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1fd4e870

Branch: refs/heads/trunk
Commit: 1fd4e870c58931e20a214492aa094e056c6d5714
Parents: edf446c
Author: Jeff Jirsa <je...@crowdstrike.com>
Authored: Sat Dec 3 20:50:29 2016 -0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 16:57:15 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                                      | 1 +
 .../org/apache/cassandra/repair/SystemDistributedKeyspace.java   | 4 +++-
 2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fd4e870/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4d77c9a..7d5057a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 3.11.0
  * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
  * nodetool stopdaemon errors out (CASSANDRA-13030)
+ * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
 Merged from 3.0:
  * Fix handling of partition with partition-level deletion plus
    live rows in sstabledump (CASSANDRA-13177)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fd4e870/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index 552f318..638cf38 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -116,7 +117,8 @@ public final class SystemDistributedKeyspace
     private static CFMetaData compile(String name, String description, String schema)
     {
         return CFMetaData.compile(String.format(schema, name), SchemaConstants.DISTRIBUTED_KEYSPACE_NAME)
-                         .comment(description);
+                         .comment(description)
+                         .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(10));
     }
 
     public static KeyspaceMetadata metadata()


[5/8] cassandra git commit: NoReplicationTokenAllocator should support zero replication factor

Posted by al...@apache.org.
NoReplicationTokenAllocator should support zero replication factor

Patch by Dikang Gu; reviewed by Branimir Lambov for CASSANDRA-12983


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/67cda763
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/67cda763
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/67cda763

Branch: refs/heads/trunk
Commit: 67cda763b270a56bf5a7c73c6b30c3f9d651daab
Parents: a2dffc2
Author: Dikang Gu <di...@gmail.com>
Authored: Thu Dec 1 18:02:29 2016 -0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 16:59:08 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                               |  1 +
 .../dht/tokenallocator/TokenAllocatorFactory.java         | 10 +++++++++-
 .../tokenallocator/NoReplicationTokenAllocatorTest.java   |  9 +++++++++
 3 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/67cda763/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d81d472..1851e62 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
  * nodetool stopdaemon errors out (CASSANDRA-13030)
  * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
  * Fix primary index calculation for SASI (CASSANDRA-12910)
+ * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
 Merged from 3.0:
  * Fix handling of partition with partition-level deletion plus
    live rows in sstabledump (CASSANDRA-13177)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67cda763/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
index f8c972d..d20de8f 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
@@ -21,17 +21,25 @@ package org.apache.cassandra.dht.tokenallocator;
 import java.net.InetAddress;
 import java.util.NavigableMap;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 
 public class TokenAllocatorFactory
 {
+    private static final Logger logger = LoggerFactory.getLogger(TokenAllocatorFactory.class);
     public static TokenAllocator<InetAddress> createTokenAllocator(NavigableMap<Token, InetAddress> sortedTokens,
                                                      ReplicationStrategy<InetAddress> strategy,
                                                      IPartitioner partitioner)
     {
-        if(strategy.replicas() == 1)
+        if(strategy.replicas() == 1 || strategy.replicas() == 0)
+        {
+            logger.info("Using NoReplicationTokenAllocator.");
             return new NoReplicationTokenAllocator<>(sortedTokens, strategy, partitioner);
+        }
+        logger.info("Using ReplicationAwareTokenAllocator.");
         return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, partitioner);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/67cda763/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java b/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
index fdcc6b8..c53f788 100644
--- a/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
+++ b/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
@@ -55,6 +55,7 @@ public class NoReplicationTokenAllocatorTest extends TokenAllocatorTestBase
         for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; perUnitCount *= 4)
         {
             testNewCluster(perUnitCount, fixedTokenCount, new NoReplicationStrategy(), partitioner);
+            testNewCluster(perUnitCount, fixedTokenCount, new ZeroReplicationStrategy(), partitioner);
         }
     }
 
@@ -87,6 +88,7 @@ public class NoReplicationTokenAllocatorTest extends TokenAllocatorTestBase
         for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; perUnitCount *= 4)
         {
             testExistingCluster(perUnitCount, fixedTokenCount, new NoReplicationStrategy(), partitioner);
+            testExistingCluster(perUnitCount, fixedTokenCount, new ZeroReplicationStrategy(), partitioner);
         }
     }
 
@@ -246,4 +248,11 @@ public class NoReplicationTokenAllocatorTest extends TokenAllocatorTestBase
         }
     }
 
+    static class ZeroReplicationStrategy extends NoReplicationStrategy
+    {
+        public int replicas()
+        {
+            return 0;
+        }
+    }
 }


[4/8] cassandra git commit: Fix primary index calculation for SASI

Posted by al...@apache.org.
Fix primary index calculation for SASI

Patch by Corentin Chary; reviewed by Alex Petrov for CASSANDRA-12910.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2dffc2b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2dffc2b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2dffc2b

Branch: refs/heads/trunk
Commit: a2dffc2b49e0471ed6c351121fb02a6a0f61b51e
Parents: 1fd4e87
Author: Corentin Chary <co...@gmail.com>
Authored: Tue Dec 6 14:59:25 2016 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 16:58:30 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                                        | 1 +
 src/java/org/apache/cassandra/index/sasi/plan/QueryController.java | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2dffc2b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d5057a..d81d472 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,6 +2,7 @@
  * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
  * nodetool stopdaemon errors out (CASSANDRA-13030)
  * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
+ * Fix primary index calculation for SASI (CASSANDRA-12910)
 Merged from 3.0:
  * Fix handling of partition with partition-level deletion plus
    live rows in sstabledump (CASSANDRA-13177)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2dffc2b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
index fa1181f..155cd4f 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
@@ -236,7 +236,7 @@ public class QueryController
                 continue;
 
             Set<SSTableIndex> indexes = applyScope(view.match(e));
-            if (primaryIndexes.size() > indexes.size())
+            if (expression == null || primaryIndexes.size() > indexes.size())
             {
                 primaryIndexes = indexes;
                 expression = e;


[7/8] cassandra git commit: Remove redundant isLive() check on hint write path

Posted by al...@apache.org.
Remove redundant isLive() check on hint write path

patch by Aleksey Yeschenko; reviewed by Jason Brown for CASSANDRA-12998


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8fc72a5e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8fc72a5e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8fc72a5e

Branch: refs/heads/trunk
Commit: 8fc72a5ebc397e04468ba2de83f06031f038b204
Parents: b6e83fc
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Dec 7 20:55:44 2016 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 17:01:35 2017 +0000

----------------------------------------------------------------------
 src/java/org/apache/cassandra/hints/HintsService.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8fc72a5e/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 1a3a403..f8b8e2d 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -159,8 +159,7 @@ public final class HintsService implements HintsServiceMBean
         // we have to make sure that the HintsStore instances get properly initialized - otherwise dispatch will not trigger
         catalog.maybeLoadStores(hostIds);
 
-        if (hint.isLive())
-            bufferPool.write(hostIds, hint);
+        bufferPool.write(hostIds, hint);
 
         StorageMetrics.totalHints.inc(size(hostIds));
     }


[2/8] cassandra git commit: nodetool stopdaemon errors out

Posted by al...@apache.org.
nodetool stopdaemon errors out

patch by Corentin Chary; reviewed by jasobrown for CASSANDRA-13030


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/edf446c8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/edf446c8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/edf446c8

Branch: refs/heads/trunk
Commit: edf446c8c898af6a60c69942e2d6fdbdb523c905
Parents: cecbe17
Author: Corentin Chary <c....@criteo.com>
Authored: Sun Dec 11 10:34:21 2016 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 16:56:25 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                                  | 1 +
 src/java/org/apache/cassandra/tools/nodetool/StopDaemon.java | 2 ++
 2 files changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/edf446c8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7c1cd82..4d77c9a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 3.11.0
  * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
+ * nodetool stopdaemon errors out (CASSANDRA-13030)
 Merged from 3.0:
  * Fix handling of partition with partition-level deletion plus
    live rows in sstabledump (CASSANDRA-13177)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/edf446c8/src/java/org/apache/cassandra/tools/nodetool/StopDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/StopDaemon.java b/src/java/org/apache/cassandra/tools/nodetool/StopDaemon.java
index 79a499a..24c8920 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/StopDaemon.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/StopDaemon.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.tools.nodetool;
 
 import io.airlift.command.Command;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -31,6 +32,7 @@ public class StopDaemon extends NodeToolCmd
     {
         try
         {
+            DatabaseDescriptor.toolInitialization();
             probe.stopCassandraDaemon();
         } catch (Exception e)
         {


[8/8] cassandra git commit: Merge branch 'cassandra-3.11' into trunk

Posted by al...@apache.org.
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/45c92ba9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/45c92ba9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/45c92ba9

Branch: refs/heads/trunk
Commit: 45c92ba91b5b113ca6da4bb438112a94a8030232
Parents: ded6b70 8fc72a5
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Feb 5 17:03:07 2017 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 17:08:53 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c92ba9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2076c8f,65efebc..ee9c14e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,35 -1,10 +1,38 @@@
 +4.0
 + * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
 + * Refactor ColumnCondition (CASSANDRA-12981)
 + * Parallelize streaming of different keyspaces (CASSANDRA-4663)
 + * Improved compactions metrics (CASSANDRA-13015)
 + * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
 + * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
 + * Thrift removal (CASSANDRA-11115)
 + * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
 + * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
 + * Add (automate) Nodetool Documentation (CASSANDRA-12672)
 + * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
 + * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
 + * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422)
 + * Use new token allocation for non bootstrap case as well (CASSANDRA-13080)
 + * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084)
-  * More fixes to the TokenAllocator (CASSANDRA-12990)
 + * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510)
 + * Allow IN restrictions on column families with collections (CASSANDRA-12654)
-  * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
-  * nodetool stopdaemon errors out (CASSANDRA-13030)
 + * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028)
 + * Add timeUnit Days for cassandra-stress (CASSANDRA-13029)
 + * Add mutation size and batch metrics (CASSANDRA-12649)
 + * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
-  * Fix primary index calculation for SASI (CASSANDRA-12910)
 + * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
 + * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
-  * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
 + * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)
 + * Add support for arithmetic operators (CASSANDRA-11935)
++
++
+ 3.11.0
+  * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
+  * nodetool stopdaemon errors out (CASSANDRA-13030)
   * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
+  * Fix primary index calculation for SASI (CASSANDRA-12910)
+  * More fixes to the TokenAllocator (CASSANDRA-12990)
+  * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
  Merged from 3.0:
   * Fix handling of partition with partition-level deletion plus
     live rows in sstabledump (CASSANDRA-13177)


[6/8] cassandra git commit: More fixes to the TokenAllocation

Posted by al...@apache.org.
More fixes to the TokenAllocation

patch by Dikang Gu; reviewed by Branimir Lambov for CASSANSRA-12990


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b6e83fc2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b6e83fc2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b6e83fc2

Branch: refs/heads/trunk
Commit: b6e83fc200fa9e4c0e4f26491597188305cddd21
Parents: 67cda76
Author: Dikang Gu <di...@gmail.com>
Authored: Sat Dec 3 17:59:01 2016 -0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 17:00:23 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../dht/tokenallocator/TokenAllocation.java     | 30 ++++++++++++
 .../tokenallocator/TokenAllocatorFactory.java   |  2 +-
 src/java/org/apache/cassandra/gms/Gossiper.java | 45 ++++++++++++++++++
 .../cassandra/service/CassandraDaemon.java      | 48 +-------------------
 5 files changed, 78 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1851e62..65efebc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
  * nodetool stopdaemon errors out (CASSANDRA-13030)
  * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
  * Fix primary index calculation for SASI (CASSANDRA-12910)
+ * More fixes to the TokenAllocator (CASSANDRA-12990)
  * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
 Merged from 3.0:
  * Fix handling of partition with partition-level deletion plus

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
index 36824a1..15d7868 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -35,12 +35,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.locator.TokenMetadata.Topology;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class TokenAllocation
 {
@@ -51,6 +53,9 @@ public class TokenAllocation
                                                    final InetAddress endpoint,
                                                    int numTokens)
     {
+        if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
+            Gossiper.waitToSettle();
+
         TokenMetadata tokenMetadataCopy = tokenMetadata.cloneOnlyTokenMap();
         StrategyAdapter strategy = getStrategy(tokenMetadataCopy, rs, endpoint);
         Collection<Token> tokens = create(tokenMetadata, strategy).addUnit(endpoint, numTokens);
@@ -198,6 +203,31 @@ public class TokenAllocation
         final String dc = snitch.getDatacenter(endpoint);
         final int replicas = rs.getReplicationFactor(dc);
 
+        if (replicas == 0 || replicas == 1)
+        {
+            // No replication, each node is treated as separate.
+            return new StrategyAdapter()
+            {
+                @Override
+                public int replicas()
+                {
+                    return 1;
+                }
+
+                @Override
+                public Object getGroup(InetAddress unit)
+                {
+                    return unit;
+                }
+
+                @Override
+                public boolean inAllocationRing(InetAddress other)
+                {
+                    return dc.equals(snitch.getDatacenter(other));
+                }
+            };
+        }
+
         Topology topology = tokenMetadata.getTopology();
         int racks = topology.getDatacenterRacks().get(dc).asMap().size();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
index d20de8f..58acb56 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
@@ -34,7 +34,7 @@ public class TokenAllocatorFactory
                                                      ReplicationStrategy<InetAddress> strategy,
                                                      IPartitioner partitioner)
     {
-        if(strategy.replicas() == 1 || strategy.replicas() == 0)
+        if(strategy.replicas() == 1)
         {
             logger.info("Using NoReplicationTokenAllocator.");
             return new NoReplicationTokenAllocator<>(sortedTokens, strategy, partitioner);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 7f0f85b..ebfd66d 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -1614,4 +1614,49 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return null;
     }
 
+    public static void waitToSettle()
+    {
+        int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1);
+        if (forceAfter == 0)
+        {
+            return;
+        }
+        final int GOSSIP_SETTLE_MIN_WAIT_MS = 5000;
+        final int GOSSIP_SETTLE_POLL_INTERVAL_MS = 1000;
+        final int GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;
+
+        logger.info("Waiting for gossip to settle...");
+        Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS);
+        int totalPolls = 0;
+        int numOkay = 0;
+        int epSize = Gossiper.instance.getEndpointStates().size();
+        while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
+        {
+            Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+            int currentSize = Gossiper.instance.getEndpointStates().size();
+            totalPolls++;
+            if (currentSize == epSize)
+            {
+                logger.debug("Gossip looks settled.");
+                numOkay++;
+            }
+            else
+            {
+                logger.info("Gossip not settled after {} polls.", totalPolls);
+                numOkay = 0;
+            }
+            epSize = currentSize;
+            if (forceAfter > 0 && totalPolls > forceAfter)
+            {
+                logger.warn("Gossip not settled but startup forced by cassandra.skip_wait_for_gossip_to_settle. Gossip total polls: {}",
+                            totalPolls);
+                break;
+            }
+        }
+        if (totalPolls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
+            logger.info("Gossip settled after {} extra polls; proceeding", totalPolls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
+        else
+            logger.info("No gossip backlog; proceeding");
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 5a97dfe..851330b 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -42,7 +42,6 @@ import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -412,7 +411,7 @@ public class CassandraDaemon
         ScheduledExecutors.optionalTasks.schedule(viewRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
 
         if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
-            waitForGossipToSettle();
+            Gossiper.waitToSettle();
 
         // schedule periodic background compaction task submission. this is simply a backstop against compactions stalling
         // due to scheduling errors or race conditions
@@ -680,51 +679,6 @@ public class CassandraDaemon
         }
     }
 
-    private void waitForGossipToSettle()
-    {
-        int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1);
-        if (forceAfter == 0)
-        {
-            return;
-        }
-        final int GOSSIP_SETTLE_MIN_WAIT_MS = 5000;
-        final int GOSSIP_SETTLE_POLL_INTERVAL_MS = 1000;
-        final int GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;
-
-        logger.info("Waiting for gossip to settle before accepting client requests...");
-        Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS);
-        int totalPolls = 0;
-        int numOkay = 0;
-        int epSize = Gossiper.instance.getEndpointStates().size();
-        while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
-        {
-            Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
-            int currentSize = Gossiper.instance.getEndpointStates().size();
-            totalPolls++;
-            if (currentSize == epSize)
-            {
-                logger.debug("Gossip looks settled.");
-                numOkay++;
-            }
-            else
-            {
-                logger.info("Gossip not settled after {} polls.", totalPolls);
-                numOkay = 0;
-            }
-            epSize = currentSize;
-            if (forceAfter > 0 && totalPolls > forceAfter)
-            {
-                logger.warn("Gossip not settled but startup forced by cassandra.skip_wait_for_gossip_to_settle. Gossip total polls: {}",
-                            totalPolls);
-                break;
-            }
-        }
-        if (totalPolls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
-            logger.info("Gossip settled after {} extra polls; proceeding", totalPolls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
-        else
-            logger.info("No gossip backlog; proceeding");
-    }
-
     public static void stop(String[] args)
     {
         instance.deactivate();