You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2016/12/12 19:48:54 UTC
[01/10] cassandra git commit: Thread local pools never cleaned up
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 da94781a2 -> 7f668c6fe
refs/heads/cassandra-3.11 c80b9fb39 -> 50a9b1abb
refs/heads/cassandra-3.X 8d1c33fa9 -> e30386325
refs/heads/trunk a92ca6154 -> 2d21cbda2
Thread local pools never cleaned up
patch by Robert Stupp; reviewed by Jason Brown for CASSANDRA-13033
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f668c6f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f668c6f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f668c6f
Branch: refs/heads/cassandra-3.0
Commit: 7f668c6fe117f892cd79863fb9805ea5d5a2823c
Parents: da94781
Author: Robert Stupp <sn...@snazy.de>
Authored: Mon Dec 12 20:28:31 2016 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Mon Dec 12 20:38:56 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../concurrent/NamedThreadFactory.java | 24 +++++++++++-
.../db/commitlog/AbstractCommitLogService.java | 3 +-
.../db/commitlog/CommitLogSegmentManager.java | 3 +-
.../cassandra/net/OutboundTcpConnection.java | 41 +++++++++-----------
.../apache/cassandra/repair/RepairRunnable.java | 4 +-
.../scheduler/RoundRobinScheduler.java | 3 +-
.../cassandra/service/StorageService.java | 7 ++--
.../cassandra/streaming/ConnectionHandler.java | 4 +-
.../compress/CompressedInputStream.java | 3 +-
10 files changed, 59 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bbd47c1..5bc30be 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Thread local pools never cleaned up (CASSANDRA-13033)
* Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
* CQL often queries static columns unnecessarily (CASSANDRA-12768)
* Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 33c80d5..22193c4 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.concurrent;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import io.netty.util.concurrent.FastThreadLocal;
+import io.netty.util.concurrent.FastThreadLocalThread;
+
/**
* This class is an implementation of the <i>ThreadFactory</i> interface. This
* is useful to give Java threads meaningful names which is useful when using
@@ -54,12 +57,29 @@ public class NamedThreadFactory implements ThreadFactory
public Thread newThread(Runnable runnable)
{
- String name = id + ":" + n.getAndIncrement();
- Thread thread = new Thread(threadGroup, runnable, name);
+ String name = id + ':' + n.getAndIncrement();
+ Thread thread = new FastThreadLocalThread(threadGroup, threadLocalDeallocator(runnable), name);
thread.setPriority(priority);
thread.setDaemon(true);
if (contextClassLoader != null)
thread.setContextClassLoader(contextClassLoader);
return thread;
}
+
+ /**
+ * Ensures that {@link FastThreadLocal#remove() FastThreadLocal.remove()} is called when the {@link Runnable#run()}
+ * method of the given {@link Runnable} instance completes to ensure cleanup of {@link FastThreadLocal} instances.
+ * This is especially important for direct byte buffers allocated locally for a thread.
+ */
+ public static Runnable threadLocalDeallocator(Runnable r)
+ {
+ return () ->
+ {
+ try {
+ r.run();
+ } finally {
+ FastThreadLocal.removeAll();
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 113d1ba..e5a5887 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db.commitlog;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.*;
@@ -159,7 +160,7 @@ public abstract class AbstractCommitLogService
}
};
- thread = new Thread(runnable, name);
+ thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
thread.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 82cee50..79dd316 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -174,7 +175,7 @@ public class CommitLogSegmentManager
run = true;
- managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
+ managerThread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "COMMIT-LOG-ALLOCATOR");
managerThread.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index f573787..a9dfcdc 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -45,6 +45,7 @@ import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
@@ -505,31 +506,27 @@ public class OutboundTcpConnection extends Thread
{
final AtomicInteger version = new AtomicInteger(NO_VERSION);
final CountDownLatch versionLatch = new CountDownLatch(1);
- new Thread("HANDSHAKE-" + poolReference.endPoint())
+ new Thread(NamedThreadFactory.threadLocalDeallocator(() ->
{
- @Override
- public void run()
+ try
{
- try
- {
- logger.info("Handshaking version with {}", poolReference.endPoint());
- version.set(inputStream.readInt());
- }
- catch (IOException ex)
- {
- final String msg = "Cannot handshake version with " + poolReference.endPoint();
- if (logger.isTraceEnabled())
- logger.trace(msg, ex);
- else
- logger.info(msg);
- }
- finally
- {
- //unblock the waiting thread on either success or fail
- versionLatch.countDown();
- }
+ logger.info("Handshaking version with {}", poolReference.endPoint());
+ version.set(inputStream.readInt());
+ }
+ catch (IOException ex)
+ {
+ final String msg = "Cannot handshake version with " + poolReference.endPoint();
+ if (logger.isTraceEnabled())
+ logger.trace(msg, ex);
+ else
+ logger.info(msg);
+ }
+ finally
+ {
+ //unblock the waiting thread on either success or fail
+ versionLatch.countDown();
}
- }.start();
+ }),"HANDSHAKE-" + poolReference.endPoint()).start();
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 7dd1b31..213e5c5 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -373,7 +373,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
private Thread createQueryThread(final int cmd, final UUID sessionId)
{
- return new Thread(new WrappedRunnable()
+ return new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
{
// Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
// Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
@@ -440,6 +440,6 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
seen[si].clear();
}
}
- });
+ }));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
index c98c0fe..61dfa50 100644
--- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
+++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.RequestSchedulerOptions;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -69,7 +70,7 @@ public class RoundRobinScheduler implements IRequestScheduler
}
}
};
- Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
+ Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER");
scheduler.start();
logger.info("Started the RoundRobin Request Scheduler");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d70c8dc..71cbc35 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.auth.AuthMigrationListener;
import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
import org.apache.cassandra.batchlog.BatchStoreVerbHandler;
import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
@@ -613,7 +614,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
// daemon threads, like our executors', continue to run while shutdown hooks are invoked
- drainOnShutdown = new Thread(new WrappedRunnable()
+ drainOnShutdown = new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
{
@Override
public void runMayThrow() throws InterruptedException, ExecutionException, IOException
@@ -628,7 +629,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory());
logbackHook.run();
}
- }, "StorageServiceShutdownHook");
+ }), "StorageServiceShutdownHook");
Runtime.getRuntime().addShutdownHook(drainOnShutdown);
replacing = DatabaseDescriptor.isReplacing();
@@ -3195,7 +3196,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return 0;
int cmd = nextRepairCommand.incrementAndGet();
- new Thread(createRepairTask(cmd, keyspace, options, legacy)).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(createRepairTask(cmd, keyspace, options, legacy))).start();
return cmd;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..b83c089 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -37,6 +37,8 @@ import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
@@ -214,7 +216,7 @@ public class ConnectionHandler
this.socket = socket;
this.protocolVersion = protocolVersion;
- new Thread(this, name() + "-" + session.peer).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(this), name() + "-" + session.peer).start();
}
public ListenableFuture<?> close()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index d59849f..6577980 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -33,6 +33,7 @@ import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -91,7 +92,7 @@ public class CompressedInputStream extends InputStream
this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
this.crcCheckChanceSupplier = crcCheckChanceSupplier;
- new Thread(new Reader(source, info, dataBuffer)).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(new Reader(source, info, dataBuffer))).start();
}
public int read() throws IOException
[10/10] cassandra git commit: Merge branch 'cassandra-3.X' into trunk
Posted by sn...@apache.org.
Merge branch 'cassandra-3.X' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2d21cbda
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2d21cbda
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2d21cbda
Branch: refs/heads/trunk
Commit: 2d21cbda228940cbcd9c0bc01c6047af2acdb057
Parents: a92ca61 e303863
Author: Robert Stupp <sn...@snazy.de>
Authored: Mon Dec 12 20:39:58 2016 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Mon Dec 12 20:39:58 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../concurrent/NamedThreadFactory.java | 22 +++++++++-
.../AbstractCommitLogSegmentManager.java | 3 +-
.../db/commitlog/AbstractCommitLogService.java | 3 +-
.../cassandra/index/sasi/TermIterator.java | 3 +-
.../cassandra/net/OutboundTcpConnection.java | 42 ++++++++++----------
.../apache/cassandra/repair/RepairRunnable.java | 4 +-
.../scheduler/RoundRobinScheduler.java | 12 +++---
.../cassandra/service/StorageService.java | 7 ++--
9 files changed, 58 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d21cbda/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f59f89f,74cab19..d1135bb
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -131,8 -122,9 +131,9 @@@
* Remove pre-startup check for open JMX port (CASSANDRA-12074)
* Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
* Restore resumable hints delivery (CASSANDRA-11960)
- * Properly report LWT contention (CASSANDRA-12626)
+ * Properly record CAS contention (CASSANDRA-12626)
Merged from 3.0:
+ * Thread local pools never cleaned up (CASSANDRA-13033)
* Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
* CQL often queries static columns unnecessarily (CASSANDRA-12768)
* Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d21cbda/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2d21cbda/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
[09/10] cassandra git commit: Merge branch 'cassandra-3.11' into
cassandra-3.X
Posted by sn...@apache.org.
Merge branch 'cassandra-3.11' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e3038632
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e3038632
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e3038632
Branch: refs/heads/trunk
Commit: e3038632580314dfbad057d1d709381636798c84
Parents: 8d1c33f 50a9b1a
Author: Robert Stupp <sn...@snazy.de>
Authored: Mon Dec 12 20:39:55 2016 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Mon Dec 12 20:39:55 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../concurrent/NamedThreadFactory.java | 22 +++++++++-
.../AbstractCommitLogSegmentManager.java | 3 +-
.../db/commitlog/AbstractCommitLogService.java | 3 +-
.../cassandra/index/sasi/TermIterator.java | 3 +-
.../cassandra/net/OutboundTcpConnection.java | 42 ++++++++++----------
.../apache/cassandra/repair/RepairRunnable.java | 4 +-
.../scheduler/RoundRobinScheduler.java | 12 +++---
.../cassandra/service/StorageService.java | 7 ++--
9 files changed, 58 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3038632/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3038632/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3038632/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
[03/10] cassandra git commit: Thread local pools never cleaned up
Posted by sn...@apache.org.
Thread local pools never cleaned up
patch by Robert Stupp; reviewed by Jason Brown for CASSANDRA-13033
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f668c6f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f668c6f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f668c6f
Branch: refs/heads/cassandra-3.X
Commit: 7f668c6fe117f892cd79863fb9805ea5d5a2823c
Parents: da94781
Author: Robert Stupp <sn...@snazy.de>
Authored: Mon Dec 12 20:28:31 2016 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Mon Dec 12 20:38:56 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../concurrent/NamedThreadFactory.java | 24 +++++++++++-
.../db/commitlog/AbstractCommitLogService.java | 3 +-
.../db/commitlog/CommitLogSegmentManager.java | 3 +-
.../cassandra/net/OutboundTcpConnection.java | 41 +++++++++-----------
.../apache/cassandra/repair/RepairRunnable.java | 4 +-
.../scheduler/RoundRobinScheduler.java | 3 +-
.../cassandra/service/StorageService.java | 7 ++--
.../cassandra/streaming/ConnectionHandler.java | 4 +-
.../compress/CompressedInputStream.java | 3 +-
10 files changed, 59 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bbd47c1..5bc30be 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Thread local pools never cleaned up (CASSANDRA-13033)
* Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
* CQL often queries static columns unnecessarily (CASSANDRA-12768)
* Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 33c80d5..22193c4 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.concurrent;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import io.netty.util.concurrent.FastThreadLocal;
+import io.netty.util.concurrent.FastThreadLocalThread;
+
/**
* This class is an implementation of the <i>ThreadFactory</i> interface. This
* is useful to give Java threads meaningful names which is useful when using
@@ -54,12 +57,29 @@ public class NamedThreadFactory implements ThreadFactory
public Thread newThread(Runnable runnable)
{
- String name = id + ":" + n.getAndIncrement();
- Thread thread = new Thread(threadGroup, runnable, name);
+ String name = id + ':' + n.getAndIncrement();
+ Thread thread = new FastThreadLocalThread(threadGroup, threadLocalDeallocator(runnable), name);
thread.setPriority(priority);
thread.setDaemon(true);
if (contextClassLoader != null)
thread.setContextClassLoader(contextClassLoader);
return thread;
}
+
+ /**
+ * Ensures that {@link FastThreadLocal#remove() FastThreadLocal.remove()} is called when the {@link Runnable#run()}
+ * method of the given {@link Runnable} instance completes to ensure cleanup of {@link FastThreadLocal} instances.
+ * This is especially important for direct byte buffers allocated locally for a thread.
+ */
+ public static Runnable threadLocalDeallocator(Runnable r)
+ {
+ return () ->
+ {
+ try {
+ r.run();
+ } finally {
+ FastThreadLocal.removeAll();
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 113d1ba..e5a5887 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db.commitlog;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.*;
@@ -159,7 +160,7 @@ public abstract class AbstractCommitLogService
}
};
- thread = new Thread(runnable, name);
+ thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
thread.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 82cee50..79dd316 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -174,7 +175,7 @@ public class CommitLogSegmentManager
run = true;
- managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
+ managerThread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "COMMIT-LOG-ALLOCATOR");
managerThread.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index f573787..a9dfcdc 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -45,6 +45,7 @@ import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
@@ -505,31 +506,27 @@ public class OutboundTcpConnection extends Thread
{
final AtomicInteger version = new AtomicInteger(NO_VERSION);
final CountDownLatch versionLatch = new CountDownLatch(1);
- new Thread("HANDSHAKE-" + poolReference.endPoint())
+ new Thread(NamedThreadFactory.threadLocalDeallocator(() ->
{
- @Override
- public void run()
+ try
{
- try
- {
- logger.info("Handshaking version with {}", poolReference.endPoint());
- version.set(inputStream.readInt());
- }
- catch (IOException ex)
- {
- final String msg = "Cannot handshake version with " + poolReference.endPoint();
- if (logger.isTraceEnabled())
- logger.trace(msg, ex);
- else
- logger.info(msg);
- }
- finally
- {
- //unblock the waiting thread on either success or fail
- versionLatch.countDown();
- }
+ logger.info("Handshaking version with {}", poolReference.endPoint());
+ version.set(inputStream.readInt());
+ }
+ catch (IOException ex)
+ {
+ final String msg = "Cannot handshake version with " + poolReference.endPoint();
+ if (logger.isTraceEnabled())
+ logger.trace(msg, ex);
+ else
+ logger.info(msg);
+ }
+ finally
+ {
+ //unblock the waiting thread on either success or fail
+ versionLatch.countDown();
}
- }.start();
+ }),"HANDSHAKE-" + poolReference.endPoint()).start();
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 7dd1b31..213e5c5 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -373,7 +373,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
private Thread createQueryThread(final int cmd, final UUID sessionId)
{
- return new Thread(new WrappedRunnable()
+ return new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
{
// Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
// Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
@@ -440,6 +440,6 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
seen[si].clear();
}
}
- });
+ }));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
index c98c0fe..61dfa50 100644
--- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
+++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.RequestSchedulerOptions;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -69,7 +70,7 @@ public class RoundRobinScheduler implements IRequestScheduler
}
}
};
- Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
+ Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER");
scheduler.start();
logger.info("Started the RoundRobin Request Scheduler");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d70c8dc..71cbc35 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.auth.AuthMigrationListener;
import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
import org.apache.cassandra.batchlog.BatchStoreVerbHandler;
import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
@@ -613,7 +614,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
// daemon threads, like our executors', continue to run while shutdown hooks are invoked
- drainOnShutdown = new Thread(new WrappedRunnable()
+ drainOnShutdown = new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
{
@Override
public void runMayThrow() throws InterruptedException, ExecutionException, IOException
@@ -628,7 +629,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory());
logbackHook.run();
}
- }, "StorageServiceShutdownHook");
+ }), "StorageServiceShutdownHook");
Runtime.getRuntime().addShutdownHook(drainOnShutdown);
replacing = DatabaseDescriptor.isReplacing();
@@ -3195,7 +3196,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return 0;
int cmd = nextRepairCommand.incrementAndGet();
- new Thread(createRepairTask(cmd, keyspace, options, legacy)).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(createRepairTask(cmd, keyspace, options, legacy))).start();
return cmd;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..b83c089 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -37,6 +37,8 @@ import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
@@ -214,7 +216,7 @@ public class ConnectionHandler
this.socket = socket;
this.protocolVersion = protocolVersion;
- new Thread(this, name() + "-" + session.peer).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(this), name() + "-" + session.peer).start();
}
public ListenableFuture<?> close()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index d59849f..6577980 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -33,6 +33,7 @@ import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -91,7 +92,7 @@ public class CompressedInputStream extends InputStream
this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
this.crcCheckChanceSupplier = crcCheckChanceSupplier;
- new Thread(new Reader(source, info, dataBuffer)).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(new Reader(source, info, dataBuffer))).start();
}
public int read() throws IOException
[08/10] cassandra git commit: Merge branch 'cassandra-3.11' into
cassandra-3.X
Posted by sn...@apache.org.
Merge branch 'cassandra-3.11' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e3038632
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e3038632
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e3038632
Branch: refs/heads/cassandra-3.X
Commit: e3038632580314dfbad057d1d709381636798c84
Parents: 8d1c33f 50a9b1a
Author: Robert Stupp <sn...@snazy.de>
Authored: Mon Dec 12 20:39:55 2016 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Mon Dec 12 20:39:55 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../concurrent/NamedThreadFactory.java | 22 +++++++++-
.../AbstractCommitLogSegmentManager.java | 3 +-
.../db/commitlog/AbstractCommitLogService.java | 3 +-
.../cassandra/index/sasi/TermIterator.java | 3 +-
.../cassandra/net/OutboundTcpConnection.java | 42 ++++++++++----------
.../apache/cassandra/repair/RepairRunnable.java | 4 +-
.../scheduler/RoundRobinScheduler.java | 12 +++---
.../cassandra/service/StorageService.java | 7 ++--
9 files changed, 58 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3038632/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3038632/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e3038632/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
[06/10] cassandra git commit: Merge branch cassandra-3.0 into
cassandra-3.11
Posted by sn...@apache.org.
Merge branch cassandra-3.0 into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/50a9b1ab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/50a9b1ab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/50a9b1ab
Branch: refs/heads/cassandra-3.11
Commit: 50a9b1abb1a46d264343058837f334d5a73b9bda
Parents: c80b9fb 7f668c6
Author: Robert Stupp <sn...@snazy.de>
Authored: Mon Dec 12 20:39:37 2016 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Mon Dec 12 20:39:41 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../concurrent/NamedThreadFactory.java | 22 +++++++++-
.../AbstractCommitLogSegmentManager.java | 3 +-
.../db/commitlog/AbstractCommitLogService.java | 3 +-
.../cassandra/index/sasi/TermIterator.java | 3 +-
.../cassandra/net/OutboundTcpConnection.java | 42 ++++++++++----------
.../apache/cassandra/repair/RepairRunnable.java | 4 +-
.../scheduler/RoundRobinScheduler.java | 12 +++---
.../cassandra/service/StorageService.java | 7 ++--
9 files changed, 58 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d056492,5bc30be..7413086
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,114 -1,5 +1,115 @@@
-3.0.11
+3.10
+ * Remove outboundBindAny configuration property (CASSANDRA-12673)
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+ closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+ to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+ MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+ is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
+ * Thread local pools never cleaned up (CASSANDRA-13033)
* Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
* CQL often queries static columns unnecessarily (CASSANDRA-12768)
* Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 00ddf44,0000000..eff35f4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@@ -1,550 -1,0 +1,551 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.nicoulaj.compilecommand.annotations.DontInline;
++import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+
+/**
+ * Performs eager-creation of commit log segments in a background thread. All the
+ * public methods are thread safe.
+ */
+public abstract class AbstractCommitLogSegmentManager
+{
+ static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
+
+ /**
+ * Segment that is ready to be used. The management thread fills this and blocks until consumed.
+ *
+ * A single management thread produces this, and consumers are already synchronizing to make sure other work is
+ * performed atomically with consuming this. Volatile to make sure writes by the management thread become
+ * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must
+ * synchronize on 'this'.
+ */
+ private volatile CommitLogSegment availableSegment = null;
+
+ private final WaitQueue segmentPrepared = new WaitQueue();
+
+ /** Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to */
+ private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
+
+ /**
+ * The segment we are currently allocating commit log records to.
+ *
+ * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value.
+ */
+ private volatile CommitLogSegment allocatingFrom = null;
+
+ final String storageDirectory;
+
+ /**
+ * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size
+ * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
+ * can see the effect of recycling segments immediately (even though they're really happening asynchronously
+ * on the manager thread, which will take a ms or two).
+ */
+ private final AtomicLong size = new AtomicLong();
+
+ private Thread managerThread;
+ protected final CommitLog commitLog;
+ private volatile boolean shutdown;
+
+ private static final SimpleCachedBufferPool bufferPool =
+ new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize());
+
+ AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory)
+ {
+ this.commitLog = commitLog;
+ this.storageDirectory = storageDirectory;
+ }
+
+ void start()
+ {
+ // The run loop for the manager thread
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws Exception
+ {
+ while (!shutdown)
+ {
+ try
+ {
+ assert availableSegment == null;
+ logger.debug("No segments in reserve; creating a fresh one");
+ availableSegment = createSegment();
+ if (shutdown)
+ {
+ // If shutdown() started and finished during segment creation, we are now left with a
+ // segment that no one will consume. Discard it.
+ discardAvailableSegment();
+ return;
+ }
+
+ segmentPrepared.signalAll();
+ Thread.yield();
+
+ if (availableSegment == null && !atSegmentBufferLimit())
+ // Writing threads need another segment now.
+ continue;
+
+ // Writing threads are not waiting for new segments, we can spend time on other tasks.
+ // flush old Cfs if we're full
+ maybeFlushToReclaim();
+
+ LockSupport.park();
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
+ return;
+ // sleep some arbitrary period to avoid spamming CL
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+ // If we offered a segment, wait for it to be taken before reentering the loop.
+ // There could be a new segment in next not offered, but only on failure to discard it while
+ // shutting down-- nothing more can or needs to be done in that case.
+ }
+
+ while (availableSegment != null || atSegmentBufferLimit() && !shutdown)
+ LockSupport.park();
+ }
+ }
+ };
+
+ shutdown = false;
- managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
++ managerThread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "COMMIT-LOG-ALLOCATOR");
+ managerThread.start();
+
+ // for simplicity, ensure the first segment is allocated before continuing
+ advanceAllocatingFrom(null);
+ }
+
+ private boolean atSegmentBufferLimit()
+ {
+ return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
+ }
+
+ private void maybeFlushToReclaim()
+ {
+ long unused = unusedCapacity();
+ if (unused < 0)
+ {
+ long flushingSize = 0;
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment == allocatingFrom)
+ break;
+ flushingSize += segment.onDiskSize();
+ segmentsToRecycle.add(segment);
+ if (flushingSize + unused >= 0)
+ break;
+ }
+ flushDataFrom(segmentsToRecycle, false);
+ }
+ }
+
+
+ /**
+ * Allocate a segment within this CLSM. Should either succeed or throw.
+ */
+ public abstract Allocation allocate(Mutation mutation, int size);
+
+ /**
+ * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM
+ * decide what to do with those segments on disk after they've been replayed.
+ */
+ abstract void handleReplayedSegment(final File file);
+
+ /**
+ * Hook to allow segment managers to track state surrounding creation of new segments. Onl perform as task submit
+ * to segment manager so it's performed on segment management thread.
+ */
+ abstract CommitLogSegment createSegment();
+
+ /**
+ * Indicates that a segment file has been flushed and is no longer needed. Only perform as task submit to segment
+ * manager so it's performend on segment management thread, or perform while segment management thread is shutdown
+ * during testing resets.
+ *
+ * @param segment segment to be discarded
+ * @param delete whether or not the segment is safe to be deleted.
+ */
+ abstract void discard(CommitLogSegment segment, boolean delete);
+
+ /**
+ * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided.
+ *
+ * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
+ */
+ @DontInline
+ void advanceAllocatingFrom(CommitLogSegment old)
+ {
+ while (true)
+ {
+ synchronized (this)
+ {
+ // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments
+ if (allocatingFrom != old)
+ return;
+
+ // If a segment is ready, take it now, otherwise wait for the management thread to construct it.
+ if (availableSegment != null)
+ {
+ // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving
+ // the critical section.
+ activeSegments.add(allocatingFrom = availableSegment);
+ availableSegment = null;
+ break;
+ }
+ }
+
+ awaitAvailableSegment(old);
+ }
+
+ // Signal the management thread to prepare a new segment.
+ wakeManager();
+
+ if (old != null)
+ {
+ // Now we can run the user defined command just after switching to the new commit log.
+ // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+ commitLog.archiver.maybeArchive(old);
+
+ // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
+ old.discardUnusedTail();
+ }
+
+ // request that the CL be synced out-of-band, as we've finished a segment
+ commitLog.requestExtraSync();
+ }
+
+ void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom)
+ {
+ do
+ {
+ WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time());
+ if (availableSegment == null && allocatingFrom == currentAllocatingFrom)
+ prepared.awaitUninterruptibly();
+ else
+ prepared.cancel();
+ }
+ while (availableSegment == null && allocatingFrom == currentAllocatingFrom);
+ }
+
+ /**
+ * Switch to a new segment, regardless of how much is left in the current one.
+ *
+ * Flushes any dirty CFs for this segment and any older segments, and then discards the segments
+ */
+ void forceRecycleAll(Iterable<UUID> droppedCfs)
+ {
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
+ CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
+ advanceAllocatingFrom(last);
+
+ // wait for the commit log modifications
+ last.waitForModifications();
+
+ // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
+ // to complete
+ Keyspace.writeOrder.awaitNewBarrier();
+
+ // flush and wait for all CFs that are dirty in segments up-to and including 'last'
+ Future<?> future = flushDataFrom(segmentsToRecycle, true);
+ try
+ {
+ future.get();
+
+ for (CommitLogSegment segment : activeSegments)
+ for (UUID cfId : droppedCfs)
+ segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
+
+ // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
+ // if the previous active segment was the only one to recycle (since an active segment isn't
+ // necessarily dirty, and we only call dCS after a flush).
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment.isUnused())
+ archiveAndDiscard(segment);
+ }
+
+ CommitLogSegment first;
+ if ((first = activeSegments.peek()) != null && first.id <= last.id)
+ logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
+ }
+ catch (Throwable t)
+ {
+ // for now just log the error
+ logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
+ }
+ }
+
+ /**
+ * Indicates that a segment is no longer in use and that it should be discarded.
+ *
+ * @param segment segment that is no longer in use
+ */
+ void archiveAndDiscard(final CommitLogSegment segment)
+ {
+ boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
+ if (!activeSegments.remove(segment))
+ return; // already discarded
+ // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
+ logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script");
+ discard(segment, archiveSuccess);
+ }
+
+ /**
+ * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
+ * @param addedSize
+ */
+ void addSize(long addedSize)
+ {
+ size.addAndGet(addedSize);
+ }
+
+ /**
+ * @return the space (in bytes) used by all segment files.
+ */
+ public long onDiskSize()
+ {
+ return size.get();
+ }
+
+ private long unusedCapacity()
+ {
+ long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
+ long currentSize = size.get();
+ logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
+ return total - currentSize;
+ }
+
+ /**
+ * Force a flush on all CFs that are still dirty in @param segments.
+ *
+ * @return a Future that will finish when all the flushes are complete.
+ */
+ private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
+ {
+ if (segments.isEmpty())
+ return Futures.immediateFuture(null);
+ final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
+
+ // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
+ final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
+
+ for (CommitLogSegment segment : segments)
+ {
+ for (UUID dirtyCFId : segment.getDirtyCFIDs())
+ {
+ Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
+ if (pair == null)
+ {
+ // even though we remove the schema entry before a final flush when dropping a CF,
+ // it's still possible for a writer to race and finish his append after the flush.
+ logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
+ segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
+ }
+ else if (!flushes.containsKey(dirtyCFId))
+ {
+ String keyspace = pair.left;
+ final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
+ // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
+ // no deadlock possibility since switchLock removal
+ flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
+ }
+ }
+ }
+
+ return Futures.allAsList(flushes.values());
+ }
+
+ /**
+ * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
+ * Only call this after the AbstractCommitLogService is shut down.
+ */
+ public void stopUnsafe(boolean deleteSegments)
+ {
+ logger.debug("CLSM closing and clearing existing commit log segments...");
+
+ shutdown();
+ try
+ {
+ awaitTermination();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ for (CommitLogSegment segment : activeSegments)
+ closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+ activeSegments.clear();
+
+ size.set(0L);
+
+ logger.trace("CLSM done with closing and clearing existing commit log segments.");
+ }
+
+ /**
+ * To be used by tests only. Not safe if mutation slots are being allocated concurrently.
+ */
+ void awaitManagementTasksCompletion()
+ {
+ if (availableSegment == null && !atSegmentBufferLimit())
+ {
+ awaitAvailableSegment(allocatingFrom);
+ }
+ }
+
+ /**
+ * Explicitly for use only during resets in unit testing.
+ */
+ private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
+ {
+ try
+ {
+ discard(segment, delete);
+ }
+ catch (AssertionError ignored)
+ {
+ // segment file does not exist
+ }
+ }
+
+ /**
+ * Initiates the shutdown process for the management thread.
+ */
+ public void shutdown()
+ {
+ assert !shutdown;
+ shutdown = true;
+
+ // Release the management thread and delete prepared segment.
+ // Do not block as another thread may claim the segment (this can happen during unit test initialization).
+ discardAvailableSegment();
+ wakeManager();
+ }
+
+ private void discardAvailableSegment()
+ {
+ CommitLogSegment next = null;
+ synchronized (this)
+ {
+ next = availableSegment;
+ availableSegment = null;
+ }
+ if (next != null)
+ next.discard(true);
+ }
+
+ /**
+ * Returns when the management thread terminates.
+ */
+ public void awaitTermination() throws InterruptedException
+ {
+ managerThread.join();
+ managerThread = null;
+
+ for (CommitLogSegment segment : activeSegments)
+ segment.close();
+
+ bufferPool.shutdown();
+ }
+
+ /**
+ * @return a read-only collection of the active commit log segments
+ */
+ @VisibleForTesting
+ public Collection<CommitLogSegment> getActiveSegments()
+ {
+ return Collections.unmodifiableCollection(activeSegments);
+ }
+
+ /**
+ * @return the current CommitLogPosition of the active segment we're allocating from
+ */
+ CommitLogPosition getCurrentPosition()
+ {
+ return allocatingFrom.getCurrentCommitLogPosition();
+ }
+
+ /**
+ * Forces a disk flush on the commit log files that need it. Blocking.
+ */
+ public void sync() throws IOException
+ {
+ CommitLogSegment current = allocatingFrom;
+ for (CommitLogSegment segment : getActiveSegments())
+ {
+ // Do not sync segments that became active after sync started.
+ if (segment.id > current.id)
+ return;
+ segment.sync();
+ }
+ }
+
+ /**
+ * Used by compressed and encrypted segments to share a buffer pool across the CLSM.
+ */
+ SimpleCachedBufferPool getBufferPool()
+ {
+ return bufferPool;
+ }
+
+ void wakeManager()
+ {
+ LockSupport.unpark(managerThread);
+ }
+
+ /**
+ * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for
+ * a buffer to become available.
+ */
+ void notifyBufferFreed()
+ {
+ wakeManager();
+ }
+
+ /** Read-only access to current segment for subclasses. */
+ CommitLogSegment allocatingFrom()
+ {
+ return allocatingFrom;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 7b56da3,e5a5887..834aa0d
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@@ -17,18 -17,16 +17,19 @@@
*/
package org.apache.cassandra.db.commitlog;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.concurrent.WaitQueue;
-import org.slf4j.*;
-
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Timer.Context;
+
++import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
public abstract class AbstractCommitLogService
{
@@@ -148,8 -160,7 +149,8 @@@
}
};
+ shutdown = false;
- thread = new Thread(runnable, name);
+ thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
thread.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/index/sasi/TermIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/sasi/TermIterator.java
index 5b08a56,0000000..1ddfcb9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/sasi/TermIterator.java
+++ b/src/java/org/apache/cassandra/index/sasi/TermIterator.java
@@@ -1,218 -1,0 +1,219 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
++import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.io.util.FileUtils;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TermIterator extends RangeIterator<Long, Token>
+{
+ private static final Logger logger = LoggerFactory.getLogger(TermIterator.class);
+
+ private static final ThreadLocal<ExecutorService> SEARCH_EXECUTOR = new ThreadLocal<ExecutorService>()
+ {
+ public ExecutorService initialValue()
+ {
+ final String currentThread = Thread.currentThread().getName();
+ final int concurrencyFactor = DatabaseDescriptor.searchConcurrencyFactor();
+
+ logger.info("Search Concurrency Factor is set to {} for {}", concurrencyFactor, currentThread);
+
+ return (concurrencyFactor <= 1)
+ ? MoreExecutors.newDirectExecutorService()
+ : Executors.newFixedThreadPool(concurrencyFactor, new ThreadFactory()
+ {
+ public final AtomicInteger count = new AtomicInteger();
+
+ public Thread newThread(Runnable task)
+ {
- return new Thread(task, currentThread + "-SEARCH-" + count.incrementAndGet()) {{ setDaemon(true); }};
++ return new Thread(NamedThreadFactory.threadLocalDeallocator(task), currentThread + "-SEARCH-" + count.incrementAndGet()) {{ setDaemon(true); }};
+ }
+ });
+ }
+ };
+
+ private final Expression expression;
+
+ private final RangeIterator<Long, Token> union;
+ private final Set<SSTableIndex> referencedIndexes;
+
+ private TermIterator(Expression e,
+ RangeIterator<Long, Token> union,
+ Set<SSTableIndex> referencedIndexes)
+ {
+ super(union.getMinimum(), union.getMaximum(), union.getCount());
+
+ this.expression = e;
+ this.union = union;
+ this.referencedIndexes = referencedIndexes;
+ }
+
+ @SuppressWarnings("resource")
+ public static TermIterator build(final Expression e, Set<SSTableIndex> perSSTableIndexes)
+ {
+ final List<RangeIterator<Long, Token>> tokens = new CopyOnWriteArrayList<>();
+ final AtomicLong tokenCount = new AtomicLong(0);
+
+ RangeIterator<Long, Token> memtableIterator = e.index.searchMemtable(e);
+ if (memtableIterator != null)
+ {
+ tokens.add(memtableIterator);
+ tokenCount.addAndGet(memtableIterator.getCount());
+ }
+
+ final Set<SSTableIndex> referencedIndexes = new CopyOnWriteArraySet<>();
+
+ try
+ {
+ final CountDownLatch latch = new CountDownLatch(perSSTableIndexes.size());
+ final ExecutorService searchExecutor = SEARCH_EXECUTOR.get();
+
+ for (final SSTableIndex index : perSSTableIndexes)
+ {
+ if (e.getOp() == Expression.Op.PREFIX &&
+ index.mode() == OnDiskIndexBuilder.Mode.CONTAINS && !index.hasMarkedPartials())
+ throw new UnsupportedOperationException(String.format("The index %s has not yet been upgraded " +
+ "to support prefix queries in CONTAINS mode. " +
+ "Wait for compaction or rebuild the index.",
+ index.getPath()));
+
+
+ if (!index.reference())
+ {
+ latch.countDown();
+ continue;
+ }
+
+ // add to referenced right after the reference was acquired,
+ // that helps to release index if something goes bad inside of the search
+ referencedIndexes.add(index);
+
+ searchExecutor.submit((Runnable) () -> {
+ try
+ {
+ e.checkpoint();
+
+ RangeIterator<Long, Token> keyIterator = index.search(e);
+ if (keyIterator == null)
+ {
+ releaseIndex(referencedIndexes, index);
+ return;
+ }
+
+ tokens.add(keyIterator);
+ tokenCount.getAndAdd(keyIterator.getCount());
+ }
+ catch (Throwable e1)
+ {
+ releaseIndex(referencedIndexes, index);
+
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Failed search an index %s, skipping.", index.getPath()), e1);
+ }
+ finally
+ {
+ latch.countDown();
+ }
+ });
+ }
+
+ Uninterruptibles.awaitUninterruptibly(latch);
+
+ // checkpoint right away after all indexes complete search because we might have crossed the quota
+ e.checkpoint();
+
+ RangeIterator<Long, Token> ranges = RangeUnionIterator.build(tokens);
+ return ranges == null ? null : new TermIterator(e, ranges, referencedIndexes);
+ }
+ catch (Throwable ex)
+ {
+ // if execution quota was exceeded while opening indexes or something else happened
+ // local (yet to be tracked) indexes should be released first before re-throwing exception
+ referencedIndexes.forEach(TermIterator::releaseQuietly);
+
+ throw ex;
+ }
+ }
+
+ protected Token computeNext()
+ {
+ try
+ {
+ return union.hasNext() ? union.next() : endOfData();
+ }
+ finally
+ {
+ expression.checkpoint();
+ }
+ }
+
+ protected void performSkipTo(Long nextToken)
+ {
+ try
+ {
+ union.skipTo(nextToken);
+ }
+ finally
+ {
+ expression.checkpoint();
+ }
+ }
+
+ public void close()
+ {
+ FileUtils.closeQuietly(union);
+ referencedIndexes.forEach(TermIterator::releaseQuietly);
+ referencedIndexes.clear();
+ }
+
+ private static void releaseIndex(Set<SSTableIndex> indexes, SSTableIndex index)
+ {
+ indexes.remove(index);
+ releaseQuietly(index);
+ }
+
+ private static void releaseQuietly(SSTableIndex index)
+ {
+ try
+ {
+ index.release();
+ }
+ catch (Throwable e)
+ {
+ logger.error(String.format("Failed to release index %s", index.getPath()), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1f47334,a9dfcdc..1843e7b
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@@ -502,31 -506,27 +503,28 @@@ public class OutboundTcpConnection exte
{
final AtomicInteger version = new AtomicInteger(NO_VERSION);
final CountDownLatch versionLatch = new CountDownLatch(1);
- new Thread("HANDSHAKE-" + poolReference.endPoint())
- new Thread(NamedThreadFactory.threadLocalDeallocator(() ->
++ Runnable r = () ->
{
- @Override
- public void run()
+ try
{
- try
- {
- logger.info("Handshaking version with {}", poolReference.endPoint());
- version.set(inputStream.readInt());
- }
- catch (IOException ex)
- {
- final String msg = "Cannot handshake version with " + poolReference.endPoint();
- if (logger.isTraceEnabled())
- logger.trace(msg, ex);
- else
- logger.info(msg);
- }
- finally
- {
- //unblock the waiting thread on either success or fail
- versionLatch.countDown();
- }
+ logger.info("Handshaking version with {}", poolReference.endPoint());
+ version.set(inputStream.readInt());
+ }
+ catch (IOException ex)
+ {
+ final String msg = "Cannot handshake version with " + poolReference.endPoint();
+ if (logger.isTraceEnabled())
+ logger.trace(msg, ex);
+ else
+ logger.info(msg);
+ }
+ finally
+ {
+ //unblock the waiting thread on either success or fail
+ versionLatch.countDown();
}
- }.start();
- }),"HANDSHAKE-" + poolReference.endPoint()).start();
++ };
++ new Thread(NamedThreadFactory.threadLocalDeallocator(r), "HANDSHAKE-" + poolReference.endPoint()).start();
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
index c98c0fe,61dfa50..904deb3
--- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
+++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
@@@ -59,17 -60,17 +60,14 @@@ public class RoundRobinScheduler implem
taskCount = new Semaphore(options.throttle_limit - 1);
queues = new NonBlockingHashMap<String, WeightedQueue>();
-- Runnable runnable = new Runnable()
++ Runnable runnable = () ->
{
-- public void run()
++ while (true)
{
-- while (true)
-- {
-- schedule();
-- }
++ schedule();
}
};
- Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
+ Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER");
scheduler.start();
logger.info("Started the RoundRobin Request Scheduler");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 3f999c2,71cbc35..1247e03
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -631,8 -592,29 +632,8 @@@ public class StorageService extends Not
throw new AssertionError(e);
}
- if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
- {
- logger.info("Loading persisted ring state");
- Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens();
- Map<InetAddress, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
- for (InetAddress ep : loadedTokens.keySet())
- {
- if (ep.equals(FBUtilities.getBroadcastAddress()))
- {
- // entry has been mistakenly added, delete it
- SystemKeyspace.removeEndpoint(ep);
- }
- else
- {
- if (loadedHostIds.containsKey(ep))
- tokenMetadata.updateHostId(loadedHostIds.get(ep), ep);
- Gossiper.instance.addSavedEndpoint(ep);
- }
- }
- }
-
// daemon threads, like our executors', continue to run while shutdown hooks are invoked
- drainOnShutdown = new Thread(new WrappedRunnable()
+ drainOnShutdown = new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
{
@Override
public void runMayThrow() throws InterruptedException, ExecutionException, IOException
@@@ -647,10 -629,10 +648,10 @@@
logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory());
logbackHook.run();
}
- }, "StorageServiceShutdownHook");
+ }), "StorageServiceShutdownHook");
Runtime.getRuntime().addShutdownHook(drainOnShutdown);
- replacing = DatabaseDescriptor.isReplacing();
+ replacing = isReplacing();
if (!Boolean.parseBoolean(System.getProperty("cassandra.start_gossip", "true")))
{
[05/10] cassandra git commit: Merge branch cassandra-3.0 into
cassandra-3.11
Posted by sn...@apache.org.
Merge branch cassandra-3.0 into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/50a9b1ab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/50a9b1ab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/50a9b1ab
Branch: refs/heads/cassandra-3.X
Commit: 50a9b1abb1a46d264343058837f334d5a73b9bda
Parents: c80b9fb 7f668c6
Author: Robert Stupp <sn...@snazy.de>
Authored: Mon Dec 12 20:39:37 2016 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Mon Dec 12 20:39:41 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../concurrent/NamedThreadFactory.java | 22 +++++++++-
.../AbstractCommitLogSegmentManager.java | 3 +-
.../db/commitlog/AbstractCommitLogService.java | 3 +-
.../cassandra/index/sasi/TermIterator.java | 3 +-
.../cassandra/net/OutboundTcpConnection.java | 42 ++++++++++----------
.../apache/cassandra/repair/RepairRunnable.java | 4 +-
.../scheduler/RoundRobinScheduler.java | 12 +++---
.../cassandra/service/StorageService.java | 7 ++--
9 files changed, 58 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d056492,5bc30be..7413086
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,114 -1,5 +1,115 @@@
-3.0.11
+3.10
+ * Remove outboundBindAny configuration property (CASSANDRA-12673)
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+ closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+ to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+ MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+ is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
+ * Thread local pools never cleaned up (CASSANDRA-13033)
* Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
* CQL often queries static columns unnecessarily (CASSANDRA-12768)
* Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 00ddf44,0000000..eff35f4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@@ -1,550 -1,0 +1,551 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.nicoulaj.compilecommand.annotations.DontInline;
++import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+
+/**
+ * Performs eager-creation of commit log segments in a background thread. All the
+ * public methods are thread safe.
+ */
+public abstract class AbstractCommitLogSegmentManager
+{
+ static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
+
+ /**
+ * Segment that is ready to be used. The management thread fills this and blocks until consumed.
+ *
+ * A single management thread produces this, and consumers are already synchronizing to make sure other work is
+ * performed atomically with consuming this. Volatile to make sure writes by the management thread become
+ * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must
+ * synchronize on 'this'.
+ */
+ private volatile CommitLogSegment availableSegment = null;
+
+ private final WaitQueue segmentPrepared = new WaitQueue();
+
+ /** Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to */
+ private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
+
+ /**
+ * The segment we are currently allocating commit log records to.
+ *
+ * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value.
+ */
+ private volatile CommitLogSegment allocatingFrom = null;
+
+ final String storageDirectory;
+
+ /**
+ * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size
+ * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
+ * can see the effect of recycling segments immediately (even though they're really happening asynchronously
+ * on the manager thread, which will take a ms or two).
+ */
+ private final AtomicLong size = new AtomicLong();
+
+ private Thread managerThread;
+ protected final CommitLog commitLog;
+ private volatile boolean shutdown;
+
+ private static final SimpleCachedBufferPool bufferPool =
+ new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize());
+
+ AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory)
+ {
+ this.commitLog = commitLog;
+ this.storageDirectory = storageDirectory;
+ }
+
+ void start()
+ {
+ // The run loop for the manager thread
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws Exception
+ {
+ while (!shutdown)
+ {
+ try
+ {
+ assert availableSegment == null;
+ logger.debug("No segments in reserve; creating a fresh one");
+ availableSegment = createSegment();
+ if (shutdown)
+ {
+ // If shutdown() started and finished during segment creation, we are now left with a
+ // segment that no one will consume. Discard it.
+ discardAvailableSegment();
+ return;
+ }
+
+ segmentPrepared.signalAll();
+ Thread.yield();
+
+ if (availableSegment == null && !atSegmentBufferLimit())
+ // Writing threads need another segment now.
+ continue;
+
+ // Writing threads are not waiting for new segments, we can spend time on other tasks.
+ // flush old Cfs if we're full
+ maybeFlushToReclaim();
+
+ LockSupport.park();
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
+ return;
+ // sleep some arbitrary period to avoid spamming CL
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+ // If we offered a segment, wait for it to be taken before reentering the loop.
+ // There could be a new segment in next not offered, but only on failure to discard it while
+ // shutting down-- nothing more can or needs to be done in that case.
+ }
+
+ while (availableSegment != null || atSegmentBufferLimit() && !shutdown)
+ LockSupport.park();
+ }
+ }
+ };
+
+ shutdown = false;
- managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
++ managerThread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "COMMIT-LOG-ALLOCATOR");
+ managerThread.start();
+
+ // for simplicity, ensure the first segment is allocated before continuing
+ advanceAllocatingFrom(null);
+ }
+
+ private boolean atSegmentBufferLimit()
+ {
+ return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
+ }
+
+ private void maybeFlushToReclaim()
+ {
+ long unused = unusedCapacity();
+ if (unused < 0)
+ {
+ long flushingSize = 0;
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment == allocatingFrom)
+ break;
+ flushingSize += segment.onDiskSize();
+ segmentsToRecycle.add(segment);
+ if (flushingSize + unused >= 0)
+ break;
+ }
+ flushDataFrom(segmentsToRecycle, false);
+ }
+ }
+
+
+ /**
+ * Allocate a segment within this CLSM. Should either succeed or throw.
+ */
+ public abstract Allocation allocate(Mutation mutation, int size);
+
+ /**
+ * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM
+ * decide what to do with those segments on disk after they've been replayed.
+ */
+ abstract void handleReplayedSegment(final File file);
+
+ /**
+ * Hook to allow segment managers to track state surrounding creation of new segments. Onl perform as task submit
+ * to segment manager so it's performed on segment management thread.
+ */
+ abstract CommitLogSegment createSegment();
+
+ /**
+ * Indicates that a segment file has been flushed and is no longer needed. Only perform as task submit to segment
+ * manager so it's performend on segment management thread, or perform while segment management thread is shutdown
+ * during testing resets.
+ *
+ * @param segment segment to be discarded
+ * @param delete whether or not the segment is safe to be deleted.
+ */
+ abstract void discard(CommitLogSegment segment, boolean delete);
+
+ /**
+ * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided.
+ *
+ * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
+ */
+ @DontInline
+ void advanceAllocatingFrom(CommitLogSegment old)
+ {
+ while (true)
+ {
+ synchronized (this)
+ {
+ // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments
+ if (allocatingFrom != old)
+ return;
+
+ // If a segment is ready, take it now, otherwise wait for the management thread to construct it.
+ if (availableSegment != null)
+ {
+ // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving
+ // the critical section.
+ activeSegments.add(allocatingFrom = availableSegment);
+ availableSegment = null;
+ break;
+ }
+ }
+
+ awaitAvailableSegment(old);
+ }
+
+ // Signal the management thread to prepare a new segment.
+ wakeManager();
+
+ if (old != null)
+ {
+ // Now we can run the user defined command just after switching to the new commit log.
+ // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+ commitLog.archiver.maybeArchive(old);
+
+ // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
+ old.discardUnusedTail();
+ }
+
+ // request that the CL be synced out-of-band, as we've finished a segment
+ commitLog.requestExtraSync();
+ }
+
+ void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom)
+ {
+ do
+ {
+ WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time());
+ if (availableSegment == null && allocatingFrom == currentAllocatingFrom)
+ prepared.awaitUninterruptibly();
+ else
+ prepared.cancel();
+ }
+ while (availableSegment == null && allocatingFrom == currentAllocatingFrom);
+ }
+
+ /**
+ * Switch to a new segment, regardless of how much is left in the current one.
+ *
+ * Flushes any dirty CFs for this segment and any older segments, and then discards the segments
+ */
+ void forceRecycleAll(Iterable<UUID> droppedCfs)
+ {
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
+ CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
+ advanceAllocatingFrom(last);
+
+ // wait for the commit log modifications
+ last.waitForModifications();
+
+ // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
+ // to complete
+ Keyspace.writeOrder.awaitNewBarrier();
+
+ // flush and wait for all CFs that are dirty in segments up-to and including 'last'
+ Future<?> future = flushDataFrom(segmentsToRecycle, true);
+ try
+ {
+ future.get();
+
+ for (CommitLogSegment segment : activeSegments)
+ for (UUID cfId : droppedCfs)
+ segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
+
+ // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
+ // if the previous active segment was the only one to recycle (since an active segment isn't
+ // necessarily dirty, and we only call dCS after a flush).
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment.isUnused())
+ archiveAndDiscard(segment);
+ }
+
+ CommitLogSegment first;
+ if ((first = activeSegments.peek()) != null && first.id <= last.id)
+ logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
+ }
+ catch (Throwable t)
+ {
+ // for now just log the error
+ logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
+ }
+ }
+
+ /**
+ * Indicates that a segment is no longer in use and that it should be discarded.
+ *
+ * @param segment segment that is no longer in use
+ */
+ void archiveAndDiscard(final CommitLogSegment segment)
+ {
+ boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
+ if (!activeSegments.remove(segment))
+ return; // already discarded
+ // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
+ logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script");
+ discard(segment, archiveSuccess);
+ }
+
+ /**
+ * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
+ * @param addedSize
+ */
+ void addSize(long addedSize)
+ {
+ size.addAndGet(addedSize);
+ }
+
+ /**
+ * @return the space (in bytes) used by all segment files.
+ */
+ public long onDiskSize()
+ {
+ return size.get();
+ }
+
+ private long unusedCapacity()
+ {
+ long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
+ long currentSize = size.get();
+ logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
+ return total - currentSize;
+ }
+
+ /**
+ * Force a flush on all CFs that are still dirty in @param segments.
+ *
+ * @return a Future that will finish when all the flushes are complete.
+ */
+ private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
+ {
+ if (segments.isEmpty())
+ return Futures.immediateFuture(null);
+ final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
+
+ // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
+ final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
+
+ for (CommitLogSegment segment : segments)
+ {
+ for (UUID dirtyCFId : segment.getDirtyCFIDs())
+ {
+ Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
+ if (pair == null)
+ {
+ // even though we remove the schema entry before a final flush when dropping a CF,
+ // it's still possible for a writer to race and finish his append after the flush.
+ logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
+ segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
+ }
+ else if (!flushes.containsKey(dirtyCFId))
+ {
+ String keyspace = pair.left;
+ final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
+ // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
+ // no deadlock possibility since switchLock removal
+ flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
+ }
+ }
+ }
+
+ return Futures.allAsList(flushes.values());
+ }
+
+ /**
+ * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
+ * Only call this after the AbstractCommitLogService is shut down.
+ */
+ public void stopUnsafe(boolean deleteSegments)
+ {
+ logger.debug("CLSM closing and clearing existing commit log segments...");
+
+ shutdown();
+ try
+ {
+ awaitTermination();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ for (CommitLogSegment segment : activeSegments)
+ closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+ activeSegments.clear();
+
+ size.set(0L);
+
+ logger.trace("CLSM done with closing and clearing existing commit log segments.");
+ }
+
+ /**
+ * To be used by tests only. Not safe if mutation slots are being allocated concurrently.
+ */
+ void awaitManagementTasksCompletion()
+ {
+ if (availableSegment == null && !atSegmentBufferLimit())
+ {
+ awaitAvailableSegment(allocatingFrom);
+ }
+ }
+
+ /**
+ * Explicitly for use only during resets in unit testing.
+ */
+ private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
+ {
+ try
+ {
+ discard(segment, delete);
+ }
+ catch (AssertionError ignored)
+ {
+ // segment file does not exist
+ }
+ }
+
+ /**
+ * Initiates the shutdown process for the management thread.
+ */
+ public void shutdown()
+ {
+ assert !shutdown;
+ shutdown = true;
+
+ // Release the management thread and delete prepared segment.
+ // Do not block as another thread may claim the segment (this can happen during unit test initialization).
+ discardAvailableSegment();
+ wakeManager();
+ }
+
+ private void discardAvailableSegment()
+ {
+ CommitLogSegment next = null;
+ synchronized (this)
+ {
+ next = availableSegment;
+ availableSegment = null;
+ }
+ if (next != null)
+ next.discard(true);
+ }
+
+ /**
+ * Returns when the management thread terminates.
+ */
+ public void awaitTermination() throws InterruptedException
+ {
+ managerThread.join();
+ managerThread = null;
+
+ for (CommitLogSegment segment : activeSegments)
+ segment.close();
+
+ bufferPool.shutdown();
+ }
+
+ /**
+ * @return a read-only collection of the active commit log segments
+ */
+ @VisibleForTesting
+ public Collection<CommitLogSegment> getActiveSegments()
+ {
+ return Collections.unmodifiableCollection(activeSegments);
+ }
+
+ /**
+ * @return the current CommitLogPosition of the active segment we're allocating from
+ */
+ CommitLogPosition getCurrentPosition()
+ {
+ return allocatingFrom.getCurrentCommitLogPosition();
+ }
+
+ /**
+ * Forces a disk flush on the commit log files that need it. Blocking.
+ */
+ public void sync() throws IOException
+ {
+ CommitLogSegment current = allocatingFrom;
+ for (CommitLogSegment segment : getActiveSegments())
+ {
+ // Do not sync segments that became active after sync started.
+ if (segment.id > current.id)
+ return;
+ segment.sync();
+ }
+ }
+
+ /**
+ * Used by compressed and encrypted segments to share a buffer pool across the CLSM.
+ */
+ SimpleCachedBufferPool getBufferPool()
+ {
+ return bufferPool;
+ }
+
+ void wakeManager()
+ {
+ LockSupport.unpark(managerThread);
+ }
+
+ /**
+ * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for
+ * a buffer to become available.
+ */
+ void notifyBufferFreed()
+ {
+ wakeManager();
+ }
+
+ /** Read-only access to current segment for subclasses. */
+ CommitLogSegment allocatingFrom()
+ {
+ return allocatingFrom;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 7b56da3,e5a5887..834aa0d
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@@ -17,18 -17,16 +17,19 @@@
*/
package org.apache.cassandra.db.commitlog;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.concurrent.WaitQueue;
-import org.slf4j.*;
-
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Timer.Context;
+
++import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
public abstract class AbstractCommitLogService
{
@@@ -148,8 -160,7 +149,8 @@@
}
};
+ shutdown = false;
- thread = new Thread(runnable, name);
+ thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
thread.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/index/sasi/TermIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/sasi/TermIterator.java
index 5b08a56,0000000..1ddfcb9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/sasi/TermIterator.java
+++ b/src/java/org/apache/cassandra/index/sasi/TermIterator.java
@@@ -1,218 -1,0 +1,219 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
++import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.io.util.FileUtils;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TermIterator extends RangeIterator<Long, Token>
+{
+ private static final Logger logger = LoggerFactory.getLogger(TermIterator.class);
+
+ private static final ThreadLocal<ExecutorService> SEARCH_EXECUTOR = new ThreadLocal<ExecutorService>()
+ {
+ public ExecutorService initialValue()
+ {
+ final String currentThread = Thread.currentThread().getName();
+ final int concurrencyFactor = DatabaseDescriptor.searchConcurrencyFactor();
+
+ logger.info("Search Concurrency Factor is set to {} for {}", concurrencyFactor, currentThread);
+
+ return (concurrencyFactor <= 1)
+ ? MoreExecutors.newDirectExecutorService()
+ : Executors.newFixedThreadPool(concurrencyFactor, new ThreadFactory()
+ {
+ public final AtomicInteger count = new AtomicInteger();
+
+ public Thread newThread(Runnable task)
+ {
- return new Thread(task, currentThread + "-SEARCH-" + count.incrementAndGet()) {{ setDaemon(true); }};
++ return new Thread(NamedThreadFactory.threadLocalDeallocator(task), currentThread + "-SEARCH-" + count.incrementAndGet()) {{ setDaemon(true); }};
+ }
+ });
+ }
+ };
+
+ private final Expression expression;
+
+ private final RangeIterator<Long, Token> union;
+ private final Set<SSTableIndex> referencedIndexes;
+
+ private TermIterator(Expression e,
+ RangeIterator<Long, Token> union,
+ Set<SSTableIndex> referencedIndexes)
+ {
+ super(union.getMinimum(), union.getMaximum(), union.getCount());
+
+ this.expression = e;
+ this.union = union;
+ this.referencedIndexes = referencedIndexes;
+ }
+
+ @SuppressWarnings("resource")
+ public static TermIterator build(final Expression e, Set<SSTableIndex> perSSTableIndexes)
+ {
+ final List<RangeIterator<Long, Token>> tokens = new CopyOnWriteArrayList<>();
+ final AtomicLong tokenCount = new AtomicLong(0);
+
+ RangeIterator<Long, Token> memtableIterator = e.index.searchMemtable(e);
+ if (memtableIterator != null)
+ {
+ tokens.add(memtableIterator);
+ tokenCount.addAndGet(memtableIterator.getCount());
+ }
+
+ final Set<SSTableIndex> referencedIndexes = new CopyOnWriteArraySet<>();
+
+ try
+ {
+ final CountDownLatch latch = new CountDownLatch(perSSTableIndexes.size());
+ final ExecutorService searchExecutor = SEARCH_EXECUTOR.get();
+
+ for (final SSTableIndex index : perSSTableIndexes)
+ {
+ if (e.getOp() == Expression.Op.PREFIX &&
+ index.mode() == OnDiskIndexBuilder.Mode.CONTAINS && !index.hasMarkedPartials())
+ throw new UnsupportedOperationException(String.format("The index %s has not yet been upgraded " +
+ "to support prefix queries in CONTAINS mode. " +
+ "Wait for compaction or rebuild the index.",
+ index.getPath()));
+
+
+ if (!index.reference())
+ {
+ latch.countDown();
+ continue;
+ }
+
+ // add to referenced right after the reference was acquired,
+ // that helps to release index if something goes bad inside of the search
+ referencedIndexes.add(index);
+
+ searchExecutor.submit((Runnable) () -> {
+ try
+ {
+ e.checkpoint();
+
+ RangeIterator<Long, Token> keyIterator = index.search(e);
+ if (keyIterator == null)
+ {
+ releaseIndex(referencedIndexes, index);
+ return;
+ }
+
+ tokens.add(keyIterator);
+ tokenCount.getAndAdd(keyIterator.getCount());
+ }
+ catch (Throwable e1)
+ {
+ releaseIndex(referencedIndexes, index);
+
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Failed search an index %s, skipping.", index.getPath()), e1);
+ }
+ finally
+ {
+ latch.countDown();
+ }
+ });
+ }
+
+ Uninterruptibles.awaitUninterruptibly(latch);
+
+ // checkpoint right away after all indexes complete search because we might have crossed the quota
+ e.checkpoint();
+
+ RangeIterator<Long, Token> ranges = RangeUnionIterator.build(tokens);
+ return ranges == null ? null : new TermIterator(e, ranges, referencedIndexes);
+ }
+ catch (Throwable ex)
+ {
+ // if execution quota was exceeded while opening indexes or something else happened
+ // local (yet to be tracked) indexes should be released first before re-throwing exception
+ referencedIndexes.forEach(TermIterator::releaseQuietly);
+
+ throw ex;
+ }
+ }
+
+ protected Token computeNext()
+ {
+ try
+ {
+ return union.hasNext() ? union.next() : endOfData();
+ }
+ finally
+ {
+ expression.checkpoint();
+ }
+ }
+
+ protected void performSkipTo(Long nextToken)
+ {
+ try
+ {
+ union.skipTo(nextToken);
+ }
+ finally
+ {
+ expression.checkpoint();
+ }
+ }
+
+ public void close()
+ {
+ FileUtils.closeQuietly(union);
+ referencedIndexes.forEach(TermIterator::releaseQuietly);
+ referencedIndexes.clear();
+ }
+
+ private static void releaseIndex(Set<SSTableIndex> indexes, SSTableIndex index)
+ {
+ indexes.remove(index);
+ releaseQuietly(index);
+ }
+
+ private static void releaseQuietly(SSTableIndex index)
+ {
+ try
+ {
+ index.release();
+ }
+ catch (Throwable e)
+ {
+ logger.error(String.format("Failed to release index %s", index.getPath()), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1f47334,a9dfcdc..1843e7b
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@@ -502,31 -506,27 +503,28 @@@ public class OutboundTcpConnection exte
{
final AtomicInteger version = new AtomicInteger(NO_VERSION);
final CountDownLatch versionLatch = new CountDownLatch(1);
- new Thread("HANDSHAKE-" + poolReference.endPoint())
- new Thread(NamedThreadFactory.threadLocalDeallocator(() ->
++ Runnable r = () ->
{
- @Override
- public void run()
+ try
{
- try
- {
- logger.info("Handshaking version with {}", poolReference.endPoint());
- version.set(inputStream.readInt());
- }
- catch (IOException ex)
- {
- final String msg = "Cannot handshake version with " + poolReference.endPoint();
- if (logger.isTraceEnabled())
- logger.trace(msg, ex);
- else
- logger.info(msg);
- }
- finally
- {
- //unblock the waiting thread on either success or fail
- versionLatch.countDown();
- }
+ logger.info("Handshaking version with {}", poolReference.endPoint());
+ version.set(inputStream.readInt());
+ }
+ catch (IOException ex)
+ {
+ final String msg = "Cannot handshake version with " + poolReference.endPoint();
+ if (logger.isTraceEnabled())
+ logger.trace(msg, ex);
+ else
+ logger.info(msg);
+ }
+ finally
+ {
+ //unblock the waiting thread on either success or fail
+ versionLatch.countDown();
}
- }.start();
- }),"HANDSHAKE-" + poolReference.endPoint()).start();
++ };
++ new Thread(NamedThreadFactory.threadLocalDeallocator(r), "HANDSHAKE-" + poolReference.endPoint()).start();
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
index c98c0fe,61dfa50..904deb3
--- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
+++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
@@@ -59,17 -60,17 +60,14 @@@ public class RoundRobinScheduler implem
taskCount = new Semaphore(options.throttle_limit - 1);
queues = new NonBlockingHashMap<String, WeightedQueue>();
-- Runnable runnable = new Runnable()
++ Runnable runnable = () ->
{
-- public void run()
++ while (true)
{
-- while (true)
-- {
-- schedule();
-- }
++ schedule();
}
};
- Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
+ Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER");
scheduler.start();
logger.info("Started the RoundRobin Request Scheduler");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 3f999c2,71cbc35..1247e03
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -631,8 -592,29 +632,8 @@@ public class StorageService extends Not
throw new AssertionError(e);
}
- if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
- {
- logger.info("Loading persisted ring state");
- Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens();
- Map<InetAddress, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
- for (InetAddress ep : loadedTokens.keySet())
- {
- if (ep.equals(FBUtilities.getBroadcastAddress()))
- {
- // entry has been mistakenly added, delete it
- SystemKeyspace.removeEndpoint(ep);
- }
- else
- {
- if (loadedHostIds.containsKey(ep))
- tokenMetadata.updateHostId(loadedHostIds.get(ep), ep);
- Gossiper.instance.addSavedEndpoint(ep);
- }
- }
- }
-
// daemon threads, like our executors', continue to run while shutdown hooks are invoked
- drainOnShutdown = new Thread(new WrappedRunnable()
+ drainOnShutdown = new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
{
@Override
public void runMayThrow() throws InterruptedException, ExecutionException, IOException
@@@ -647,10 -629,10 +648,10 @@@
logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory());
logbackHook.run();
}
- }, "StorageServiceShutdownHook");
+ }), "StorageServiceShutdownHook");
Runtime.getRuntime().addShutdownHook(drainOnShutdown);
- replacing = DatabaseDescriptor.isReplacing();
+ replacing = isReplacing();
if (!Boolean.parseBoolean(System.getProperty("cassandra.start_gossip", "true")))
{
[02/10] cassandra git commit: Thread local pools never cleaned up
Posted by sn...@apache.org.
Thread local pools never cleaned up
patch by Robert Stupp; reviewed by Jason Brown for CASSANDRA-13033
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f668c6f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f668c6f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f668c6f
Branch: refs/heads/cassandra-3.11
Commit: 7f668c6fe117f892cd79863fb9805ea5d5a2823c
Parents: da94781
Author: Robert Stupp <sn...@snazy.de>
Authored: Mon Dec 12 20:28:31 2016 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Mon Dec 12 20:38:56 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../concurrent/NamedThreadFactory.java | 24 +++++++++++-
.../db/commitlog/AbstractCommitLogService.java | 3 +-
.../db/commitlog/CommitLogSegmentManager.java | 3 +-
.../cassandra/net/OutboundTcpConnection.java | 41 +++++++++-----------
.../apache/cassandra/repair/RepairRunnable.java | 4 +-
.../scheduler/RoundRobinScheduler.java | 3 +-
.../cassandra/service/StorageService.java | 7 ++--
.../cassandra/streaming/ConnectionHandler.java | 4 +-
.../compress/CompressedInputStream.java | 3 +-
10 files changed, 59 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bbd47c1..5bc30be 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Thread local pools never cleaned up (CASSANDRA-13033)
* Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
* CQL often queries static columns unnecessarily (CASSANDRA-12768)
* Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 33c80d5..22193c4 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.concurrent;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import io.netty.util.concurrent.FastThreadLocal;
+import io.netty.util.concurrent.FastThreadLocalThread;
+
/**
* This class is an implementation of the <i>ThreadFactory</i> interface. This
* is useful to give Java threads meaningful names which is useful when using
@@ -54,12 +57,29 @@ public class NamedThreadFactory implements ThreadFactory
public Thread newThread(Runnable runnable)
{
- String name = id + ":" + n.getAndIncrement();
- Thread thread = new Thread(threadGroup, runnable, name);
+ String name = id + ':' + n.getAndIncrement();
+ Thread thread = new FastThreadLocalThread(threadGroup, threadLocalDeallocator(runnable), name);
thread.setPriority(priority);
thread.setDaemon(true);
if (contextClassLoader != null)
thread.setContextClassLoader(contextClassLoader);
return thread;
}
+
+ /**
+ * Ensures that {@link FastThreadLocal#remove() FastThreadLocal.remove()} is called when the {@link Runnable#run()}
+ * method of the given {@link Runnable} instance completes to ensure cleanup of {@link FastThreadLocal} instances.
+ * This is especially important for direct byte buffers allocated locally for a thread.
+ */
+ public static Runnable threadLocalDeallocator(Runnable r)
+ {
+ return () ->
+ {
+ try {
+ r.run();
+ } finally {
+ FastThreadLocal.removeAll();
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 113d1ba..e5a5887 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db.commitlog;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.*;
@@ -159,7 +160,7 @@ public abstract class AbstractCommitLogService
}
};
- thread = new Thread(runnable, name);
+ thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
thread.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 82cee50..79dd316 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -174,7 +175,7 @@ public class CommitLogSegmentManager
run = true;
- managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
+ managerThread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "COMMIT-LOG-ALLOCATOR");
managerThread.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index f573787..a9dfcdc 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -45,6 +45,7 @@ import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
@@ -505,31 +506,27 @@ public class OutboundTcpConnection extends Thread
{
final AtomicInteger version = new AtomicInteger(NO_VERSION);
final CountDownLatch versionLatch = new CountDownLatch(1);
- new Thread("HANDSHAKE-" + poolReference.endPoint())
+ new Thread(NamedThreadFactory.threadLocalDeallocator(() ->
{
- @Override
- public void run()
+ try
{
- try
- {
- logger.info("Handshaking version with {}", poolReference.endPoint());
- version.set(inputStream.readInt());
- }
- catch (IOException ex)
- {
- final String msg = "Cannot handshake version with " + poolReference.endPoint();
- if (logger.isTraceEnabled())
- logger.trace(msg, ex);
- else
- logger.info(msg);
- }
- finally
- {
- //unblock the waiting thread on either success or fail
- versionLatch.countDown();
- }
+ logger.info("Handshaking version with {}", poolReference.endPoint());
+ version.set(inputStream.readInt());
+ }
+ catch (IOException ex)
+ {
+ final String msg = "Cannot handshake version with " + poolReference.endPoint();
+ if (logger.isTraceEnabled())
+ logger.trace(msg, ex);
+ else
+ logger.info(msg);
+ }
+ finally
+ {
+ //unblock the waiting thread on either success or fail
+ versionLatch.countDown();
}
- }.start();
+ }),"HANDSHAKE-" + poolReference.endPoint()).start();
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 7dd1b31..213e5c5 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -373,7 +373,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
private Thread createQueryThread(final int cmd, final UUID sessionId)
{
- return new Thread(new WrappedRunnable()
+ return new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
{
// Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
// Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
@@ -440,6 +440,6 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
seen[si].clear();
}
}
- });
+ }));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
index c98c0fe..61dfa50 100644
--- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
+++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.RequestSchedulerOptions;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -69,7 +70,7 @@ public class RoundRobinScheduler implements IRequestScheduler
}
}
};
- Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
+ Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER");
scheduler.start();
logger.info("Started the RoundRobin Request Scheduler");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d70c8dc..71cbc35 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.auth.AuthMigrationListener;
import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
import org.apache.cassandra.batchlog.BatchStoreVerbHandler;
import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
@@ -613,7 +614,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
// daemon threads, like our executors', continue to run while shutdown hooks are invoked
- drainOnShutdown = new Thread(new WrappedRunnable()
+ drainOnShutdown = new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
{
@Override
public void runMayThrow() throws InterruptedException, ExecutionException, IOException
@@ -628,7 +629,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory());
logbackHook.run();
}
- }, "StorageServiceShutdownHook");
+ }), "StorageServiceShutdownHook");
Runtime.getRuntime().addShutdownHook(drainOnShutdown);
replacing = DatabaseDescriptor.isReplacing();
@@ -3195,7 +3196,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return 0;
int cmd = nextRepairCommand.incrementAndGet();
- new Thread(createRepairTask(cmd, keyspace, options, legacy)).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(createRepairTask(cmd, keyspace, options, legacy))).start();
return cmd;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..b83c089 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -37,6 +37,8 @@ import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
@@ -214,7 +216,7 @@ public class ConnectionHandler
this.socket = socket;
this.protocolVersion = protocolVersion;
- new Thread(this, name() + "-" + session.peer).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(this), name() + "-" + session.peer).start();
}
public ListenableFuture<?> close()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index d59849f..6577980 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -33,6 +33,7 @@ import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -91,7 +92,7 @@ public class CompressedInputStream extends InputStream
this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
this.crcCheckChanceSupplier = crcCheckChanceSupplier;
- new Thread(new Reader(source, info, dataBuffer)).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(new Reader(source, info, dataBuffer))).start();
}
public int read() throws IOException
[07/10] cassandra git commit: Merge branch cassandra-3.0 into
cassandra-3.11
Posted by sn...@apache.org.
Merge branch cassandra-3.0 into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/50a9b1ab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/50a9b1ab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/50a9b1ab
Branch: refs/heads/trunk
Commit: 50a9b1abb1a46d264343058837f334d5a73b9bda
Parents: c80b9fb 7f668c6
Author: Robert Stupp <sn...@snazy.de>
Authored: Mon Dec 12 20:39:37 2016 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Mon Dec 12 20:39:41 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../concurrent/NamedThreadFactory.java | 22 +++++++++-
.../AbstractCommitLogSegmentManager.java | 3 +-
.../db/commitlog/AbstractCommitLogService.java | 3 +-
.../cassandra/index/sasi/TermIterator.java | 3 +-
.../cassandra/net/OutboundTcpConnection.java | 42 ++++++++++----------
.../apache/cassandra/repair/RepairRunnable.java | 4 +-
.../scheduler/RoundRobinScheduler.java | 12 +++---
.../cassandra/service/StorageService.java | 7 ++--
9 files changed, 58 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d056492,5bc30be..7413086
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,114 -1,5 +1,115 @@@
-3.0.11
+3.10
+ * Remove outboundBindAny configuration property (CASSANDRA-12673)
+ * Use correct bounds for all-data range when filtering (CASSANDRA-12666)
+ * Remove timing window in test case (CASSANDRA-12875)
+ * Resolve unit testing without JCE security libraries installed (CASSANDRA-12945)
+ * Fix inconsistencies in cassandra-stress load balancing policy (CASSANDRA-12919)
+ * Fix validation of non-frozen UDT cells (CASSANDRA-12916)
+ * Don't shut down socket input/output on StreamSession (CASSANDRA-12903)
+ * Fix Murmur3PartitionerTest (CASSANDRA-12858)
+ * Move cqlsh syntax rules into separate module and allow easier customization (CASSANDRA-12897)
+ * Fix CommitLogSegmentManagerTest (CASSANDRA-12283)
+ * Fix cassandra-stress truncate option (CASSANDRA-12695)
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
+ * Don't load MX4J beans twice (CASSANDRA-12869)
+ * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
+ * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)
+ * remove net.mintern.primitive library due to license issue (CASSANDRA-12845)
+ * Properly format IPv6 addresses when logging JMX service URL (CASSANDRA-12454)
+ * Optimize the vnode allocation for single replica per DC (CASSANDRA-12777)
+ * Use non-token restrictions for bounds when token restrictions are overridden (CASSANDRA-12419)
+ * Fix CQLSH auto completion for PER PARTITION LIMIT (CASSANDRA-12803)
+ * Use different build directories for Eclipse and Ant (CASSANDRA-12466)
+ * Avoid potential AttributeError in cqlsh due to no table metadata (CASSANDRA-12815)
+ * Fix RandomReplicationAwareTokenAllocatorTest.testExistingCluster (CASSANDRA-12812)
+ * Upgrade commons-codec to 1.9 (CASSANDRA-12790)
+ * Make the fanout size for LeveledCompactionStrategy to be configurable (CASSANDRA-11550)
+ * Add duration data type (CASSANDRA-11873)
+ * Fix timeout in ReplicationAwareTokenAllocatorTest (CASSANDRA-12784)
+ * Improve sum aggregate functions (CASSANDRA-12417)
+ * Make cassandra.yaml docs for batch_size_*_threshold_in_kb reflect changes in CASSANDRA-10876 (CASSANDRA-12761)
+ * cqlsh fails to format collections when using aliases (CASSANDRA-11534)
+ * Check for hash conflicts in prepared statements (CASSANDRA-12733)
+ * Exit query parsing upon first error (CASSANDRA-12598)
+ * Fix cassandra-stress to use single seed in UUID generation (CASSANDRA-12729)
+ * CQLSSTableWriter does not allow Update statement (CASSANDRA-12450)
+ * Config class uses boxed types but DD exposes primitive types (CASSANDRA-12199)
+ * Add pre- and post-shutdown hooks to Storage Service (CASSANDRA-12461)
+ * Add hint delivery metrics (CASSANDRA-12693)
+ * Remove IndexInfo cache from FileIndexInfoRetriever (CASSANDRA-12731)
+ * ColumnIndex does not reuse buffer (CASSANDRA-12502)
+ * cdc column addition still breaks schema migration tasks (CASSANDRA-12697)
+ * Upgrade metrics-reporter dependencies (CASSANDRA-12089)
+ * Tune compaction thread count via nodetool (CASSANDRA-12248)
+ * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232)
+ * Include repair session IDs in repair start message (CASSANDRA-12532)
+ * Add a blocking task to Index, run before joining the ring (CASSANDRA-12039)
+ * Fix NPE when using CQLSSTableWriter (CASSANDRA-12667)
+ * Support optional backpressure strategies at the coordinator (CASSANDRA-9318)
+ * Make randompartitioner work with new vnode allocation (CASSANDRA-12647)
+ * Fix cassandra-stress graphing (CASSANDRA-12237)
+ * Allow filtering on partition key columns for queries without secondary indexes (CASSANDRA-11031)
+ * Fix Cassandra Stress reporting thread model and precision (CASSANDRA-12585)
+ * Add JMH benchmarks.jar (CASSANDRA-12586)
+ * Cleanup uses of AlterTableStatementColumn (CASSANDRA-12567)
+ * Add keep-alive to streaming (CASSANDRA-11841)
+ * Tracing payload is passed through newSession(..) (CASSANDRA-11706)
+ * avoid deleting non existing sstable files and improve related log messages (CASSANDRA-12261)
+ * json/yaml output format for nodetool compactionhistory (CASSANDRA-12486)
+ * Retry all internode messages once after a connection is
+ closed and reopened (CASSANDRA-12192)
+ * Add support to rebuild from targeted replica (CASSANDRA-9875)
+ * Add sequence distribution type to cassandra stress (CASSANDRA-12490)
+ * "SELECT * FROM foo LIMIT ;" does not error out (CASSANDRA-12154)
+ * Define executeLocally() at the ReadQuery Level (CASSANDRA-12474)
+ * Extend read/write failure messages with a map of replica addresses
+ to error codes in the v5 native protocol (CASSANDRA-12311)
+ * Fix rebuild of SASI indexes with existing index files (CASSANDRA-12374)
+ * Let DatabaseDescriptor not implicitly startup services (CASSANDRA-9054, 12550)
+ * Fix clustering indexes in presence of static columns in SASI (CASSANDRA-12378)
+ * Fix queries on columns with reversed type on SASI indexes (CASSANDRA-12223)
+ * Added slow query log (CASSANDRA-12403)
+ * Count full coordinated request against timeout (CASSANDRA-12256)
+ * Allow TTL with null value on insert and update (CASSANDRA-12216)
+ * Make decommission operation resumable (CASSANDRA-12008)
+ * Add support to one-way targeted repair (CASSANDRA-9876)
+ * Remove clientutil jar (CASSANDRA-11635)
+ * Fix compaction throughput throttle (CASSANDRA-12366, CASSANDRA-12717)
+ * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
+ * Cassandra stress should dump all setting on startup (CASSANDRA-11914)
+ * Make it possible to compact a given token range (CASSANDRA-10643)
+ * Allow updating DynamicEndpointSnitch properties via JMX (CASSANDRA-12179)
+ * Collect metrics on queries by consistency level (CASSANDRA-7384)
+ * Add support for GROUP BY to SELECT statement (CASSANDRA-10707)
+ * Deprecate memtable_cleanup_threshold and update default for memtable_flush_writers (CASSANDRA-12228)
+ * Upgrade to OHC 0.4.4 (CASSANDRA-12133)
+ * Add version command to cassandra-stress (CASSANDRA-12258)
+ * Create compaction-stress tool (CASSANDRA-11844)
+ * Garbage-collecting compaction operation and schema option (CASSANDRA-7019)
+ * Add beta protocol flag for v5 native protocol (CASSANDRA-12142)
+ * Support filtering on non-PRIMARY KEY columns in the CREATE
+ MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368)
+ * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004)
+ * COPY FROM should raise error for non-existing input files (CASSANDRA-12174)
+ * Faster write path (CASSANDRA-12269)
+ * Option to leave omitted columns in INSERT JSON unset (CASSANDRA-11424)
+ * Support json/yaml output in nodetool tpstats (CASSANDRA-12035)
+ * Expose metrics for successful/failed authentication attempts (CASSANDRA-10635)
+ * Prepend snapshot name with "truncated" or "dropped" when a snapshot
+ is taken before truncating or dropping a table (CASSANDRA-12178)
+ * Optimize RestrictionSet (CASSANDRA-12153)
+ * cqlsh does not automatically downgrade CQL version (CASSANDRA-12150)
+ * Omit (de)serialization of state variable in UDAs (CASSANDRA-9613)
+ * Create a system table to expose prepared statements (CASSANDRA-8831)
+ * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970)
+ * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580)
+ * Add supplied username to authentication error messages (CASSANDRA-12076)
+ * Remove pre-startup check for open JMX port (CASSANDRA-12074)
+ * Remove compaction Severity from DynamicEndpointSnitch (CASSANDRA-11738)
+ * Restore resumable hints delivery (CASSANDRA-11960)
+ * Properly report LWT contention (CASSANDRA-12626)
+Merged from 3.0:
+ * Thread local pools never cleaned up (CASSANDRA-13033)
* Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
* CQL often queries static columns unnecessarily (CASSANDRA-12768)
* Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 00ddf44,0000000..eff35f4
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@@ -1,550 -1,0 +1,551 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.commitlog;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.nicoulaj.compilecommand.annotations.DontInline;
++import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+
+/**
+ * Performs eager-creation of commit log segments in a background thread. All the
+ * public methods are thread safe.
+ */
+public abstract class AbstractCommitLogSegmentManager
+{
+ static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
+
+ /**
+ * Segment that is ready to be used. The management thread fills this and blocks until consumed.
+ *
+ * A single management thread produces this, and consumers are already synchronizing to make sure other work is
+ * performed atomically with consuming this. Volatile to make sure writes by the management thread become
+ * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must
+ * synchronize on 'this'.
+ */
+ private volatile CommitLogSegment availableSegment = null;
+
+ private final WaitQueue segmentPrepared = new WaitQueue();
+
+ /** Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to */
+ private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
+
+ /**
+ * The segment we are currently allocating commit log records to.
+ *
+ * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value.
+ */
+ private volatile CommitLogSegment allocatingFrom = null;
+
+ final String storageDirectory;
+
+ /**
+ * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size
+ * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic
+ * can see the effect of recycling segments immediately (even though they're really happening asynchronously
+ * on the manager thread, which will take a ms or two).
+ */
+ private final AtomicLong size = new AtomicLong();
+
+ private Thread managerThread;
+ protected final CommitLog commitLog;
+ private volatile boolean shutdown;
+
+ private static final SimpleCachedBufferPool bufferPool =
+ new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize());
+
+ AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory)
+ {
+ this.commitLog = commitLog;
+ this.storageDirectory = storageDirectory;
+ }
+
+ void start()
+ {
+ // The run loop for the manager thread
+ Runnable runnable = new WrappedRunnable()
+ {
+ public void runMayThrow() throws Exception
+ {
+ while (!shutdown)
+ {
+ try
+ {
+ assert availableSegment == null;
+ logger.debug("No segments in reserve; creating a fresh one");
+ availableSegment = createSegment();
+ if (shutdown)
+ {
+ // If shutdown() started and finished during segment creation, we are now left with a
+ // segment that no one will consume. Discard it.
+ discardAvailableSegment();
+ return;
+ }
+
+ segmentPrepared.signalAll();
+ Thread.yield();
+
+ if (availableSegment == null && !atSegmentBufferLimit())
+ // Writing threads need another segment now.
+ continue;
+
+ // Writing threads are not waiting for new segments, we can spend time on other tasks.
+ // flush old Cfs if we're full
+ maybeFlushToReclaim();
+
+ LockSupport.park();
+ }
+ catch (Throwable t)
+ {
+ JVMStabilityInspector.inspectThrowable(t);
+ if (!CommitLog.handleCommitError("Failed managing commit log segments", t))
+ return;
+ // sleep some arbitrary period to avoid spamming CL
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+ // If we offered a segment, wait for it to be taken before reentering the loop.
+ // There could be a new segment in next not offered, but only on failure to discard it while
+ // shutting down-- nothing more can or needs to be done in that case.
+ }
+
+ while (availableSegment != null || atSegmentBufferLimit() && !shutdown)
+ LockSupport.park();
+ }
+ }
+ };
+
+ shutdown = false;
- managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
++ managerThread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "COMMIT-LOG-ALLOCATOR");
+ managerThread.start();
+
+ // for simplicity, ensure the first segment is allocated before continuing
+ advanceAllocatingFrom(null);
+ }
+
+ private boolean atSegmentBufferLimit()
+ {
+ return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit();
+ }
+
+ private void maybeFlushToReclaim()
+ {
+ long unused = unusedCapacity();
+ if (unused < 0)
+ {
+ long flushingSize = 0;
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>();
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment == allocatingFrom)
+ break;
+ flushingSize += segment.onDiskSize();
+ segmentsToRecycle.add(segment);
+ if (flushingSize + unused >= 0)
+ break;
+ }
+ flushDataFrom(segmentsToRecycle, false);
+ }
+ }
+
+
+ /**
+ * Allocate a segment within this CLSM. Should either succeed or throw.
+ */
+ public abstract Allocation allocate(Mutation mutation, int size);
+
+ /**
+ * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM
+ * decide what to do with those segments on disk after they've been replayed.
+ */
+ abstract void handleReplayedSegment(final File file);
+
+ /**
+ * Hook to allow segment managers to track state surrounding creation of new segments. Onl perform as task submit
+ * to segment manager so it's performed on segment management thread.
+ */
+ abstract CommitLogSegment createSegment();
+
+ /**
+ * Indicates that a segment file has been flushed and is no longer needed. Only perform as task submit to segment
+ * manager so it's performend on segment management thread, or perform while segment management thread is shutdown
+ * during testing resets.
+ *
+ * @param segment segment to be discarded
+ * @param delete whether or not the segment is safe to be deleted.
+ */
+ abstract void discard(CommitLogSegment segment, boolean delete);
+
+ /**
+ * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided.
+ *
+ * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
+ */
+ @DontInline
+ void advanceAllocatingFrom(CommitLogSegment old)
+ {
+ while (true)
+ {
+ synchronized (this)
+ {
+ // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments
+ if (allocatingFrom != old)
+ return;
+
+ // If a segment is ready, take it now, otherwise wait for the management thread to construct it.
+ if (availableSegment != null)
+ {
+ // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving
+ // the critical section.
+ activeSegments.add(allocatingFrom = availableSegment);
+ availableSegment = null;
+ break;
+ }
+ }
+
+ awaitAvailableSegment(old);
+ }
+
+ // Signal the management thread to prepare a new segment.
+ wakeManager();
+
+ if (old != null)
+ {
+ // Now we can run the user defined command just after switching to the new commit log.
+ // (Do this here instead of in the recycle call so we can get a head start on the archive.)
+ commitLog.archiver.maybeArchive(old);
+
+ // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it
+ old.discardUnusedTail();
+ }
+
+ // request that the CL be synced out-of-band, as we've finished a segment
+ commitLog.requestExtraSync();
+ }
+
+ void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom)
+ {
+ do
+ {
+ WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time());
+ if (availableSegment == null && allocatingFrom == currentAllocatingFrom)
+ prepared.awaitUninterruptibly();
+ else
+ prepared.cancel();
+ }
+ while (availableSegment == null && allocatingFrom == currentAllocatingFrom);
+ }
+
+ /**
+ * Switch to a new segment, regardless of how much is left in the current one.
+ *
+ * Flushes any dirty CFs for this segment and any older segments, and then discards the segments
+ */
+ void forceRecycleAll(Iterable<UUID> droppedCfs)
+ {
+ List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments);
+ CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1);
+ advanceAllocatingFrom(last);
+
+ // wait for the commit log modifications
+ last.waitForModifications();
+
+ // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes
+ // to complete
+ Keyspace.writeOrder.awaitNewBarrier();
+
+ // flush and wait for all CFs that are dirty in segments up-to and including 'last'
+ Future<?> future = flushDataFrom(segmentsToRecycle, true);
+ try
+ {
+ future.get();
+
+ for (CommitLogSegment segment : activeSegments)
+ for (UUID cfId : droppedCfs)
+ segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
+
+ // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments()
+ // if the previous active segment was the only one to recycle (since an active segment isn't
+ // necessarily dirty, and we only call dCS after a flush).
+ for (CommitLogSegment segment : activeSegments)
+ {
+ if (segment.isUnused())
+ archiveAndDiscard(segment);
+ }
+
+ CommitLogSegment first;
+ if ((first = activeSegments.peek()) != null && first.id <= last.id)
+ logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs.");
+ }
+ catch (Throwable t)
+ {
+ // for now just log the error
+ logger.error("Failed waiting for a forced recycle of in-use commit log segments", t);
+ }
+ }
+
+ /**
+ * Indicates that a segment is no longer in use and that it should be discarded.
+ *
+ * @param segment segment that is no longer in use
+ */
+ void archiveAndDiscard(final CommitLogSegment segment)
+ {
+ boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName());
+ if (!activeSegments.remove(segment))
+ return; // already discarded
+ // if archiving (command) was not successful then leave the file alone. don't delete or recycle.
+ logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script");
+ discard(segment, archiveSuccess);
+ }
+
+ /**
+ * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
+ * @param addedSize
+ */
+ void addSize(long addedSize)
+ {
+ size.addAndGet(addedSize);
+ }
+
+ /**
+ * @return the space (in bytes) used by all segment files.
+ */
+ public long onDiskSize()
+ {
+ return size.get();
+ }
+
+ private long unusedCapacity()
+ {
+ long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
+ long currentSize = size.get();
+ logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total);
+ return total - currentSize;
+ }
+
+ /**
+ * Force a flush on all CFs that are still dirty in @param segments.
+ *
+ * @return a Future that will finish when all the flushes are complete.
+ */
+ private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force)
+ {
+ if (segments.isEmpty())
+ return Futures.immediateFuture(null);
+ final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition();
+
+ // a map of CfId -> forceFlush() to ensure we only queue one flush per cf
+ final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>();
+
+ for (CommitLogSegment segment : segments)
+ {
+ for (UUID dirtyCFId : segment.getDirtyCFIDs())
+ {
+ Pair<String,String> pair = Schema.instance.getCF(dirtyCFId);
+ if (pair == null)
+ {
+ // even though we remove the schema entry before a final flush when dropping a CF,
+ // it's still possible for a writer to race and finish his append after the flush.
+ logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId);
+ segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition());
+ }
+ else if (!flushes.containsKey(dirtyCFId))
+ {
+ String keyspace = pair.left;
+ final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId);
+ // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush,
+ // no deadlock possibility since switchLock removal
+ flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition));
+ }
+ }
+ }
+
+ return Futures.allAsList(flushes.values());
+ }
+
+ /**
+ * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS.
+ * Only call this after the AbstractCommitLogService is shut down.
+ */
+ public void stopUnsafe(boolean deleteSegments)
+ {
+ logger.debug("CLSM closing and clearing existing commit log segments...");
+
+ shutdown();
+ try
+ {
+ awaitTermination();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ for (CommitLogSegment segment : activeSegments)
+ closeAndDeleteSegmentUnsafe(segment, deleteSegments);
+ activeSegments.clear();
+
+ size.set(0L);
+
+ logger.trace("CLSM done with closing and clearing existing commit log segments.");
+ }
+
+ /**
+ * To be used by tests only. Not safe if mutation slots are being allocated concurrently.
+ */
+ void awaitManagementTasksCompletion()
+ {
+ if (availableSegment == null && !atSegmentBufferLimit())
+ {
+ awaitAvailableSegment(allocatingFrom);
+ }
+ }
+
+ /**
+ * Explicitly for use only during resets in unit testing.
+ */
+ private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
+ {
+ try
+ {
+ discard(segment, delete);
+ }
+ catch (AssertionError ignored)
+ {
+ // segment file does not exist
+ }
+ }
+
+ /**
+ * Initiates the shutdown process for the management thread.
+ */
+ public void shutdown()
+ {
+ assert !shutdown;
+ shutdown = true;
+
+ // Release the management thread and delete prepared segment.
+ // Do not block as another thread may claim the segment (this can happen during unit test initialization).
+ discardAvailableSegment();
+ wakeManager();
+ }
+
+ private void discardAvailableSegment()
+ {
+ CommitLogSegment next = null;
+ synchronized (this)
+ {
+ next = availableSegment;
+ availableSegment = null;
+ }
+ if (next != null)
+ next.discard(true);
+ }
+
+ /**
+ * Returns when the management thread terminates.
+ */
+ public void awaitTermination() throws InterruptedException
+ {
+ managerThread.join();
+ managerThread = null;
+
+ for (CommitLogSegment segment : activeSegments)
+ segment.close();
+
+ bufferPool.shutdown();
+ }
+
+ /**
+ * @return a read-only collection of the active commit log segments
+ */
+ @VisibleForTesting
+ public Collection<CommitLogSegment> getActiveSegments()
+ {
+ return Collections.unmodifiableCollection(activeSegments);
+ }
+
+ /**
+ * @return the current CommitLogPosition of the active segment we're allocating from
+ */
+ CommitLogPosition getCurrentPosition()
+ {
+ return allocatingFrom.getCurrentCommitLogPosition();
+ }
+
+ /**
+ * Forces a disk flush on the commit log files that need it. Blocking.
+ */
+ public void sync() throws IOException
+ {
+ CommitLogSegment current = allocatingFrom;
+ for (CommitLogSegment segment : getActiveSegments())
+ {
+ // Do not sync segments that became active after sync started.
+ if (segment.id > current.id)
+ return;
+ segment.sync();
+ }
+ }
+
+ /**
+ * Used by compressed and encrypted segments to share a buffer pool across the CLSM.
+ */
+ SimpleCachedBufferPool getBufferPool()
+ {
+ return bufferPool;
+ }
+
+ void wakeManager()
+ {
+ LockSupport.unpark(managerThread);
+ }
+
+ /**
+ * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for
+ * a buffer to become available.
+ */
+ void notifyBufferFreed()
+ {
+ wakeManager();
+ }
+
+ /** Read-only access to current segment for subclasses. */
+ CommitLogSegment allocatingFrom()
+ {
+ return allocatingFrom;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 7b56da3,e5a5887..834aa0d
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@@ -17,18 -17,16 +17,19 @@@
*/
package org.apache.cassandra.db.commitlog;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.utils.NoSpamLogger;
-import org.apache.cassandra.utils.concurrent.WaitQueue;
-import org.slf4j.*;
-
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.LockSupport;
-import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Timer.Context;
+
++import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
public abstract class AbstractCommitLogService
{
@@@ -148,8 -160,7 +149,8 @@@
}
};
+ shutdown = false;
- thread = new Thread(runnable, name);
+ thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
thread.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/index/sasi/TermIterator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/sasi/TermIterator.java
index 5b08a56,0000000..1ddfcb9
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/sasi/TermIterator.java
+++ b/src/java/org/apache/cassandra/index/sasi/TermIterator.java
@@@ -1,218 -1,0 +1,219 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.sasi;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
++import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
+import org.apache.cassandra.index.sasi.disk.Token;
+import org.apache.cassandra.index.sasi.plan.Expression;
+import org.apache.cassandra.index.sasi.utils.RangeUnionIterator;
+import org.apache.cassandra.index.sasi.utils.RangeIterator;
+import org.apache.cassandra.io.util.FileUtils;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TermIterator extends RangeIterator<Long, Token>
+{
+ private static final Logger logger = LoggerFactory.getLogger(TermIterator.class);
+
+ private static final ThreadLocal<ExecutorService> SEARCH_EXECUTOR = new ThreadLocal<ExecutorService>()
+ {
+ public ExecutorService initialValue()
+ {
+ final String currentThread = Thread.currentThread().getName();
+ final int concurrencyFactor = DatabaseDescriptor.searchConcurrencyFactor();
+
+ logger.info("Search Concurrency Factor is set to {} for {}", concurrencyFactor, currentThread);
+
+ return (concurrencyFactor <= 1)
+ ? MoreExecutors.newDirectExecutorService()
+ : Executors.newFixedThreadPool(concurrencyFactor, new ThreadFactory()
+ {
+ public final AtomicInteger count = new AtomicInteger();
+
+ public Thread newThread(Runnable task)
+ {
- return new Thread(task, currentThread + "-SEARCH-" + count.incrementAndGet()) {{ setDaemon(true); }};
++ return new Thread(NamedThreadFactory.threadLocalDeallocator(task), currentThread + "-SEARCH-" + count.incrementAndGet()) {{ setDaemon(true); }};
+ }
+ });
+ }
+ };
+
+ private final Expression expression;
+
+ private final RangeIterator<Long, Token> union;
+ private final Set<SSTableIndex> referencedIndexes;
+
+ private TermIterator(Expression e,
+ RangeIterator<Long, Token> union,
+ Set<SSTableIndex> referencedIndexes)
+ {
+ super(union.getMinimum(), union.getMaximum(), union.getCount());
+
+ this.expression = e;
+ this.union = union;
+ this.referencedIndexes = referencedIndexes;
+ }
+
+ @SuppressWarnings("resource")
+ public static TermIterator build(final Expression e, Set<SSTableIndex> perSSTableIndexes)
+ {
+ final List<RangeIterator<Long, Token>> tokens = new CopyOnWriteArrayList<>();
+ final AtomicLong tokenCount = new AtomicLong(0);
+
+ RangeIterator<Long, Token> memtableIterator = e.index.searchMemtable(e);
+ if (memtableIterator != null)
+ {
+ tokens.add(memtableIterator);
+ tokenCount.addAndGet(memtableIterator.getCount());
+ }
+
+ final Set<SSTableIndex> referencedIndexes = new CopyOnWriteArraySet<>();
+
+ try
+ {
+ final CountDownLatch latch = new CountDownLatch(perSSTableIndexes.size());
+ final ExecutorService searchExecutor = SEARCH_EXECUTOR.get();
+
+ for (final SSTableIndex index : perSSTableIndexes)
+ {
+ if (e.getOp() == Expression.Op.PREFIX &&
+ index.mode() == OnDiskIndexBuilder.Mode.CONTAINS && !index.hasMarkedPartials())
+ throw new UnsupportedOperationException(String.format("The index %s has not yet been upgraded " +
+ "to support prefix queries in CONTAINS mode. " +
+ "Wait for compaction or rebuild the index.",
+ index.getPath()));
+
+
+ if (!index.reference())
+ {
+ latch.countDown();
+ continue;
+ }
+
+ // add to referenced right after the reference was acquired,
+ // that helps to release index if something goes bad inside of the search
+ referencedIndexes.add(index);
+
+ searchExecutor.submit((Runnable) () -> {
+ try
+ {
+ e.checkpoint();
+
+ RangeIterator<Long, Token> keyIterator = index.search(e);
+ if (keyIterator == null)
+ {
+ releaseIndex(referencedIndexes, index);
+ return;
+ }
+
+ tokens.add(keyIterator);
+ tokenCount.getAndAdd(keyIterator.getCount());
+ }
+ catch (Throwable e1)
+ {
+ releaseIndex(referencedIndexes, index);
+
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Failed search an index %s, skipping.", index.getPath()), e1);
+ }
+ finally
+ {
+ latch.countDown();
+ }
+ });
+ }
+
+ Uninterruptibles.awaitUninterruptibly(latch);
+
+ // checkpoint right away after all indexes complete search because we might have crossed the quota
+ e.checkpoint();
+
+ RangeIterator<Long, Token> ranges = RangeUnionIterator.build(tokens);
+ return ranges == null ? null : new TermIterator(e, ranges, referencedIndexes);
+ }
+ catch (Throwable ex)
+ {
+ // if execution quota was exceeded while opening indexes or something else happened
+ // local (yet to be tracked) indexes should be released first before re-throwing exception
+ referencedIndexes.forEach(TermIterator::releaseQuietly);
+
+ throw ex;
+ }
+ }
+
+ protected Token computeNext()
+ {
+ try
+ {
+ return union.hasNext() ? union.next() : endOfData();
+ }
+ finally
+ {
+ expression.checkpoint();
+ }
+ }
+
+ protected void performSkipTo(Long nextToken)
+ {
+ try
+ {
+ union.skipTo(nextToken);
+ }
+ finally
+ {
+ expression.checkpoint();
+ }
+ }
+
+ public void close()
+ {
+ FileUtils.closeQuietly(union);
+ referencedIndexes.forEach(TermIterator::releaseQuietly);
+ referencedIndexes.clear();
+ }
+
+ private static void releaseIndex(Set<SSTableIndex> indexes, SSTableIndex index)
+ {
+ indexes.remove(index);
+ releaseQuietly(index);
+ }
+
+ private static void releaseQuietly(SSTableIndex index)
+ {
+ try
+ {
+ index.release();
+ }
+ catch (Throwable e)
+ {
+ logger.error(String.format("Failed to release index %s", index.getPath()), e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1f47334,a9dfcdc..1843e7b
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@@ -502,31 -506,27 +503,28 @@@ public class OutboundTcpConnection exte
{
final AtomicInteger version = new AtomicInteger(NO_VERSION);
final CountDownLatch versionLatch = new CountDownLatch(1);
- new Thread("HANDSHAKE-" + poolReference.endPoint())
- new Thread(NamedThreadFactory.threadLocalDeallocator(() ->
++ Runnable r = () ->
{
- @Override
- public void run()
+ try
{
- try
- {
- logger.info("Handshaking version with {}", poolReference.endPoint());
- version.set(inputStream.readInt());
- }
- catch (IOException ex)
- {
- final String msg = "Cannot handshake version with " + poolReference.endPoint();
- if (logger.isTraceEnabled())
- logger.trace(msg, ex);
- else
- logger.info(msg);
- }
- finally
- {
- //unblock the waiting thread on either success or fail
- versionLatch.countDown();
- }
+ logger.info("Handshaking version with {}", poolReference.endPoint());
+ version.set(inputStream.readInt());
+ }
+ catch (IOException ex)
+ {
+ final String msg = "Cannot handshake version with " + poolReference.endPoint();
+ if (logger.isTraceEnabled())
+ logger.trace(msg, ex);
+ else
+ logger.info(msg);
+ }
+ finally
+ {
+ //unblock the waiting thread on either success or fail
+ versionLatch.countDown();
}
- }.start();
- }),"HANDSHAKE-" + poolReference.endPoint()).start();
++ };
++ new Thread(NamedThreadFactory.threadLocalDeallocator(r), "HANDSHAKE-" + poolReference.endPoint()).start();
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
index c98c0fe,61dfa50..904deb3
--- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
+++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
@@@ -59,17 -60,17 +60,14 @@@ public class RoundRobinScheduler implem
taskCount = new Semaphore(options.throttle_limit - 1);
queues = new NonBlockingHashMap<String, WeightedQueue>();
-- Runnable runnable = new Runnable()
++ Runnable runnable = () ->
{
-- public void run()
++ while (true)
{
-- while (true)
-- {
-- schedule();
-- }
++ schedule();
}
};
- Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
+ Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER");
scheduler.start();
logger.info("Started the RoundRobin Request Scheduler");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/50a9b1ab/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 3f999c2,71cbc35..1247e03
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -631,8 -592,29 +632,8 @@@ public class StorageService extends Not
throw new AssertionError(e);
}
- if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
- {
- logger.info("Loading persisted ring state");
- Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens();
- Map<InetAddress, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
- for (InetAddress ep : loadedTokens.keySet())
- {
- if (ep.equals(FBUtilities.getBroadcastAddress()))
- {
- // entry has been mistakenly added, delete it
- SystemKeyspace.removeEndpoint(ep);
- }
- else
- {
- if (loadedHostIds.containsKey(ep))
- tokenMetadata.updateHostId(loadedHostIds.get(ep), ep);
- Gossiper.instance.addSavedEndpoint(ep);
- }
- }
- }
-
// daemon threads, like our executors', continue to run while shutdown hooks are invoked
- drainOnShutdown = new Thread(new WrappedRunnable()
+ drainOnShutdown = new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
{
@Override
public void runMayThrow() throws InterruptedException, ExecutionException, IOException
@@@ -647,10 -629,10 +648,10 @@@
logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory());
logbackHook.run();
}
- }, "StorageServiceShutdownHook");
+ }), "StorageServiceShutdownHook");
Runtime.getRuntime().addShutdownHook(drainOnShutdown);
- replacing = DatabaseDescriptor.isReplacing();
+ replacing = isReplacing();
if (!Boolean.parseBoolean(System.getProperty("cassandra.start_gossip", "true")))
{
[04/10] cassandra git commit: Thread local pools never cleaned up
Posted by sn...@apache.org.
Thread local pools never cleaned up
patch by Robert Stupp; reviewed by Jason Brown for CASSANDRA-13033
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f668c6f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f668c6f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f668c6f
Branch: refs/heads/trunk
Commit: 7f668c6fe117f892cd79863fb9805ea5d5a2823c
Parents: da94781
Author: Robert Stupp <sn...@snazy.de>
Authored: Mon Dec 12 20:28:31 2016 +0100
Committer: Robert Stupp <sn...@snazy.de>
Committed: Mon Dec 12 20:38:56 2016 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../concurrent/NamedThreadFactory.java | 24 +++++++++++-
.../db/commitlog/AbstractCommitLogService.java | 3 +-
.../db/commitlog/CommitLogSegmentManager.java | 3 +-
.../cassandra/net/OutboundTcpConnection.java | 41 +++++++++-----------
.../apache/cassandra/repair/RepairRunnable.java | 4 +-
.../scheduler/RoundRobinScheduler.java | 3 +-
.../cassandra/service/StorageService.java | 7 ++--
.../cassandra/streaming/ConnectionHandler.java | 4 +-
.../compress/CompressedInputStream.java | 3 +-
10 files changed, 59 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index bbd47c1..5bc30be 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Thread local pools never cleaned up (CASSANDRA-13033)
* Set RPC_READY to false when draining or if a node is marked as shutdown (CASSANDRA-12781)
* CQL often queries static columns unnecessarily (CASSANDRA-12768)
* Make sure sstables only get committed when it's safe to discard commit log records (CASSANDRA-12956)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 33c80d5..22193c4 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.concurrent;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+import io.netty.util.concurrent.FastThreadLocal;
+import io.netty.util.concurrent.FastThreadLocalThread;
+
/**
* This class is an implementation of the <i>ThreadFactory</i> interface. This
* is useful to give Java threads meaningful names which is useful when using
@@ -54,12 +57,29 @@ public class NamedThreadFactory implements ThreadFactory
public Thread newThread(Runnable runnable)
{
- String name = id + ":" + n.getAndIncrement();
- Thread thread = new Thread(threadGroup, runnable, name);
+ String name = id + ':' + n.getAndIncrement();
+ Thread thread = new FastThreadLocalThread(threadGroup, threadLocalDeallocator(runnable), name);
thread.setPriority(priority);
thread.setDaemon(true);
if (contextClassLoader != null)
thread.setContextClassLoader(contextClassLoader);
return thread;
}
+
+ /**
+ * Ensures that {@link FastThreadLocal#remove() FastThreadLocal.remove()} is called when the {@link Runnable#run()}
+ * method of the given {@link Runnable} instance completes to ensure cleanup of {@link FastThreadLocal} instances.
+ * This is especially important for direct byte buffers allocated locally for a thread.
+ */
+ public static Runnable threadLocalDeallocator(Runnable r)
+ {
+ return () ->
+ {
+ try {
+ r.run();
+ } finally {
+ FastThreadLocal.removeAll();
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 113d1ba..e5a5887 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db.commitlog;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.slf4j.*;
@@ -159,7 +160,7 @@ public abstract class AbstractCommitLogService
}
};
- thread = new Thread(runnable, name);
+ thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
thread.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 82cee50..79dd316 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -32,6 +32,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -174,7 +175,7 @@ public class CommitLogSegmentManager
run = true;
- managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR");
+ managerThread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "COMMIT-LOG-ALLOCATOR");
managerThread.start();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index f573787..a9dfcdc 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -45,6 +45,7 @@ import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.xxhash.XXHashFactory;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
@@ -505,31 +506,27 @@ public class OutboundTcpConnection extends Thread
{
final AtomicInteger version = new AtomicInteger(NO_VERSION);
final CountDownLatch versionLatch = new CountDownLatch(1);
- new Thread("HANDSHAKE-" + poolReference.endPoint())
+ new Thread(NamedThreadFactory.threadLocalDeallocator(() ->
{
- @Override
- public void run()
+ try
{
- try
- {
- logger.info("Handshaking version with {}", poolReference.endPoint());
- version.set(inputStream.readInt());
- }
- catch (IOException ex)
- {
- final String msg = "Cannot handshake version with " + poolReference.endPoint();
- if (logger.isTraceEnabled())
- logger.trace(msg, ex);
- else
- logger.info(msg);
- }
- finally
- {
- //unblock the waiting thread on either success or fail
- versionLatch.countDown();
- }
+ logger.info("Handshaking version with {}", poolReference.endPoint());
+ version.set(inputStream.readInt());
+ }
+ catch (IOException ex)
+ {
+ final String msg = "Cannot handshake version with " + poolReference.endPoint();
+ if (logger.isTraceEnabled())
+ logger.trace(msg, ex);
+ else
+ logger.info(msg);
+ }
+ finally
+ {
+ //unblock the waiting thread on either success or fail
+ versionLatch.countDown();
}
- }.start();
+ }),"HANDSHAKE-" + poolReference.endPoint()).start();
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 7dd1b31..213e5c5 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -373,7 +373,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
private Thread createQueryThread(final int cmd, final UUID sessionId)
{
- return new Thread(new WrappedRunnable()
+ return new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
{
// Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
// Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
@@ -440,6 +440,6 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
seen[si].clear();
}
}
- });
+ }));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
index c98c0fe..61dfa50 100644
--- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
+++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.RequestSchedulerOptions;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -69,7 +70,7 @@ public class RoundRobinScheduler implements IRequestScheduler
}
}
};
- Thread scheduler = new Thread(runnable, "REQUEST-SCHEDULER");
+ Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER");
scheduler.start();
logger.info("Started the RoundRobin Request Scheduler");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d70c8dc..71cbc35 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.auth.AuthMigrationListener;
import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
import org.apache.cassandra.batchlog.BatchStoreVerbHandler;
import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
@@ -613,7 +614,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
// daemon threads, like our executors', continue to run while shutdown hooks are invoked
- drainOnShutdown = new Thread(new WrappedRunnable()
+ drainOnShutdown = new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
{
@Override
public void runMayThrow() throws InterruptedException, ExecutionException, IOException
@@ -628,7 +629,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory());
logbackHook.run();
}
- }, "StorageServiceShutdownHook");
+ }), "StorageServiceShutdownHook");
Runtime.getRuntime().addShutdownHook(drainOnShutdown);
replacing = DatabaseDescriptor.isReplacing();
@@ -3195,7 +3196,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return 0;
int cmd = nextRepairCommand.incrementAndGet();
- new Thread(createRepairTask(cmd, keyspace, options, legacy)).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(createRepairTask(cmd, keyspace, options, legacy))).start();
return cmd;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
index d3d8ed2..b83c089 100644
--- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
+++ b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java
@@ -37,6 +37,8 @@ import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
@@ -214,7 +216,7 @@ public class ConnectionHandler
this.socket = socket;
this.protocolVersion = protocolVersion;
- new Thread(this, name() + "-" + session.peer).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(this), name() + "-" + session.peer).start();
}
public ListenableFuture<?> close()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f668c6f/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index d59849f..6577980 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@ -33,6 +33,7 @@ import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -91,7 +92,7 @@ public class CompressedInputStream extends InputStream
this.dataBuffer = new ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
this.crcCheckChanceSupplier = crcCheckChanceSupplier;
- new Thread(new Reader(source, info, dataBuffer)).start();
+ new Thread(NamedThreadFactory.threadLocalDeallocator(new Reader(source, info, dataBuffer))).start();
}
public int read() throws IOException