You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/02/05 17:09:05 UTC
[1/8] cassandra git commit: Move to FastThreadLocalThread and FastThreadLocal
Repository: cassandra
Updated Branches:
refs/heads/trunk ded6b7010 -> 45c92ba91
Move to FastThreadLocalThread and FastThreadLocal
patch by Robert Stupp; reviewed by Jason Brown for CASSANDRA-13034
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cecbe17e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cecbe17e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cecbe17e
Branch: refs/heads/trunk
Commit: cecbe17e3eafc052acc13950494f7dddf026aa54
Parents: f71e7e1
Author: Robert Stupp <sn...@snazy.de>
Authored: Tue Dec 13 17:37:09 2016 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 16:54:19 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../concurrent/NamedThreadFactory.java | 42 ++++++++-
.../functions/ThreadAwareSecurityManager.java | 3 +-
.../AbstractCommitLogSegmentManager.java | 2 +-
.../db/commitlog/AbstractCommitLogService.java | 2 +-
.../db/compaction/CompactionManager.java | 3 +-
.../apache/cassandra/db/marshal/AsciiType.java | 3 +-
.../hints/EncryptedChecksummedDataInput.java | 3 +-
.../cassandra/index/sasi/TermIterator.java | 5 +-
.../io/compress/DeflateCompressor.java | 11 +--
.../cassandra/net/OutboundTcpConnection.java | 5 +-
.../apache/cassandra/repair/RepairRunnable.java | 6 +-
.../scheduler/RoundRobinScheduler.java | 2 +-
.../cassandra/security/CipherFactory.java | 3 +-
.../cassandra/security/EncryptionUtils.java | 3 +-
.../serializers/TimestampSerializer.java | 7 +-
.../cassandra/service/StorageService.java | 8 +-
.../cassandra/thrift/ThriftSessionManager.java | 4 +-
.../cassandra/utils/CoalescingStrategies.java | 25 +++---
.../org/apache/cassandra/utils/FBUtilities.java | 15 ++--
.../org/apache/cassandra/cql3/ViewLongTest.java | 3 +-
.../test/microbench/FastThreadExecutor.java | 65 +-------------
.../test/microbench/FastThreadLocalBench.java | 92 ++++++++++++++++++++
.../cassandra/cache/CacheProviderTest.java | 3 +-
.../cassandra/concurrent/WaitQueueTest.java | 4 +-
.../cassandra/db/RecoveryManagerTest.java | 22 +++--
.../apache/cassandra/hints/HintsBufferTest.java | 3 +-
.../io/sstable/IndexSummaryManagerTest.java | 3 +-
.../io/sstable/SSTableRewriterTest.java | 5 +-
.../apache/cassandra/service/RemoveTest.java | 26 +++---
.../apache/cassandra/utils/TopKSamplerTest.java | 5 +-
.../apache/cassandra/stress/StressServer.java | 6 +-
.../operations/userdefined/TokenRangeQuery.java | 3 +-
33 files changed, 239 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0836475..7c1cd82 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,10 +1,12 @@
3.11.0
+ * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
Merged from 3.0:
* Fix handling of partition with partition-level deletion plus
live rows in sstabledump (CASSANDRA-13177)
* Provide user workaround when system_schema.columns does not contain entries
for a table that's in system_schema.tables (CASSANDRA-13180)
+
3.10
* Fix secondary index queries regression (CASSANDRA-13013)
* Add duration type to the protocol V5 (CASSANDRA-12850)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 22193c4..5d89f6c 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.concurrent;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.annotations.VisibleForTesting;
+
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread;
@@ -58,9 +60,8 @@ public class NamedThreadFactory implements ThreadFactory
public Thread newThread(Runnable runnable)
{
String name = id + ':' + n.getAndIncrement();
- Thread thread = new FastThreadLocalThread(threadGroup, threadLocalDeallocator(runnable), name);
+ Thread thread = createThread(threadGroup, runnable, name, true);
thread.setPriority(priority);
- thread.setDaemon(true);
if (contextClassLoader != null)
thread.setContextClassLoader(contextClassLoader);
return thread;
@@ -75,11 +76,44 @@ public class NamedThreadFactory implements ThreadFactory
{
return () ->
{
- try {
+ try
+ {
r.run();
- } finally {
+ }
+ finally
+ {
FastThreadLocal.removeAll();
}
};
}
+
+ private static final AtomicInteger threadCounter = new AtomicInteger();
+
+ @VisibleForTesting
+ public static Thread createThread(Runnable runnable)
+ {
+ return createThread(null, runnable, "anonymous-" + threadCounter.incrementAndGet());
+ }
+
+ public static Thread createThread(Runnable runnable, String name)
+ {
+ return createThread(null, runnable, name);
+ }
+
+ public static Thread createThread(Runnable runnable, String name, boolean daemon)
+ {
+ return createThread(null, runnable, name, daemon);
+ }
+
+ public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, String name)
+ {
+ return createThread(threadGroup, runnable, name, false);
+ }
+
+ public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, String name, boolean daemon)
+ {
+ Thread thread = new FastThreadLocalThread(threadGroup, threadLocalDeallocator(runnable), name);
+ thread.setDaemon(daemon);
+ return thread;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java b/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java
index 676117d..2e4bb4d 100644
--- a/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java
@@ -36,6 +36,7 @@ import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.TurboFilterList;
import ch.qos.logback.classic.turbo.ReconfigureOnChangeFilter;
import ch.qos.logback.classic.turbo.TurboFilter;
+import io.netty.util.concurrent.FastThreadLocal;
/**
* Custom {@link SecurityManager} and {@link Policy} implementation that only performs access checks
@@ -175,7 +176,7 @@ public final class ThreadAwareSecurityManager extends SecurityManager
});
}
- private static final ThreadLocal<Boolean> initializedThread = new ThreadLocal<>();
+ private static final FastThreadLocal<Boolean> initializedThread = new FastThreadLocal<>();
private ThreadAwareSecurityManager()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index eff35f4..0ab941b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@ -147,7 +147,7 @@ public abstract class AbstractCommitLogSegmentManager
};
shutdown = false;
- managerThread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "COMMIT-LOG-ALLOCATOR");
+ managerThread = NamedThreadFactory.createThread(runnable, "COMMIT-LOG-ALLOCATOR");
managerThread.start();
// for simplicity, ensure the first segment is allocated before continuing
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 834aa0d..71100a3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -150,7 +150,7 @@ public abstract class AbstractCommitLogService
};
shutdown = false;
- thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
+ thread = NamedThreadFactory.createThread(runnable, name);
thread.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index f34b8a9..f16c1de 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -36,6 +36,7 @@ import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
@@ -93,7 +94,7 @@ public class CompactionManager implements CompactionManagerMBean
// A thread local that tells us if the current thread is owned by the compaction manager. Used
// by CounterContext to figure out if it should log a warning for invalid counter shards.
- public static final ThreadLocal<Boolean> isCompactionManager = new ThreadLocal<Boolean>()
+ public static final FastThreadLocal<Boolean> isCompactionManager = new FastThreadLocal<Boolean>()
{
@Override
protected Boolean initialValue()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/marshal/AsciiType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AsciiType.java b/src/java/org/apache/cassandra/db/marshal/AsciiType.java
index 089e388..3cd45de 100644
--- a/src/java/org/apache/cassandra/db/marshal/AsciiType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AsciiType.java
@@ -23,6 +23,7 @@ import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CharacterCodingException;
+import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.cql3.Constants;
import org.apache.cassandra.cql3.Json;
@@ -40,7 +41,7 @@ public class AsciiType extends AbstractType<String>
AsciiType() {super(ComparisonType.BYTE_ORDER);} // singleton
- private final ThreadLocal<CharsetEncoder> encoder = new ThreadLocal<CharsetEncoder>()
+ private final FastThreadLocal<CharsetEncoder> encoder = new FastThreadLocal<CharsetEncoder>()
{
@Override
protected CharsetEncoder initialValue()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
index b335226..b01161d 100644
--- a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
@@ -23,6 +23,7 @@ import javax.crypto.Cipher;
import com.google.common.annotations.VisibleForTesting;
+import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.security.EncryptionUtils;
import org.apache.cassandra.hints.CompressedChecksummedDataInput.Position;
import org.apache.cassandra.io.FSReadError;
@@ -31,7 +32,7 @@ import org.apache.cassandra.io.util.ChannelProxy;
public class EncryptedChecksummedDataInput extends ChecksummedDataInput
{
- private static final ThreadLocal<ByteBuffer> reusableBuffers = new ThreadLocal<ByteBuffer>()
+ private static final FastThreadLocal<ByteBuffer> reusableBuffers = new FastThreadLocal<ByteBuffer>()
{
protected ByteBuffer initialValue()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/index/sasi/TermIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/TermIterator.java b/src/java/org/apache/cassandra/index/sasi/TermIterator.java
index 1ddfcb9..03dea18 100644
--- a/src/java/org/apache/cassandra/index/sasi/TermIterator.java
+++ b/src/java/org/apache/cassandra/index/sasi/TermIterator.java
@@ -23,6 +23,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
@@ -42,7 +43,7 @@ public class TermIterator extends RangeIterator<Long, Token>
{
private static final Logger logger = LoggerFactory.getLogger(TermIterator.class);
- private static final ThreadLocal<ExecutorService> SEARCH_EXECUTOR = new ThreadLocal<ExecutorService>()
+ private static final FastThreadLocal<ExecutorService> SEARCH_EXECUTOR = new FastThreadLocal<ExecutorService>()
{
public ExecutorService initialValue()
{
@@ -59,7 +60,7 @@ public class TermIterator extends RangeIterator<Long, Token>
public Thread newThread(Runnable task)
{
- return new Thread(NamedThreadFactory.threadLocalDeallocator(task), currentThread + "-SEARCH-" + count.incrementAndGet()) {{ setDaemon(true); }};
+ return NamedThreadFactory.createThread(task, currentThread + "-SEARCH-" + count.incrementAndGet(), true);
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index a2ed65c..8557f5f 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.io.compress;
+import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.schema.CompressionParams;
import java.io.IOException;
@@ -32,7 +33,7 @@ public class DeflateCompressor implements ICompressor
{
public static final DeflateCompressor instance = new DeflateCompressor();
- private static final ThreadLocal<byte[]> threadLocalScratchBuffer = new ThreadLocal<byte[]>()
+ private static final FastThreadLocal<byte[]> threadLocalScratchBuffer = new FastThreadLocal<byte[]>()
{
@Override
protected byte[] initialValue()
@@ -46,8 +47,8 @@ public class DeflateCompressor implements ICompressor
return threadLocalScratchBuffer.get();
}
- private final ThreadLocal<Deflater> deflater;
- private final ThreadLocal<Inflater> inflater;
+ private final FastThreadLocal<Deflater> deflater;
+ private final FastThreadLocal<Inflater> inflater;
public static DeflateCompressor create(Map<String, String> compressionOptions)
{
@@ -57,7 +58,7 @@ public class DeflateCompressor implements ICompressor
private DeflateCompressor()
{
- deflater = new ThreadLocal<Deflater>()
+ deflater = new FastThreadLocal<Deflater>()
{
@Override
protected Deflater initialValue()
@@ -65,7 +66,7 @@ public class DeflateCompressor implements ICompressor
return new Deflater();
}
};
- inflater = new ThreadLocal<Inflater>()
+ inflater = new FastThreadLocal<Inflater>()
{
@Override
protected Inflater initialValue()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1843e7b..0693ac3 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -503,7 +503,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
{
final AtomicInteger version = new AtomicInteger(NO_VERSION);
final CountDownLatch versionLatch = new CountDownLatch(1);
- Runnable r = () ->
+ NamedThreadFactory.createThread(() ->
{
try
{
@@ -523,8 +523,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
//unblock the waiting thread on either success or fail
versionLatch.countDown();
}
- };
- new Thread(NamedThreadFactory.threadLocalDeallocator(r), "HANDSHAKE-" + poolReference.endPoint()).start();
+ }, "HANDSHAKE-" + poolReference.endPoint()).start();
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 013dcec..c9eed54 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -73,6 +73,8 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
private final List<ProgressListener> listeners = new ArrayList<>();
+ private static final AtomicInteger threadCounter = new AtomicInteger(1);
+
public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace)
{
this.storageService = storageService;
@@ -376,7 +378,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
private Thread createQueryThread(final int cmd, final UUID sessionId)
{
- return new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
+ return NamedThreadFactory.createThread(new WrappedRunnable()
{
// Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
// Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
@@ -443,6 +445,6 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
seen[si].clear();
}
}
- }));
+ }, "Repair-Runnable-" + threadCounter.incrementAndGet());
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
index 904deb3..fd967f3 100644
--- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
+++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
@@ -67,7 +67,7 @@ public class RoundRobinScheduler implements IRequestScheduler
schedule();
}
};
- Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER");
+ Thread scheduler = NamedThreadFactory.createThread(runnable, "REQUEST-SCHEDULER");
scheduler.start();
logger.info("Started the RoundRobin Request Scheduler");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/security/CipherFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/CipherFactory.java b/src/java/org/apache/cassandra/security/CipherFactory.java
index 7c1495a..3f5c5f3 100644
--- a/src/java/org/apache/cassandra/security/CipherFactory.java
+++ b/src/java/org/apache/cassandra/security/CipherFactory.java
@@ -39,6 +39,7 @@ import com.google.common.cache.RemovalNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.config.TransparentDataEncryptionOptions;
/**
@@ -54,7 +55,7 @@ public class CipherFactory
* Bonus points if you can avoid calling (@code Cipher#init); hence, the point of the supporting struct
* for caching Cipher instances.
*/
- private static final ThreadLocal<CachedCipher> cipherThreadLocal = new ThreadLocal<>();
+ private static final FastThreadLocal<CachedCipher> cipherThreadLocal = new FastThreadLocal<>();
private final SecureRandom secureRandom;
private final LoadingCache<String, Key> cache;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/security/EncryptionUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/EncryptionUtils.java b/src/java/org/apache/cassandra/security/EncryptionUtils.java
index bb61260..855e2a9 100644
--- a/src/java/org/apache/cassandra/security/EncryptionUtils.java
+++ b/src/java/org/apache/cassandra/security/EncryptionUtils.java
@@ -28,6 +28,7 @@ import javax.crypto.ShortBufferException;
import com.google.common.base.Preconditions;
+import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.db.commitlog.EncryptedSegment;
import org.apache.cassandra.io.compress.ICompressor;
import org.apache.cassandra.io.util.ChannelProxy;
@@ -45,7 +46,7 @@ public class EncryptionUtils
public static final int COMPRESSED_BLOCK_HEADER_SIZE = 4;
public static final int ENCRYPTED_BLOCK_HEADER_SIZE = 8;
- private static final ThreadLocal<ByteBuffer> reusableBuffers = new ThreadLocal<ByteBuffer>()
+ private static final FastThreadLocal<ByteBuffer> reusableBuffers = new FastThreadLocal<ByteBuffer>()
{
protected ByteBuffer initialValue()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/TimestampSerializer.java b/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
index a4a6f80..ac75d4b 100644
--- a/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.serializers;
+import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.utils.ByteBufferUtil;
import java.nio.ByteBuffer;
@@ -90,7 +91,7 @@ public class TimestampSerializer implements TypeSerializer<Date>
private static final String DEFAULT_FORMAT = dateStringPatterns[6];
private static final Pattern timestampPattern = Pattern.compile("^-?\\d+$");
- private static final ThreadLocal<SimpleDateFormat> FORMATTER = new ThreadLocal<SimpleDateFormat>()
+ private static final FastThreadLocal<SimpleDateFormat> FORMATTER = new FastThreadLocal<SimpleDateFormat>()
{
protected SimpleDateFormat initialValue()
{
@@ -99,7 +100,7 @@ public class TimestampSerializer implements TypeSerializer<Date>
};
private static final String UTC_FORMAT = dateStringPatterns[40];
- private static final ThreadLocal<SimpleDateFormat> FORMATTER_UTC = new ThreadLocal<SimpleDateFormat>()
+ private static final FastThreadLocal<SimpleDateFormat> FORMATTER_UTC = new FastThreadLocal<SimpleDateFormat>()
{
protected SimpleDateFormat initialValue()
{
@@ -110,7 +111,7 @@ public class TimestampSerializer implements TypeSerializer<Date>
};
private static final String TO_JSON_FORMAT = dateStringPatterns[19];
- private static final ThreadLocal<SimpleDateFormat> FORMATTER_TO_JSON = new ThreadLocal<SimpleDateFormat>()
+ private static final FastThreadLocal<SimpleDateFormat> FORMATTER_TO_JSON = new FastThreadLocal<SimpleDateFormat>()
{
protected SimpleDateFormat initialValue()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 720269f..b64cf13 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -126,6 +126,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
@Deprecated
private final LegacyJMXProgressSupport legacyProgressSupport;
+ private static final AtomicInteger threadCounter = new AtomicInteger(1);
+
private static int getRingDelay()
{
String newdelay = System.getProperty("cassandra.ring_delay_ms");
@@ -634,7 +636,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
// daemon threads, like our executors', continue to run while shutdown hooks are invoked
- drainOnShutdown = new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
+ drainOnShutdown = NamedThreadFactory.createThread(new WrappedRunnable()
{
@Override
public void runMayThrow() throws InterruptedException, ExecutionException, IOException
@@ -649,7 +651,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory());
logbackHook.run();
}
- }), "StorageServiceShutdownHook");
+ }, "StorageServiceShutdownHook");
Runtime.getRuntime().addShutdownHook(drainOnShutdown);
replacing = isReplacing();
@@ -3552,7 +3554,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return 0;
int cmd = nextRepairCommand.incrementAndGet();
- new Thread(NamedThreadFactory.threadLocalDeallocator(createRepairTask(cmd, keyspace, options, legacy))).start();
+ NamedThreadFactory.createThread(createRepairTask(cmd, keyspace, options, legacy), "Repair-Task-" + threadCounter.incrementAndGet()).start();
return cmd;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
index 7d22507..60da3b4 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.util.concurrent.FastThreadLocal;
+
/**
* Encapsulates the current client state (session).
*
@@ -35,7 +37,7 @@ public class ThriftSessionManager
private static final Logger logger = LoggerFactory.getLogger(ThriftSessionManager.class);
public final static ThriftSessionManager instance = new ThriftSessionManager();
- private final ThreadLocal<SocketAddress> remoteSocket = new ThreadLocal<>();
+ private final FastThreadLocal<SocketAddress> remoteSocket = new FastThreadLocal<>();
private final ConcurrentHashMap<SocketAddress, ThriftClientState> activeSocketSessions = new ConcurrentHashMap<>();
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 16a64df..0aa980f 100644
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.utils;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.io.util.FileUtils;
import org.slf4j.Logger;
@@ -127,25 +128,21 @@ public class CoalescingStrategies
this.displayName = displayName;
if (DEBUG_COALESCING)
{
- new Thread(displayName + " debug thread")
+ NamedThreadFactory.createThread(() ->
{
- @Override
- public void run()
+ while (true)
{
- while (true)
+ try
{
- try
- {
- Thread.sleep(5000);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError();
- }
- shouldLogAverage = true;
+ Thread.sleep(5000);
}
+ catch (InterruptedException e)
+ {
+ throw new AssertionError();
+ }
+ shouldLogAverage = true;
}
- }.start();
+ }, displayName + " debug thread").start();
}
RandomAccessFile rasTemp = null;
ByteBuffer logBufferTemp = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index a925c0e..2138ea5 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -38,6 +38,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.auth.IAuthorizer;
import org.apache.cassandra.auth.IRoleManager;
@@ -87,28 +88,22 @@ public class FBUtilities
return Runtime.getRuntime().availableProcessors();
}
- private static final ThreadLocal<MessageDigest> localMD5Digest = new ThreadLocal<MessageDigest>()
+ private static final FastThreadLocal<MessageDigest> localMD5Digest = new FastThreadLocal<MessageDigest>()
{
@Override
protected MessageDigest initialValue()
{
return newMessageDigest("MD5");
}
-
- @Override
- public MessageDigest get()
- {
- MessageDigest digest = super.get();
- digest.reset();
- return digest;
- }
};
public static final int MAX_UNSIGNED_SHORT = 0xFFFF;
public static MessageDigest threadLocalMD5Digest()
{
- return localMD5Digest.get();
+ MessageDigest md = localMD5Digest.get();
+ md.reset();
+ return md;
}
public static MessageDigest newMessageDigest(String algorithm)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/long/org/apache/cassandra/cql3/ViewLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/ViewLongTest.java b/test/long/org/apache/cassandra/cql3/ViewLongTest.java
index a5d17ea..590f148 100644
--- a/test/long/org/apache/cassandra/cql3/ViewLongTest.java
+++ b/test/long/org/apache/cassandra/cql3/ViewLongTest.java
@@ -33,6 +33,7 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.WriteTimeoutException;
import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -90,7 +91,7 @@ public class ViewLongTest extends CQLTester
for (int i = 0; i < writers; i++)
{
final int writer = i;
- Thread t = new Thread(new WrappedRunnable()
+ Thread t = NamedThreadFactory.createThread(new WrappedRunnable()
{
public void runMayThrow()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
index d0b4442..5644e4f 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
@@ -18,14 +18,11 @@
package org.apache.cassandra.test.microbench;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.AbstractExecutorService;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import io.netty.util.concurrent.FastThreadLocalThread;
+import io.netty.util.concurrent.DefaultThreadFactory;
/**
* Created to test perf of FastThreadLocal
@@ -33,64 +30,10 @@ import io.netty.util.concurrent.FastThreadLocalThread;
* Used in MutationBench via:
* jvmArgsAppend = {"-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"}
*/
-public class FastThreadExecutor extends AbstractExecutorService
+public class FastThreadExecutor extends ThreadPoolExecutor
{
- final FastThreadLocalThread thread;
- final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
- final CountDownLatch shutdown = new CountDownLatch(1);
-
public FastThreadExecutor(int size, String name)
{
- assert size == 1;
-
- thread = new FastThreadLocalThread(() -> {
- Runnable work = null;
- try
- {
- while ((work = queue.take()) != null)
- work.run();
- }
- catch (InterruptedException e)
- {
- shutdown.countDown();
- }
- });
-
- thread.setName(name + "-1");
- thread.setDaemon(true);
-
- thread.start();
- }
-
-
- public void shutdown()
- {
- thread.interrupt();
- }
-
- public List<Runnable> shutdownNow()
- {
- thread.interrupt();
- return Collections.emptyList();
- }
-
- public boolean isShutdown()
- {
- return shutdown.getCount() == 0;
- }
-
- public boolean isTerminated()
- {
- return shutdown.getCount() == 0;
- }
-
- public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
- {
- return shutdown.await(timeout, unit);
- }
-
- public void execute(Runnable command)
- {
- while(!queue.add(command));
+ super(size, size, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new DefaultThreadFactory(name, true));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java b/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java
new file mode 100644
index 0000000..491dc44
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.util.concurrent.TimeUnit;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1,jvmArgsAppend = {"-Xmx512M", "-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"})
+@Threads(4) // make sure this matches the number of _physical_cores_
+@State(Scope.Benchmark)
+public class FastThreadLocalBench
+{
+ @Param({"2", "4", "8", "12"})
+ private int variables = 2;
+
+ static final int max = 20;
+ static final ThreadLocal[] threadLocals = new ThreadLocal[max];
+ static final FastThreadLocal[] fastThreadLocals = new FastThreadLocal[max];
+ static
+ {
+ for (int i = 0; i < max; i++)
+ {
+ threadLocals[i] = ThreadLocal.withInitial(Object::new);
+ fastThreadLocals[i] = new FastThreadLocal() {
+ protected Object initialValue() throws Exception
+ {
+ return new Object();
+ }
+ };
+ }
+ }
+
+ @State(Scope.Thread)
+ public static class FastThreadLocalBenchState
+ {
+ public int index;
+ }
+
+ @Benchmark
+ public void baseline(FastThreadLocalBenchState state, Blackhole bh)
+ {
+ if (variables != 2)
+ throw new IllegalArgumentException("skipped");
+
+ bh.consume("foo");
+ }
+
+ @Benchmark
+ public void threadLocal(FastThreadLocalBenchState state, Blackhole bh)
+ {
+ bh.consume(threadLocals[state.index % max].get());
+ }
+
+ @Benchmark
+ public void fastThreadLocal(FastThreadLocalBenchState state, Blackhole bh)
+ {
+ bh.consume(fastThreadLocals[state.index % max].get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index a4173d6..eca124f 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -32,6 +32,7 @@ import java.util.List;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.utils.Pair;
@@ -135,7 +136,7 @@ public class CacheProviderTest
List<Thread> threads = new ArrayList<>(100);
for (int i = 0; i < 100; i++)
{
- Thread thread = new Thread(runnable);
+ Thread thread = NamedThreadFactory.createThread(runnable);
threads.add(thread);
thread.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
index fdc6880..ac2a9c0 100644
--- a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
@@ -44,7 +44,7 @@ public class WaitQueueTest
final AtomicInteger ready = new AtomicInteger();
Thread[] ts = new Thread[4];
for (int i = 0 ; i < ts.length ; i++)
- ts[i] = new Thread(new Runnable()
+ ts[i] = NamedThreadFactory.createThread(new Runnable()
{
@Override
public void run()
@@ -84,7 +84,7 @@ public class WaitQueueTest
final AtomicBoolean ready = new AtomicBoolean(false);
final AtomicBoolean condition = new AtomicBoolean(false);
final AtomicBoolean fail = new AtomicBoolean(false);
- Thread t = new Thread(new Runnable()
+ Thread t = NamedThreadFactory.createThread(new Runnable()
{
@Override
public void run()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index cbc412d..f5bda4f 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.ParameterizedClass;
@@ -156,20 +157,17 @@ public class RecoveryManagerTest
Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).isEmpty());
final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
- Thread t = new Thread() {
- @Override
- public void run()
+ Thread t = NamedThreadFactory.createThread(() ->
+ {
+ try
{
- try
- {
- CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL
- }
- catch (Throwable t)
- {
- err.set(t);
- }
+ CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL
}
- };
+ catch (Throwable x)
+ {
+ err.set(x);
+ }
+ });
t.start();
Assert.assertTrue(mockInitiator.blocked.tryAcquire(1, 20, TimeUnit.SECONDS));
Thread.sleep(100);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
index 78ea4f4..08f7ec0 100644
--- a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
@@ -28,6 +28,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.Mutation;
@@ -114,7 +115,7 @@ public class HintsBufferTest
// create HINT_THREADS_COUNT, start them, and wait for them to finish
List<Thread> threads = new ArrayList<>(HINT_THREADS_COUNT);
for (int i = 0; i < HINT_THREADS_COUNT; i ++)
- threads.add(new Thread(new Writer(buffer, load, hintSize, i, baseTimestamp)));
+ threads.add(NamedThreadFactory.createThread(new Writer(buffer, load, hintSize, i, baseTimestamp)));
threads.forEach(java.lang.Thread::start);
for (Thread thread : threads)
thread.join();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 9737281..f287912 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowUpdateBuilder;
@@ -619,7 +620,7 @@ public class IndexSummaryManagerTest
// barrier to control when redistribution runs
final CountDownLatch barrier = new CountDownLatch(1);
- Thread t = new Thread(new Runnable()
+ Thread t = NamedThreadFactory.createThread(new Runnable()
{
public void run()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 942ebe9..d1b4092 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -33,6 +33,7 @@ import org.junit.Test;
import org.apache.cassandra.Util;
import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
@@ -813,7 +814,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
}
}
};
- Thread t = new Thread(r);
+ Thread t = NamedThreadFactory.createThread(r);
try
{
t.start();
@@ -895,7 +896,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
}
}
};
- Thread t = new Thread(r);
+ Thread t = NamedThreadFactory.createThread(r);
try
{
t.start();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 0ef9b9c..701ea0f 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -32,6 +32,7 @@ import org.junit.*;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
@@ -141,23 +142,20 @@ public class RemoveTest
{
// start removal in background and send replication confirmations
final AtomicBoolean success = new AtomicBoolean(false);
- Thread remover = new Thread()
+ Thread remover = NamedThreadFactory.createThread(() ->
{
- public void run()
+ try
{
- try
- {
- ss.removeNode(removalId.toString());
- }
- catch (Exception e)
- {
- System.err.println(e);
- e.printStackTrace();
- return;
- }
- success.set(true);
+ ss.removeNode(removalId.toString());
}
- };
+ catch (Exception e)
+ {
+ System.err.println(e);
+ e.printStackTrace();
+ return;
+ }
+ success.set(true);
+ });
remover.start();
Thread.sleep(1000); // make sure removal is waiting for confirmation
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java b/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
index bb6e3a8..42aef0c 100644
--- a/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
+++ b/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
@@ -34,6 +34,7 @@ import org.junit.Test;
import com.clearspring.analytics.hash.MurmurHash;
import com.clearspring.analytics.stream.Counter;
import junit.framework.Assert;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.utils.TopKSampler.SamplerResult;
public class TopKSamplerTest
@@ -82,7 +83,7 @@ public class TopKSamplerTest
final CountDownLatch latch = new CountDownLatch(1);
final TopKSampler<String> sampler = new TopKSampler<String>();
- new Thread(new Runnable()
+ NamedThreadFactory.createThread(new Runnable()
{
public void run()
{
@@ -99,7 +100,7 @@ public class TopKSamplerTest
}
}
- ,"inserter").start();
+ , "inserter").start();
try
{
// start/stop in fast iterations
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/tools/stress/src/org/apache/cassandra/stress/StressServer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressServer.java b/tools/stress/src/org/apache/cassandra/stress/StressServer.java
index 793f8f0..c00fb54 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressServer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressServer.java
@@ -23,9 +23,11 @@ import java.io.PrintStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.cli.*;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.MultiResultLogger;
import org.apache.cassandra.stress.util.ResultLogger;
@@ -39,6 +41,8 @@ public class StressServer
availableOptions.addOption("h", "host", true, "Host to listen for connections.");
}
+ private static final AtomicInteger threadCounter = new AtomicInteger(1);
+
public static void main(String[] args) throws Exception
{
ServerSocket serverSocket = null;
@@ -93,7 +97,7 @@ public class StressServer
ResultLogger log = new MultiResultLogger(out);
StressAction action = new StressAction((StressSettings) in.readObject(), log);
- Thread actionThread = new Thread(action);
+ Thread actionThread = NamedThreadFactory.createThread(action, "stress-" + threadCounter.incrementAndGet());
actionThread.start();
while (actionThread.isAlive())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
index f561f61..ff8b27f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
@@ -35,6 +35,7 @@ import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.Token;
import com.datastax.driver.core.TokenRange;
+import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.stress.Operation;
import org.apache.cassandra.stress.StressYaml;
import org.apache.cassandra.stress.WorkManager;
@@ -46,7 +47,7 @@ import org.apache.cassandra.stress.util.ThriftClient;
public class TokenRangeQuery extends Operation
{
- private final ThreadLocal<State> currentState = new ThreadLocal<>();
+ private final FastThreadLocal<State> currentState = new FastThreadLocal<>();
private final TableMetadata tableMetadata;
private final TokenRangeIterator tokenRangeIterator;
[3/8] cassandra git commit: system_distributed should not use gcgs of 0
Posted by al...@apache.org.
system_distributed should not use gcgs of 0
Patch by Jeff Jirsa; Reviewed by Aleksey Yeschenko for CASSANDRA-12954
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1fd4e870
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1fd4e870
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1fd4e870
Branch: refs/heads/trunk
Commit: 1fd4e870c58931e20a214492aa094e056c6d5714
Parents: edf446c
Author: Jeff Jirsa <je...@crowdstrike.com>
Authored: Sat Dec 3 20:50:29 2016 -0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 16:57:15 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/repair/SystemDistributedKeyspace.java | 4 +++-
2 files changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fd4e870/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4d77c9a..7d5057a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
3.11.0
* Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
* nodetool stopdaemon errors out (CASSANDRA-13030)
+ * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
Merged from 3.0:
* Fix handling of partition with partition-level deletion plus
live rows in sstabledump (CASSANDRA-13177)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fd4e870/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index 552f318..638cf38 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -116,7 +117,8 @@ public final class SystemDistributedKeyspace
private static CFMetaData compile(String name, String description, String schema)
{
return CFMetaData.compile(String.format(schema, name), SchemaConstants.DISTRIBUTED_KEYSPACE_NAME)
- .comment(description);
+ .comment(description)
+ .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(10));
}
public static KeyspaceMetadata metadata()
[5/8] cassandra git commit: NoReplicationTokenAllocator should support zero replication factor
Posted by al...@apache.org.
NoReplicationTokenAllocator should support zero replication factor
Patch by Dikang Gu; reviewed by Branimir Lambov for CASSANDRA-12983
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/67cda763
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/67cda763
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/67cda763
Branch: refs/heads/trunk
Commit: 67cda763b270a56bf5a7c73c6b30c3f9d651daab
Parents: a2dffc2
Author: Dikang Gu <di...@gmail.com>
Authored: Thu Dec 1 18:02:29 2016 -0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 16:59:08 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../dht/tokenallocator/TokenAllocatorFactory.java | 10 +++++++++-
.../tokenallocator/NoReplicationTokenAllocatorTest.java | 9 +++++++++
3 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67cda763/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d81d472..1851e62 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
* nodetool stopdaemon errors out (CASSANDRA-13030)
* Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
* Fix primary index calculation for SASI (CASSANDRA-12910)
+ * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
Merged from 3.0:
* Fix handling of partition with partition-level deletion plus
live rows in sstabledump (CASSANDRA-13177)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67cda763/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
index f8c972d..d20de8f 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
@@ -21,17 +21,25 @@ package org.apache.cassandra.dht.tokenallocator;
import java.net.InetAddress;
import java.util.NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
public class TokenAllocatorFactory
{
+ private static final Logger logger = LoggerFactory.getLogger(TokenAllocatorFactory.class);
public static TokenAllocator<InetAddress> createTokenAllocator(NavigableMap<Token, InetAddress> sortedTokens,
ReplicationStrategy<InetAddress> strategy,
IPartitioner partitioner)
{
- if(strategy.replicas() == 1)
+ if(strategy.replicas() == 1 || strategy.replicas() == 0)
+ {
+ logger.info("Using NoReplicationTokenAllocator.");
return new NoReplicationTokenAllocator<>(sortedTokens, strategy, partitioner);
+ }
+ logger.info("Using ReplicationAwareTokenAllocator.");
return new ReplicationAwareTokenAllocator<>(sortedTokens, strategy, partitioner);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67cda763/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java b/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
index fdcc6b8..c53f788 100644
--- a/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
+++ b/test/long/org/apache/cassandra/dht/tokenallocator/NoReplicationTokenAllocatorTest.java
@@ -55,6 +55,7 @@ public class NoReplicationTokenAllocatorTest extends TokenAllocatorTestBase
for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; perUnitCount *= 4)
{
testNewCluster(perUnitCount, fixedTokenCount, new NoReplicationStrategy(), partitioner);
+ testNewCluster(perUnitCount, fixedTokenCount, new ZeroReplicationStrategy(), partitioner);
}
}
@@ -87,6 +88,7 @@ public class NoReplicationTokenAllocatorTest extends TokenAllocatorTestBase
for (int perUnitCount = 1; perUnitCount <= MAX_VNODE_COUNT; perUnitCount *= 4)
{
testExistingCluster(perUnitCount, fixedTokenCount, new NoReplicationStrategy(), partitioner);
+ testExistingCluster(perUnitCount, fixedTokenCount, new ZeroReplicationStrategy(), partitioner);
}
}
@@ -246,4 +248,11 @@ public class NoReplicationTokenAllocatorTest extends TokenAllocatorTestBase
}
}
+ static class ZeroReplicationStrategy extends NoReplicationStrategy
+ {
+ public int replicas()
+ {
+ return 0;
+ }
+ }
}
[4/8] cassandra git commit: Fix primary index calculation for SASI
Posted by al...@apache.org.
Fix primary index calculation for SASI
Patch by Corentin Chary; reviewed by Alex Petrov for CASSANDRA-12910.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a2dffc2b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a2dffc2b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a2dffc2b
Branch: refs/heads/trunk
Commit: a2dffc2b49e0471ed6c351121fb02a6a0f61b51e
Parents: 1fd4e87
Author: Corentin Chary <co...@gmail.com>
Authored: Tue Dec 6 14:59:25 2016 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 16:58:30 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/index/sasi/plan/QueryController.java | 2 +-
2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2dffc2b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7d5057a..d81d472 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,6 +2,7 @@
* Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
* nodetool stopdaemon errors out (CASSANDRA-13030)
* Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
+ * Fix primary index calculation for SASI (CASSANDRA-12910)
Merged from 3.0:
* Fix handling of partition with partition-level deletion plus
live rows in sstabledump (CASSANDRA-13177)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a2dffc2b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
index fa1181f..155cd4f 100644
--- a/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sasi/plan/QueryController.java
@@ -236,7 +236,7 @@ public class QueryController
continue;
Set<SSTableIndex> indexes = applyScope(view.match(e));
- if (primaryIndexes.size() > indexes.size())
+ if (expression == null || primaryIndexes.size() > indexes.size())
{
primaryIndexes = indexes;
expression = e;
[7/8] cassandra git commit: Remove redundant isLive() check on hint write path
Posted by al...@apache.org.
Remove redundant isLive() check on hint write path
patch by Aleksey Yeschenko; reviewed by Jason Brown for CASSANDRA-12998
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8fc72a5e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8fc72a5e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8fc72a5e
Branch: refs/heads/trunk
Commit: 8fc72a5ebc397e04468ba2de83f06031f038b204
Parents: b6e83fc
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Dec 7 20:55:44 2016 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 17:01:35 2017 +0000
----------------------------------------------------------------------
src/java/org/apache/cassandra/hints/HintsService.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8fc72a5e/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index 1a3a403..f8b8e2d 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -159,8 +159,7 @@ public final class HintsService implements HintsServiceMBean
// we have to make sure that the HintsStore instances get properly initialized - otherwise dispatch will not trigger
catalog.maybeLoadStores(hostIds);
- if (hint.isLive())
- bufferPool.write(hostIds, hint);
+ bufferPool.write(hostIds, hint);
StorageMetrics.totalHints.inc(size(hostIds));
}
[2/8] cassandra git commit: nodetool stopdaemon errors out
Posted by al...@apache.org.
nodetool stopdaemon errors out
patch by Corentin Chary; reviewed by jasobrown for CASSANDRA-13030
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/edf446c8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/edf446c8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/edf446c8
Branch: refs/heads/trunk
Commit: edf446c8c898af6a60c69942e2d6fdbdb523c905
Parents: cecbe17
Author: Corentin Chary <c....@criteo.com>
Authored: Sun Dec 11 10:34:21 2016 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 16:56:25 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/tools/nodetool/StopDaemon.java | 2 ++
2 files changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/edf446c8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7c1cd82..4d77c9a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
3.11.0
* Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
+ * nodetool stopdaemon errors out (CASSANDRA-13030)
Merged from 3.0:
* Fix handling of partition with partition-level deletion plus
live rows in sstabledump (CASSANDRA-13177)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/edf446c8/src/java/org/apache/cassandra/tools/nodetool/StopDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/StopDaemon.java b/src/java/org/apache/cassandra/tools/nodetool/StopDaemon.java
index 79a499a..24c8920 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/StopDaemon.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/StopDaemon.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.tools.nodetool;
import io.airlift.command.Command;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -31,6 +32,7 @@ public class StopDaemon extends NodeToolCmd
{
try
{
+ DatabaseDescriptor.toolInitialization();
probe.stopCassandraDaemon();
} catch (Exception e)
{
[8/8] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/45c92ba9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/45c92ba9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/45c92ba9
Branch: refs/heads/trunk
Commit: 45c92ba91b5b113ca6da4bb438112a94a8030232
Parents: ded6b70 8fc72a5
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Sun Feb 5 17:03:07 2017 +0000
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 17:08:53 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 13 ++++++++-----
1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c92ba9/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2076c8f,65efebc..ee9c14e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,35 -1,10 +1,38 @@@
+4.0
+ * Make TableMetadata immutable, optimize Schema (CASSANDRA-9425)
+ * Refactor ColumnCondition (CASSANDRA-12981)
+ * Parallelize streaming of different keyspaces (CASSANDRA-4663)
+ * Improved compactions metrics (CASSANDRA-13015)
+ * Speed-up start-up sequence by avoiding un-needed flushes (CASSANDRA-13031)
+ * Use Caffeine (W-TinyLFU) for on-heap caches (CASSANDRA-10855)
+ * Thrift removal (CASSANDRA-11115)
+ * Remove pre-3.0 compatibility code for 4.0 (CASSANDRA-12716)
+ * Add column definition kind to dropped columns in schema (CASSANDRA-12705)
+ * Add (automate) Nodetool Documentation (CASSANDRA-12672)
+ * Update bundled cqlsh python driver to 3.7.0 (CASSANDRA-12736)
+ * Reject invalid replication settings when creating or altering a keyspace (CASSANDRA-12681)
+ * Clean up the SSTableReader#getScanner API wrt removal of RateLimiter (CASSANDRA-12422)
+ * Use new token allocation for non bootstrap case as well (CASSANDRA-13080)
+ * Avoid byte-array copy when key cache is disabled (CASSANDRA-13084)
- * More fixes to the TokenAllocator (CASSANDRA-12990)
+ * Require forceful decommission if number of nodes is less than replication factor (CASSANDRA-12510)
+ * Allow IN restrictions on column families with collections (CASSANDRA-12654)
- * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
- * nodetool stopdaemon errors out (CASSANDRA-13030)
+ * Log message size in trace message in OutboundTcpConnection (CASSANDRA-13028)
+ * Add timeUnit Days for cassandra-stress (CASSANDRA-13029)
+ * Add mutation size and batch metrics (CASSANDRA-12649)
+ * Add method to get size of endpoints to TokenMetadata (CASSANDRA-12999)
- * Fix primary index calculation for SASI (CASSANDRA-12910)
+ * Expose time spent waiting in thread pool queue (CASSANDRA-8398)
+ * Conditionally update index built status to avoid unnecessary flushes (CASSANDRA-12969)
- * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
+ * cqlsh auto completion: refactor definition of compaction strategy options (CASSANDRA-12946)
+ * Add support for arithmetic operators (CASSANDRA-11935)
++
++
+ 3.11.0
+ * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
+ * nodetool stopdaemon errors out (CASSANDRA-13030)
* Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
+ * Fix primary index calculation for SASI (CASSANDRA-12910)
+ * More fixes to the TokenAllocator (CASSANDRA-12990)
+ * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
Merged from 3.0:
* Fix handling of partition with partition-level deletion plus
live rows in sstabledump (CASSANDRA-13177)
[6/8] cassandra git commit: More fixes to the TokenAllocation
Posted by al...@apache.org.
More fixes to the TokenAllocation
patch by Dikang Gu; reviewed by Branimir Lambov for CASSANDRA-12990
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b6e83fc2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b6e83fc2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b6e83fc2
Branch: refs/heads/trunk
Commit: b6e83fc200fa9e4c0e4f26491597188305cddd21
Parents: 67cda76
Author: Dikang Gu <di...@gmail.com>
Authored: Sat Dec 3 17:59:01 2016 -0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Sun Feb 5 17:00:23 2017 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../dht/tokenallocator/TokenAllocation.java | 30 ++++++++++++
.../tokenallocator/TokenAllocatorFactory.java | 2 +-
src/java/org/apache/cassandra/gms/Gossiper.java | 45 ++++++++++++++++++
.../cassandra/service/CassandraDaemon.java | 48 +-------------------
5 files changed, 78 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1851e62..65efebc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
* nodetool stopdaemon errors out (CASSANDRA-13030)
* Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
* Fix primary index calculation for SASI (CASSANDRA-12910)
+ * More fixes to the TokenAllocator (CASSANDRA-12990)
* NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
Merged from 3.0:
* Fix handling of partition with partition-level deletion plus
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
index 36824a1..15d7868 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocation.java
@@ -35,12 +35,14 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.locator.TokenMetadata.Topology;
+import org.apache.cassandra.utils.FBUtilities;
public class TokenAllocation
{
@@ -51,6 +53,9 @@ public class TokenAllocation
final InetAddress endpoint,
int numTokens)
{
+ if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
+ Gossiper.waitToSettle();
+
TokenMetadata tokenMetadataCopy = tokenMetadata.cloneOnlyTokenMap();
StrategyAdapter strategy = getStrategy(tokenMetadataCopy, rs, endpoint);
Collection<Token> tokens = create(tokenMetadata, strategy).addUnit(endpoint, numTokens);
@@ -198,6 +203,31 @@ public class TokenAllocation
final String dc = snitch.getDatacenter(endpoint);
final int replicas = rs.getReplicationFactor(dc);
+ if (replicas == 0 || replicas == 1)
+ {
+ // No replication, each node is treated as separate.
+ return new StrategyAdapter()
+ {
+ @Override
+ public int replicas()
+ {
+ return 1;
+ }
+
+ @Override
+ public Object getGroup(InetAddress unit)
+ {
+ return unit;
+ }
+
+ @Override
+ public boolean inAllocationRing(InetAddress other)
+ {
+ return dc.equals(snitch.getDatacenter(other));
+ }
+ };
+ }
+
Topology topology = tokenMetadata.getTopology();
int racks = topology.getDatacenterRacks().get(dc).asMap().size();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
index d20de8f..58acb56 100644
--- a/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
+++ b/src/java/org/apache/cassandra/dht/tokenallocator/TokenAllocatorFactory.java
@@ -34,7 +34,7 @@ public class TokenAllocatorFactory
ReplicationStrategy<InetAddress> strategy,
IPartitioner partitioner)
{
- if(strategy.replicas() == 1 || strategy.replicas() == 0)
+ if(strategy.replicas() == 1)
{
logger.info("Using NoReplicationTokenAllocator.");
return new NoReplicationTokenAllocator<>(sortedTokens, strategy, partitioner);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 7f0f85b..ebfd66d 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -1614,4 +1614,49 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
return null;
}
+ public static void waitToSettle()
+ {
+ int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1);
+ if (forceAfter == 0)
+ {
+ return;
+ }
+ final int GOSSIP_SETTLE_MIN_WAIT_MS = 5000;
+ final int GOSSIP_SETTLE_POLL_INTERVAL_MS = 1000;
+ final int GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;
+
+ logger.info("Waiting for gossip to settle...");
+ Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS);
+ int totalPolls = 0;
+ int numOkay = 0;
+ int epSize = Gossiper.instance.getEndpointStates().size();
+ while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
+ {
+ Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+ int currentSize = Gossiper.instance.getEndpointStates().size();
+ totalPolls++;
+ if (currentSize == epSize)
+ {
+ logger.debug("Gossip looks settled.");
+ numOkay++;
+ }
+ else
+ {
+ logger.info("Gossip not settled after {} polls.", totalPolls);
+ numOkay = 0;
+ }
+ epSize = currentSize;
+ if (forceAfter > 0 && totalPolls > forceAfter)
+ {
+ logger.warn("Gossip not settled but startup forced by cassandra.skip_wait_for_gossip_to_settle. Gossip total polls: {}",
+ totalPolls);
+ break;
+ }
+ }
+ if (totalPolls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
+ logger.info("Gossip settled after {} extra polls; proceeding", totalPolls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
+ else
+ logger.info("No gossip backlog; proceeding");
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b6e83fc2/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 5a97dfe..851330b 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -42,7 +42,6 @@ import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -412,7 +411,7 @@ public class CassandraDaemon
ScheduledExecutors.optionalTasks.schedule(viewRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
if (!FBUtilities.getBroadcastAddress().equals(InetAddress.getLoopbackAddress()))
- waitForGossipToSettle();
+ Gossiper.waitToSettle();
// schedule periodic background compaction task submission. this is simply a backstop against compactions stalling
// due to scheduling errors or race conditions
@@ -680,51 +679,6 @@ public class CassandraDaemon
}
}
- private void waitForGossipToSettle()
- {
- int forceAfter = Integer.getInteger("cassandra.skip_wait_for_gossip_to_settle", -1);
- if (forceAfter == 0)
- {
- return;
- }
- final int GOSSIP_SETTLE_MIN_WAIT_MS = 5000;
- final int GOSSIP_SETTLE_POLL_INTERVAL_MS = 1000;
- final int GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED = 3;
-
- logger.info("Waiting for gossip to settle before accepting client requests...");
- Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_MIN_WAIT_MS, TimeUnit.MILLISECONDS);
- int totalPolls = 0;
- int numOkay = 0;
- int epSize = Gossiper.instance.getEndpointStates().size();
- while (numOkay < GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
- {
- Uninterruptibles.sleepUninterruptibly(GOSSIP_SETTLE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
- int currentSize = Gossiper.instance.getEndpointStates().size();
- totalPolls++;
- if (currentSize == epSize)
- {
- logger.debug("Gossip looks settled.");
- numOkay++;
- }
- else
- {
- logger.info("Gossip not settled after {} polls.", totalPolls);
- numOkay = 0;
- }
- epSize = currentSize;
- if (forceAfter > 0 && totalPolls > forceAfter)
- {
- logger.warn("Gossip not settled but startup forced by cassandra.skip_wait_for_gossip_to_settle. Gossip total polls: {}",
- totalPolls);
- break;
- }
- }
- if (totalPolls > GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED)
- logger.info("Gossip settled after {} extra polls; proceeding", totalPolls - GOSSIP_SETTLE_POLL_SUCCESSES_REQUIRED);
- else
- logger.info("No gossip backlog; proceeding");
- }
-
public static void stop(String[] args)
{
instance.deactivate();