You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2021/11/03 20:07:16 UTC

[cassandra] branch trunk updated: commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 111e94a  commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active
111e94a is described below

commit 111e94ae13381ede97de190d9e1af9a77cac2b21
Author: David Capwell <dc...@apache.org>
AuthorDate: Wed Nov 3 11:53:36 2021 -0700

    commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active
    
    patch by David Capwell, Sam Tunnicliffe; reviewed by Sam Tunnicliffe for CASSANDRA-17085
---
 CHANGES.txt                                        |  1 +
 .../cassandra/concurrent/ExecutorFactory.java      | 59 ++++++++++++--------
 .../cassandra/concurrent/InfiniteLoopExecutor.java | 33 ++++++++----
 .../commitlog/AbstractCommitLogSegmentManager.java | 63 +++++++++++-----------
 .../db/commitlog/AbstractCommitLogService.java     | 37 ++++++++-----
 .../org/apache/cassandra/utils/concurrent/Ref.java |  4 +-
 .../apache/cassandra/utils/memory/BufferPool.java  |  3 +-
 .../utils/memory/MemtableCleanerThread.java        |  3 +-
 .../concurrent/InfiniteLoopExecutorTest.java       |  6 ++-
 9 files changed, 123 insertions(+), 86 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 32698f8..d1f5e73 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * commit log was switched from non-daemon to daemon threads, which causes the JVM to exit in some case as no non-daemon threads are active (CASSANDRA-17085)
  * Add a Denylist to block reads and writes on specific partition keys (CASSANDRA-12106)
  * v4+ protocol did not clean up client warnings, which caused leaking the state (CASSANDRA-17054)
  * Remove duplicate toCQLString in ReadCommand (CASSANDRA-17023)
diff --git a/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java b/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java
index f1acd55..83f48d1 100644
--- a/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/ExecutorFactory.java
@@ -18,12 +18,15 @@
 
 package org.apache.cassandra.concurrent;
 
-import java.util.function.Consumer;
-
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon;
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts;
+import org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Shared;
 
 import static java.lang.Thread.*;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon.DAEMON;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.UNSYNCHRONIZED;
 import static org.apache.cassandra.concurrent.NamedThreadFactory.createThread;
 import static org.apache.cassandra.concurrent.NamedThreadFactory.setupThread;
 import static org.apache.cassandra.concurrent.ThreadPoolExecutorBuilder.pooledJmx;
@@ -99,20 +102,21 @@ public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<Executor
      * Create and start a new thread to execute {@code runnable}
      * @param name the name of the thread
      * @param runnable the task to execute
+     * @param daemon flag to indicate whether the thread should be a daemon or not
      * @return the new thread
      */
-    Thread startThread(String name, Runnable runnable);
+    Thread startThread(String name, Runnable runnable, Daemon daemon);
 
     /**
-     * Create and start a new InfiniteLoopExecutor to repeatedly invoke {@code runnable}.
-     * On shutdown, the executing thread will be interrupted; to support clean shutdown
-     * {@code runnable} should propagate {@link InterruptedException}
-     *
-     * @param name the name of the thread used to invoke the task repeatedly
-     * @param task the task to execute repeatedly
+     * Create and start a new thread to execute {@code runnable}; this thread will be a daemon thread.
+     * @param name the name of the thread
+     * @param runnable the task to execute
      * @return the new thread
      */
-    Interruptible infiniteLoop(String name, Interruptible.Task task, boolean simulatorSafe);
+    default Thread startThread(String name, Runnable runnable)
+    {
+        return startThread(name, runnable, DAEMON);
+    }
 
     /**
      * Create and start a new InfiniteLoopExecutor to repeatedly invoke {@code runnable}.
@@ -121,10 +125,15 @@ public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<Executor
      *
      * @param name the name of the thread used to invoke the task repeatedly
      * @param task the task to execute repeatedly
-     * @param interruptHandler perform specific processing of interrupts of the task execution thread
+     * @param simulatorSafe flag indicating if the loop thread can be intercepted / rescheduled during cluster simulation
+     * @param daemon flag to indicate whether the loop thread should be a daemon thread or not
+     * @param interrupts flag to indicate whether to synchronize interrupts of the task execution thread
+     *                   using the task's monitor; this can be used to prevent interruption while performing
+     *                   IO operations which forbid interrupted threads.
+     *                   See: {@link org.apache.cassandra.db.commitlog.AbstractCommitLogSegmentManager#start}
      * @return the new thread
      */
-    Interruptible infiniteLoop(String name, Interruptible.Task task, boolean simulatorSafe, Consumer<Thread> interruptHandler);
+    Interruptible infiniteLoop(String name, Interruptible.Task task, SimulatorSafe simulatorSafe, Daemon daemon, Interrupts interrupts);
 
     /**
      * Create and start a new InfiniteLoopExecutor to repeatedly invoke {@code runnable}.
@@ -133,11 +142,12 @@ public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<Executor
      *
      * @param name the name of the thread used to invoke the task repeatedly
      * @param task the task to execute repeatedly
+     * @param simulatorSafe flag indicating if the loop thread can be intercepted / rescheduled during cluster simulation
      * @return the new thread
      */
-    default Interruptible infiniteLoop(String name, Interruptible.SimpleTask task, boolean simulatorSafe)
+    default Interruptible infiniteLoop(String name, Interruptible.SimpleTask task, SimulatorSafe simulatorSafe)
     {
-        return infiniteLoop(name, Interruptible.Task.from(task), simulatorSafe);
+        return infiniteLoop(name, Interruptible.Task.from(task), simulatorSafe, DAEMON, UNSYNCHRONIZED);
     }
 
     /**
@@ -169,6 +179,7 @@ public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<Executor
             super(contextClassLoader, threadGroup, uncaughtExceptionHandler);
         }
 
+        @Override
         public LocalAwareSubFactory localAware()
         {
             return new LocalAwareSubFactory()
@@ -225,16 +236,19 @@ public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<Executor
             };
         }
 
+        @Override
         public ExecutorBuilder<SingleThreadExecutorPlus> configureSequential(String name)
         {
             return ThreadPoolExecutorBuilder.sequential(SingleThreadExecutorPlus::new, contextClassLoader, threadGroup, uncaughtExceptionHandler, name);
         }
 
+        @Override
         public ExecutorBuilder<ThreadPoolExecutorPlus> configurePooled(String name, int threads)
         {
             return ThreadPoolExecutorBuilder.pooled(ThreadPoolExecutorPlus::new, contextClassLoader, threadGroup, uncaughtExceptionHandler, name, threads);
         }
 
+        @Override
         public ScheduledExecutorPlus scheduled(boolean executeOnShutdown, String name, int priority)
         {
             ScheduledThreadPoolExecutorPlus executor = new ScheduledThreadPoolExecutorPlus(newThreadFactory(name, priority));
@@ -243,22 +257,21 @@ public interface ExecutorFactory extends ExecutorBuilderFactory.Jmxable<Executor
             return executor;
         }
 
-        public Thread startThread(String name, Runnable runnable)
+        @Override
+        public Thread startThread(String name, Runnable runnable, Daemon daemon)
         {
-            Thread thread = setupThread(createThread(threadGroup, runnable, name, true), Thread.NORM_PRIORITY, contextClassLoader, uncaughtExceptionHandler);
+            Thread thread = setupThread(createThread(threadGroup, runnable, name, daemon == DAEMON),
+                                        Thread.NORM_PRIORITY,
+                                        contextClassLoader,
+                                        uncaughtExceptionHandler);
             thread.start();
             return thread;
         }
 
-        public Interruptible infiniteLoop(String name, Interruptible.Task task, boolean simulatorSafe)
-        {
-            return new InfiniteLoopExecutor(this, name, task);
-        }
-
         @Override
-        public Interruptible infiniteLoop(String name, Interruptible.Task task, boolean simulatorSafe, Consumer<Thread> interruptHandler)
+        public Interruptible infiniteLoop(String name, Interruptible.Task task, SimulatorSafe simulatorSafe, Daemon daemon, Interrupts interrupts)
         {
-            return new InfiniteLoopExecutor(this, name, task, interruptHandler);
+            return new InfiniteLoopExecutor(this, name, task, daemon, interrupts);
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
index 4012970..6ef439a 100644
--- a/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/InfiniteLoopExecutor.java
@@ -31,6 +31,8 @@ import java.util.function.Consumer;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.InternalState.TERMINATED;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.SYNCHRONIZED;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.UNSYNCHRONIZED;
 import static org.apache.cassandra.concurrent.Interruptible.State.INTERRUPTED;
 import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL;
 import static org.apache.cassandra.concurrent.Interruptible.State.SHUTTING_DOWN;
@@ -40,6 +42,9 @@ public class InfiniteLoopExecutor implements Interruptible
     private static final Logger logger = LoggerFactory.getLogger(InfiniteLoopExecutor.class);
 
     public enum InternalState { TERMINATED }
+    public enum SimulatorSafe { SAFE, UNSAFE }
+    public enum Daemon        { DAEMON, NON_DAEMON }
+    public enum Interrupts    { SYNCHRONIZED, UNSYNCHRONIZED }
 
     private static final AtomicReferenceFieldUpdater<InfiniteLoopExecutor, Object> stateUpdater = AtomicReferenceFieldUpdater.newUpdater(InfiniteLoopExecutor.class, Object.class, "state");
     private final Thread thread;
@@ -47,30 +52,36 @@ public class InfiniteLoopExecutor implements Interruptible
     private volatile Object state = NORMAL;
     private final Consumer<Thread> interruptHandler;
 
-    public InfiniteLoopExecutor(String name, Task task)
+    public InfiniteLoopExecutor(String name, Task task, Daemon daemon)
     {
-        this(ExecutorFactory.Global.executorFactory(), name, task, Thread::interrupt);
+        this(ExecutorFactory.Global.executorFactory(), name, task, daemon, UNSYNCHRONIZED);
     }
 
-    public InfiniteLoopExecutor(ExecutorFactory factory, String name, Task task)
+    public InfiniteLoopExecutor(ExecutorFactory factory, String name, Task task, Daemon daemon)
     {
-        this(factory, name, task, Thread::interrupt);
+        this(factory, name, task, daemon, UNSYNCHRONIZED);
     }
 
-    public InfiniteLoopExecutor(ExecutorFactory factory, String name, Task task, Consumer<Thread> interruptHandler)
+    public InfiniteLoopExecutor(ExecutorFactory factory, String name, Task task, Daemon daemon, Interrupts interrupts)
     {
         this.task = task;
-        this.thread = factory.startThread(name, this::loop);
-        this.interruptHandler = interruptHandler;
+        this.thread = factory.startThread(name, this::loop, daemon);
+        this.interruptHandler = interrupts == SYNCHRONIZED
+                                ? interruptHandler(task)
+                                : Thread::interrupt;
     }
 
-    public InfiniteLoopExecutor(BiFunction<String, Runnable, Thread> threadStarter, String name, Task task, Consumer<Thread> interruptHandler)
+    private static Consumer<Thread> interruptHandler(final Object monitor)
     {
-        this.task = task;
-        this.thread = threadStarter.apply(name, this::loop);
-        this.interruptHandler = interruptHandler;
+        return thread -> {
+            synchronized (monitor)
+            {
+                thread.interrupt();
+            }
+        };
     }
 
+
     private void loop()
     {
         boolean interrupted = false;
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index 0c438d5..485e3fd 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BooleanSupplier;
-import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -47,6 +46,9 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.concurrent.*;
 
 import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon.NON_DAEMON;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.SYNCHRONIZED;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE;
 import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;
 import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
 
@@ -106,11 +108,29 @@ public abstract class AbstractCommitLogSegmentManager
 
     void start()
     {
-        // used for synchronization to prevent thread interrupts while performing IO operations
-        final Object monitor = new Object();
-        // The run loop for the manager thread
-        Interruptible.Task runnable = state -> {
+        // For encrypted segments we want to keep the compression buffers on-heap as we need those bytes for encryption,
+        // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
+        BufferType bufferType = commitLog.configuration.useEncryption() || !commitLog.configuration.useCompression()
+                                ? BufferType.ON_HEAP
+                                : commitLog.configuration.getCompressor().preferredBufferType();
+
+        this.bufferPool = new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(),
+                                                     DatabaseDescriptor.getCommitLogSegmentSize(),
+                                                     bufferType);
+
 
+        AllocatorRunnable allocator = new AllocatorRunnable();
+        executor = executorFactory().infiniteLoop("COMMIT-LOG-ALLOCATOR", allocator, SAFE, NON_DAEMON, SYNCHRONIZED);
+        // for simplicity, ensure the first segment is allocated before continuing
+        advanceAllocatingFrom(null);
+    }
+
+    class AllocatorRunnable implements Interruptible.Task
+    {
+        // The run loop for the manager thread
+        @Override
+        public void run(Interruptible.State state) throws InterruptedException
+        {
             try
             {
                 switch (state)
@@ -123,9 +143,12 @@ public abstract class AbstractCommitLogSegmentManager
 
                     case NORMAL:
                         assert availableSegment == null;
+                        // synchronized to prevent thread interrupts while performing IO operations and also
+                        // clear interrupted status to prevent ClosedByInterruptException in createSegment
 
-                        synchronized (monitor)
+                        synchronized (this)
                         {
+                            Thread.interrupted();
                             logger.trace("No segments in reserve; creating a fresh one");
                             availableSegment = createSegment();
 
@@ -155,33 +178,7 @@ public abstract class AbstractCommitLogSegmentManager
                 // shutting down-- nothing more can or needs to be done in that case.
             }
             WaitQueue.waitOnCondition(managerThreadWaitCondition, managerThreadWaitQueue);
-        };
-
-        // For encrypted segments we want to keep the compression buffers on-heap as we need those bytes for encryption,
-        // and we want to avoid copying from off-heap (compression buffer) to on-heap encryption APIs
-        BufferType bufferType = commitLog.configuration.useEncryption() || !commitLog.configuration.useCompression()
-                              ? BufferType.ON_HEAP
-                              : commitLog.configuration.getCompressor().preferredBufferType();
-
-        this.bufferPool = new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(),
-                                                     DatabaseDescriptor.getCommitLogSegmentSize(),
-                                                     bufferType);
-
-        Consumer<Thread> interruptHandler = interruptHandler(monitor);
-        executor = executorFactory().infiniteLoop("COMMIT-LOG-ALLOCATOR", runnable, true, interruptHandler);
-
-        // for simplicity, ensure the first segment is allocated before continuing
-        advanceAllocatingFrom(null);
-    }
-
-    private Consumer<Thread> interruptHandler(final Object monitor)
-    {
-        return thread -> {
-            synchronized (monitor)
-            {
-                thread.interrupt();
-            }
-        };
+        }
     }
 
     private boolean atSegmentBufferLimit()
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 9d030d7..be3f8cd 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -36,7 +36,11 @@ import static com.codahale.metrics.Timer.Context;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
 import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon.NON_DAEMON;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Interrupts.SYNCHRONIZED;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE;
 import static org.apache.cassandra.concurrent.Interruptible.State.NORMAL;
 import static org.apache.cassandra.concurrent.Interruptible.State.SHUTTING_DOWN;
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
@@ -147,7 +151,8 @@ public abstract class AbstractCommitLogService
             throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %fms",
                                                              syncIntervalNanos * 1e-6));
 
-        executor = executorFactory().infiniteLoop(name, new SyncRunnable(MonotonicClock.preciseTime), true);
+        SyncRunnable sync = new SyncRunnable(MonotonicClock.preciseTime);
+        executor = executorFactory().infiniteLoop(name, sync, SAFE, NON_DAEMON, SYNCHRONIZED);
     }
 
     class SyncRunnable implements Interruptible.Task
@@ -171,19 +176,25 @@ public abstract class AbstractCommitLogService
                 // sync and signal
                 long pollStarted = clock.now();
                 boolean flushToDisk = lastSyncedAt + syncIntervalNanos <= pollStarted || state != NORMAL || syncRequested;
-                if (flushToDisk)
+                // synchronized to prevent thread interrupts while performing IO operations and also
+                // clear interrupted status to prevent ClosedByInterruptException in CommitLog::sync
+                synchronized (this)
                 {
-                    // in this branch, we want to flush the commit log to disk
-                    syncRequested = false;
-                    commitLog.sync(true);
-                    lastSyncedAt = pollStarted;
-                    syncComplete.signalAll();
-                    syncCount++;
-                }
-                else
-                {
-                    // in this branch, just update the commit log sync headers
-                    commitLog.sync(false);
+                    Thread.interrupted();
+                    if (flushToDisk)
+                    {
+                        // in this branch, we want to flush the commit log to disk
+                        syncRequested = false;
+                        commitLog.sync(true);
+                        lastSyncedAt = pollStarted;
+                        syncComplete.signalAll();
+                        syncCount++;
+                    }
+                    else
+                    {
+                        // in this branch, just update the commit log sync headers
+                        commitLog.sync(false);
+                    }
                 }
 
                 if (state == SHUTTING_DOWN)
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index 26644dc..c077467 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -31,7 +31,6 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
-import org.apache.cassandra.concurrent.InfiniteLoopExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +52,7 @@ import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import static java.util.Collections.emptyList;
 
 import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.UNSAFE;
 import static org.apache.cassandra.utils.Throwables.maybeFail;
 import static org.apache.cassandra.utils.Throwables.merge;
 
@@ -357,7 +357,7 @@ public final class Ref<T> implements RefCounted<T>
     static final Set<Class<?>> concurrentIterables = Collections.newSetFromMap(new IdentityHashMap<>());
     private static final Set<GlobalState> globallyExtant = Collections.newSetFromMap(new ConcurrentHashMap<>());
     static final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
-    private static final Shutdownable EXEC = executorFactory().infiniteLoop("Reference-Reaper", Ref::reapOneReference, false);
+    private static final Shutdownable EXEC = executorFactory().infiniteLoop("Reference-Reaper", Ref::reapOneReference, UNSAFE);
     static final ScheduledExecutorService STRONG_LEAK_DETECTOR = !DEBUG_ENABLED ? null : executorFactory().scheduled("Strong-Reference-Leak-Detector");
     static
     {
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
index 7c1e95e..d656616 100644
--- a/src/java/org/apache/cassandra/utils/memory/BufferPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.utils.concurrent.Ref;
 
 import static com.google.common.collect.ImmutableList.of;
 import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.UNSAFE;
 import static org.apache.cassandra.utils.ExecutorUtils.*;
 import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
 import static org.apache.cassandra.utils.memory.MemoryUtil.isExactlyDirect;
@@ -173,7 +174,7 @@ public class BufferPool
         this.globalPool = new GlobalPool();
         this.metrics = new BufferPoolMetrics(name, this);
         this.recyclePartially = recyclePartially;
-        this.localPoolCleaner = executorFactory().infiniteLoop("LocalPool-Cleaner-" + name, this::cleanupOneReference, false);
+        this.localPoolCleaner = executorFactory().infiniteLoop("LocalPool-Cleaner-" + name, this::cleanupOneReference, UNSAFE);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
index e11ce99..dbc23e5 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.cassandra.concurrent.Interruptible;
 import org.apache.cassandra.utils.concurrent.WaitQueue;
 
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.SAFE;
 import static org.apache.cassandra.utils.concurrent.WaitQueue.newWaitQueue;
 import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 
@@ -116,7 +117,7 @@ public class MemtableCleanerThread<P extends MemtablePool> implements Interrupti
 
     private MemtableCleanerThread(Clean<P> clean)
     {
-        this.executor = executorFactory().infiniteLoop(clean.pool.getClass().getSimpleName() + "Cleaner", clean, true);
+        this.executor = executorFactory().infiniteLoop(clean.pool.getClass().getSimpleName() + "Cleaner", clean, SAFE);
         this.trigger = clean.wait::signal;
         this.clean = clean;
     }
diff --git a/test/unit/org/apache/cassandra/concurrent/InfiniteLoopExecutorTest.java b/test/unit/org/apache/cassandra/concurrent/InfiniteLoopExecutorTest.java
index 73dca7e..9ec702d 100644
--- a/test/unit/org/apache/cassandra/concurrent/InfiniteLoopExecutorTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/InfiniteLoopExecutorTest.java
@@ -30,13 +30,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.cassandra.concurrent.InfiniteLoopExecutor.Daemon.DAEMON;
+
 public class InfiniteLoopExecutorTest
 {
     @Test
     public void testShutdownNow() throws InterruptedException, ExecutionException, TimeoutException
     {
         Semaphore semaphore = new Semaphore(0);
-        InfiniteLoopExecutor e1 = new InfiniteLoopExecutor("test", ignore -> semaphore.acquire(1));
+        InfiniteLoopExecutor e1 = new InfiniteLoopExecutor("test", ignore -> semaphore.acquire(1), DAEMON);
         ExecutorService exec = Executors.newCachedThreadPool();
         Future<?> f = exec.submit(() -> e1.awaitTermination(1L, TimeUnit.MINUTES));
         e1.shutdownNow();
@@ -53,7 +55,7 @@ public class InfiniteLoopExecutorTest
             semaphore.acquire(1);
             active.set(false);
             semaphore.release();
-        });
+        }, DAEMON);
         ExecutorService exec = Executors.newCachedThreadPool();
         Future<?> f = exec.submit(() -> e1.awaitTermination(1L, TimeUnit.MINUTES));
         // do ten normal loops

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org