You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2021/11/03 20:07:16 UTC
[cassandra] branch trunk updated: commit log was switched from
non-daemon to daemon threads,
which causes the JVM to exit in some case as no non-daemon threads are
active
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 111e94a commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active
111e94a is described below
commit 111e94ae13381ede97de190d9e1af9a77cac2b21
Author: David Capwell <dc...@apache.org>
AuthorDate: Wed Nov 3 11:53:36 2021 -0700
commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active
patch by David Capwell, Sam Tunnicliffe; reviewed by Sam Tunnicliffe for CASSANDRA-17085
---
CHANGES.txt | 1 +
.../cassandra/concurrent/ExecutorFactory.java | 59 ++++++++++++--------
.../cassandra/concurrent/InfiniteLoopExecutor.java | 33 ++++++++----
.../commitlog/AbstractCommitLogSegmentManager.java | 63 +++++++++++-----------
.../db/commitlog/AbstractCommitLogService.java | 37 ++++++++-----
.../org/apache/cassandra/utils/concurrent/Ref.java | 4 +-
.../apache/cassandra/utils/memory/BufferPool.java | 3 +-
.../utils/memory/MemtableCleanerThread.java | 3 +-
.../concurrent/InfiniteLoopExecutorTest.java | 6 ++-
9 files changed, 123 insertions(+), 86 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 32698f8..d1f5e73 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active (CASSANDRA-17085)
* Add a Denylist to block reads and writes on specific partition keys (CASSANDRA-12106)
* v4+ protocol did not clean up client warnings, which caused leaking the state (CASSANDRA-17054)
* Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023)
diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java b/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java
index f1acd55..83f48d1 100644
--- a/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java
@@ -18,12 +18,15 @@
package org.apache.cassandra.concurrent;
-import java.util.function.Consumer;
-
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon;
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts;
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Shared;
import static java.lang.Thread.*;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon.DAEMON;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.UNSYNCHRONIZED;
import static org.apache.cassandra.concurrent.NamedThreadFactory.createThread;
import static org.apache.cassandra.concurrent.NamedThreadFactory.setupThread;
import static org.apache.cassandra.concurrent.ThreadPoolExecutorBuilder.pooledJmx;
@@ -99,20 +102,21 @@ public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<Executor
* Create and start a new thread to execute {@code runnable}
* @param name the name of the thread
* @param runnable the task to execute
+ * @param daemon flag to indicate whether the thread should be a daemon or not
* @return the new thread
*/
- Thread startThread(String name, Runnable runnable);
+ Thread startThread(String name, Runnable runnable, Daemon daemon);
/**
- * Create and start a new InfiniteLoopExecutor to repeatedly invoke {@code runnable}.
- * On shutdown, the executing thread will be interrupted; to support clean shutdown
- * {@code runnable} should propagate {@link InterruptedException}
- *
- * @param name the name of the thread used to invoke the task repeatedly
- * @param task the task to execute repeatedly
+ * Create and start a new thread to execute {@code runnable}; this thread will be a daemon thread.
+ * @param name the name of the thread
+ * @param runnable the task to execute
* @return the new thread
*/
- Interruptible infiniteLoop(String name, Interruptible.Task task, boolean simulatorSafe);
+ default Thread startThread(String name, Runnable runnable)
+ {
+ return startThread(name, runnable, DAEMON);
+ }
/**
* Create and start a new InfiniteLoopExecutor to repeatedly invoke {@code runnable}.
@@ -121,10 +125,15 @@ public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<Executor
*
* @param name the name of the thread used to invoke the task repeatedly
* @param task the task to execute repeatedly
- * @param interruptHandler perform specific processing of interrupts of the task execution thread
+ * @param simulatorSafe flag indicating if the loop thread can be intercepted / rescheduled during cluster simulation
+ * @param daemon flag to indicate whether the loop thread should be a daemon thread or not
+ * @param interrupts flag to indicate whether to synchronize interrupts of the task execution thread
+ * using the task's monitor this can be used to prevent interruption while performing
+ * IO operations which forbid interrupted threads.
+ * See: {@link org.apache.cassandra.db.commitlog.AbstractCommitLogSegmentManager::start}
* @return the new thread
*/
- Interruptible infiniteLoop(String name, Interruptible.Task task, boolean simulatorSafe, Consumer<Thread> interruptHandler);
+ Interruptible infiniteLoop(String name, Interruptible.Task task, SimulatorSafe simulatorSafe, Daemon daemon, Interrupts interrupts);
/**
* Create and start a new InfiniteLoopExecutor to repeatedly invoke {@code runnable}.
@@ -133,11 +142,12 @@ public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<Executor
*
* @param name the name of the thread used to invoke the task repeatedly
* @param task the task to execute repeatedly
+ * @param simulatorSafe flag indicating if the loop thread can be intercepted / rescheduled during cluster simulation
* @return the new thread
*/
- default Interruptible infiniteLoop(String name, Interruptible.SimpleTask task, boolean simulatorSafe)
+ default Interruptible infiniteLoop(String name, Interruptible.SimpleTask task, SimulatorSafe simulatorSafe)
{
- return infiniteLoop(name, Interruptible.Task.from(task), simulatorSafe);
+ return infiniteLoop(name, Interruptible.Task.from(task), simulatorSafe, DAEMON, UNSYNCHRONIZED);
}
/**
@@ -169,6 +179,7 @@ public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<Executor
super(contextClassLoader, threadGroup, uncaughtExceptionHandler);
}
+ @Override
public LocalAwareSubFactory localAware()
{
return new LocalAwareSubFactory()
@@ -225,16 +236,19 @@ public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<Executor
};
}
+ @Override
public ExecutorBuilder<SingleThreadExecutorPlus> configureSequential(String name)
{
return ThreadPoolExecutorBuilder.sequential(SingleThreadExecutorPlus::new, contextClassLoader, threadGroup, uncaughtExceptionHandler, name);
}
+ @Override
public ExecutorBuilder<ThreadPoolExecutorPlus> configurePooled(String name, int threads)
{
return ThreadPoolExecutorBuilder.pooled(ThreadPoolExecutorPlus::new, contextClassLoader, threadGroup, uncaughtExceptionHandler, name, threads);
}
+ @Override
public ScheduledExecutorPlus scheduled(boolean executeOnShutdown, String name, int priority)
{
ScheduledThreadPoolExecutorPlus executor = new ScheduledThreadPoolExecutorPlus(newThreadFactory(name, priority));
@@ -243,22 +257,21 @@ public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<Executor
return executor;
}
- public Thread startThread(String name, Runnable runnable)
+ @Override
+ public Thread startThread(String name, Runnable runnable, Daemon daemon)
{
- Thread thread = setupThread(createThread(threadGroup, runnable, name, true), Thread.NORM_PRIORITY, contextClassLoader, uncaughtExceptionHandler);
+ Thread thread = setupThread(createThread(threadGroup, runnable, name, daemon == DAEMON),
+ Thread.NORM_PRIORITY,
+ contextClassLoader,
+ uncaughtExceptionHandler);
thread.start();
return thread;
}
- public Interruptible infiniteLoop(String name, Interruptible.Task task, boolean simulatorSafe)
- {
- return new InfiniteLoopExecutor(this, name, task);
- }
-
@Override
- public Interruptible infiniteLoop(String name, Interruptible.Task task, boolean simulatorSafe, Consumer<Thread> interruptHandler)
+ public Interruptible infiniteLoop(String name, Interruptible.Task task, SimulatorSafe simulatorSafe, Daemon daemon, Interrupts interrupts)
{
- return new InfiniteLoopExecutor(this, name, task, interruptHandler);
+ return new InfiniteLoopExecutor(this, name, task, daemon, interrupts);
}
@Override
diff --git a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
index 4012970..6ef439a 100644
--- a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
@@ -31,6 +31,8 @@ import java.util.function.Consumer;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.InternalState.TERMINATED;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.SYNCHRONIZED;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.UNSYNCHRONIZED;
import static org.apache.cassandra.concurrent.Interruptible.State.INTERRUPTED;
import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL;
import static org.apache.cassandra.concurrent.Interruptible.State.SHUTTING_DOWN;
@@ -40,6 +42,9 @@ public class InfiniteLoopExecutor implements Interruptible
private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class);
public enum InternalState { TERMINATED }
+ public enum SimulatorSafe { SAFE, UNSAFE }
+ public enum Daemon { DAEMON, NON_DAEMON }
+ public enum Interrupts { SYNCHRONIZED, UNSYNCHRONIZED }
private static final AtomicReferenceFieldUpdater<InfiniteLoopExecutor, Object> stateUpdater = AtomicReferenceFieldUpdater.newUpdater(InfiniteLoopExecutor.class, Object.class, "state");
private final Thread thread;
@@ -47,30 +52,36 @@ public class InfiniteLoopExecutor implements Interruptible
private volatile Object state = NORMAL;
private final Consumer<Thread> interruptHandler;
- public InfiniteLoopExecutor(String name, Task task)
+ public InfiniteLoopExecutor(String name, Task task, Daemon daemon)
{
- this(ExecutorFactory.Global.executorFactory(), name, task, Thread::interrupt);
+ this(ExecutorFactory.Global.executorFactory(), name, task, daemon, UNSYNCHRONIZED);
}
- public InfiniteLoopExecutor(ExecutorFactory factory, String name, Task task)
+ public InfiniteLoopExecutor(ExecutorFactory factory, String name, Task task, Daemon daemon)
{
- this(factory, name, task, Thread::interrupt);
+ this(factory, name, task, daemon, UNSYNCHRONIZED);
}
- public InfiniteLoopExecutor(ExecutorFactory factory, String name, Task task, Consumer<Thread> interruptHandler)
+ public InfiniteLoopExecutor(ExecutorFactory factory, String name, Task task, Daemon daemon, Interrupts interrupts)
{
this.task = task;
- this.thread = factory.startThread(name, this::loop);
- this.interruptHandler = interruptHandler;
+ this.thread = factory.startThread(name, this::loop, daemon);
+ this.interruptHandler = interrupts == SYNCHRONIZED
+ ? interruptHandler(task)
+ : Thread::interrupt;
}
- public InfiniteLoopExecutor(BiFunction<String, Runnable, Thread> threadStarter, String name, Task task, Consumer<Thread> interruptHandler)
+ private static Consumer<Thread> interruptHandler(final Object monitor)
{
- this.task = task;
- this.thread = threadStarter.apply(name, this::loop);
- this.interruptHandler = interruptHandler;
+ return thread -> {
+ synchronized (monitor)
+ {
+ thread.interrupt();
+ }
+ };
}
+
private void loop()
{
boolean interrupted = false;
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 0c438d5..485e3fd 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
-import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -47,6 +46,9 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.concurrent.*;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon.NON_DAEMON;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.SYNCHRONIZED;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE;
import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
@@ -106,11 +108,29 @@ public abstract class AbstractCommitLogSegmentManager
void start()
{
- // used for synchronization to prevent thread interrupts while performing IO operations
- final Object monitor = new Object();
- // The run loop for the manager thread
- Interruptible.Task runnable = state -> {
+ // For encrypted segments we want to keep the compression buffers on-heap as we need those bytes for encryption,
+ // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
+ BufferType bufferType = commitLog.configuration.useEncryption() || !commitLog.configuration.useCompression()
+ ? BufferType.ON_HEAP
+ : commitLog.configuration.getCompressor().preferredBufferType();
+
+ this.bufferPool = new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(),
+ DatabaseDescriptor.getCommitLogSegmentSize(),
+ bufferType);
+
+ AllocatorRunnable allocator = new AllocatorRunnable();
+ executor = executorFactory().infiniteLoop("COMMIT-LOG-ALLOCATOR", allocator, SAFE, NON_DAEMON, SYNCHRONIZED);
+ // for simplicity, ensure the first segment is allocated before continuing
+ advanceAllocatingFrom(null);
+ }
+
+ class AllocatorRunnable implements Interruptible.Task
+ {
+ // The run loop for the manager thread
+ @Override
+ public void run(Interruptible.State state) throws InterruptedException
+ {
try
{
switch (state)
@@ -123,9 +143,12 @@ public abstract class AbstractCommitLogSegmentManager
case NORMAL:
assert availableSegment == null;
+ // synchronized to prevent thread interrupts while performing IO operations and also
+ // clear interrupted status to prevent ClosedByInterruptException in createSegment
- synchronized (monitor)
+ synchronized (this)
{
+ Thread.interrupted();
logger.trace("No segments in reserve; creating a fresh one");
availableSegment = createSegment();
@@ -155,33 +178,7 @@ public abstract class AbstractCommitLogSegmentManager
// shutting down-- nothing more can or needs to be done in that case.
}
WaitQueue.waitOnCondition(managerThreadWaitCondition, managerThreadWaitQueue);
- };
-
- // For encrypted segments we want to keep the compression buffers on-heap as we need those bytes for encryption,
- // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
- BufferType bufferType = commitLog.configuration.useEncryption() || !commitLog.configuration.useCompression()
- ? BufferType.ON_HEAP
- : commitLog.configuration.getCompressor().preferredBufferType();
-
- this.bufferPool = new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(),
- DatabaseDescriptor.getCommitLogSegmentSize(),
- bufferType);
-
- Consumer<Thread> interruptHandler = interruptHandler(monitor);
- executor = executorFactory().infiniteLoop("COMMIT-LOG-ALLOCATOR", runnable, true, interruptHandler);
-
- // for simplicity, ensure the first segment is allocated before continuing
- advanceAllocatingFrom(null);
- }
-
- private Consumer<Thread> interruptHandler(final Object monitor)
- {
- return thread -> {
- synchronized (monitor)
- {
- thread.interrupt();
- }
- };
+ }
}
private boolean atSegmentBufferLimit()
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 9d030d7..be3f8cd 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -36,7 +36,11 @@ import static com.codahale.metrics.Timer.Context;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon.NON_DAEMON;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.SYNCHRONIZED;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE;
import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL;
import static org.apache.cassandra.concurrent.Interruptible.State.SHUTTING_DOWN;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
@@ -147,7 +151,8 @@ public abstract class AbstractCommitLogService
throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
syncIntervalNanos * 1e-6));
- executor = executorFactory().infiniteLoop(name, new SyncRunnable(MonotonicClock.preciseTime), true);
+ SyncRunnable sync = new SyncRunnable(MonotonicClock.preciseTime);
+ executor = executorFactory().infiniteLoop(name, sync, SAFE, NON_DAEMON, SYNCHRONIZED);
}
class SyncRunnable implements Interruptible.Task
@@ -171,19 +176,25 @@ public abstract class AbstractCommitLogService
// sync and signal
long pollStarted = clock.now();
boolean flushToDisk = lastSyncedAt + syncIntervalNanos <= pollStarted || state != NORMAL || syncRequested;
- if (flushToDisk)
+ // synchronized to prevent thread interrupts while performing IO operations and also
+ // clear interrupted status to prevent ClosedByInterruptException in CommitLog::sync
+ synchronized (this)
{
- // in this branch, we want to flush the commit log to disk
- syncRequested = false;
- commitLog.sync(true);
- lastSyncedAt = pollStarted;
- syncComplete.signalAll();
- syncCount++;
- }
- else
- {
- // in this branch, just update the commit log sync headers
- commitLog.sync(false);
+ Thread.interrupted();
+ if (flushToDisk)
+ {
+ // in this branch, we want to flush the commit log to disk
+ syncRequested = false;
+ commitLog.sync(true);
+ lastSyncedAt = pollStarted;
+ syncComplete.signalAll();
+ syncCount++;
+ }
+ else
+ {
+ // in this branch, just update the commit log sync headers
+ commitLog.sync(false);
+ }
}
if (state == SHUTTING_DOWN)
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index 26644dc..c077467 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -31,7 +31,6 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +52,7 @@ import org.cliffc.high_scale_lib.NonBlockingHashMap;
import static java.util.Collections.emptyList;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.UNSAFE;
import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.Throwables.merge;
@@ -357,7 +357,7 @@ public final class Ref<T> implements RefCounted<T>
static final Set<Class<?>> concurrentIterables = Collections.newSetFromMap(new IdentityHashMap<>());
private static final Set<GlobalState> globallyExtant = Collections.newSetFromMap(new ConcurrentHashMap<>());
static final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
- private static final Shutdownable EXEC = executorFactory().infiniteLoop("Reference-Reaper", Ref::reapOneReference, false);
+ private static final Shutdownable EXEC = executorFactory().infiniteLoop("Reference-Reaper", Ref::reapOneReference, UNSAFE);
static final ScheduledExecutorService STRONG_LEAK_DETECTOR = !DEBUG_ENABLED ? null : executorFactory().scheduled("Strong-Reference-Leak-Detector");
static
{
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index 7c1e95e..d656616 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.utils.concurrent.Ref;
import static com.google.common.collect.ImmutableList.of;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.UNSAFE;
import static org.apache.cassandra.utils.ExecutorUtils.*;
import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
import static org.apache.cassandra.utils.memory.MemoryUtil.isExactlyDirect;
@@ -173,7 +174,7 @@ public class BufferPool
this.globalPool = new GlobalPool();
this.metrics = new BufferPoolMetrics(name, this);
this.recyclePartially = recyclePartially;
- this.localPoolCleaner = executorFactory().infiniteLoop("LocalPool-Cleaner-" + name, this::cleanupOneReference, false);
+ this.localPoolCleaner = executorFactory().infiniteLoop("LocalPool-Cleaner-" + name, this::cleanupOneReference, UNSAFE);
}
/**
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
index e11ce99..dbc23e5 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.cassandra.concurrent.Interruptible;
import org.apache.cassandra.utils.concurrent.WaitQueue;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE;
import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
@@ -116,7 +117,7 @@ public class MemtableCleanerThread<P extends MemtablePool> implements Interrupti
private MemtableCleanerThread(Clean<P> clean)
{
- this.executor = executorFactory().infiniteLoop(clean.pool.getClass().getSimpleName() + "Cleaner", clean, true);
+ this.executor = executorFactory().infiniteLoop(clean.pool.getClass().getSimpleName() + "Cleaner", clean, SAFE);
this.trigger = clean.wait::signal;
this.clean = clean;
}
diff --git a/test/unit/org/apache/cassandra/concurrent/InfiniteLoopExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/InfiniteLoopExecutorTest.java
index 73dca7e..9ec702d 100644
--- a/test/unit/org/apache/cassandra/concurrent/InfiniteLoopExecutorTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/InfiniteLoopExecutorTest.java
@@ -30,13 +30,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.junit.Test;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon.DAEMON;
+
public class InfiniteLoopExecutorTest
{
@Test
public void testShutdownNow() throws InterruptedException, ExecutionException, TimeoutException
{
Semaphore semaphore = new Semaphore(0);
- InfiniteLoopExecutor e1 = new InfiniteLoopExecutor("test", ignore -> semaphore.acquire(1));
+ InfiniteLoopExecutor e1 = new InfiniteLoopExecutor("test", ignore -> semaphore.acquire(1), DAEMON);
ExecutorService exec = Executors.newCachedThreadPool();
Future<?> f = exec.submit(() -> e1.awaitTermination(1L, TimeUnit.MINUTES));
e1.shutdownNow();
@@ -53,7 +55,7 @@ public class InfiniteLoopExecutorTest
semaphore.acquire(1);
active.set(false);
semaphore.release();
- });
+ }, DAEMON);
ExecutorService exec = Executors.newCachedThreadPool();
Future<?> f = exec.submit(() -> e1.awaitTermination(1L, TimeUnit.MINUTES));
// do ten normal loops
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org