You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2020/11/13 23:06:06 UTC

[geode] branch GEODE-8521-CoreLoggingExecutors-tests created (now f49c8df)

This is an automated email from the ASF dual-hosted git repository.

klund pushed a change to branch GEODE-8521-CoreLoggingExecutors-tests
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at f49c8df  Fixup spotless format

This branch includes the following new commits:

     new a1a7f00  Add test for newFunctionThreadPoolWithFeedStatistics
     new dddd181  More cleanup
     new f49c8df  Fixup spotless format

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 02/03: More cleanup

Posted by kl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-8521-CoreLoggingExecutors-tests
in repository https://gitbox.apache.org/repos/asf/geode.git

commit dddd181ae153a0dc7470ec60383e5374ae67cb70
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Nov 13 15:03:22 2020 -0800

    More cleanup
---
 .../internal/FunctionExecutionPooledExecutor.java  | 140 ++++---
 .../internal/SerialQueuedExecutorWithDMStats.java  |  18 +-
 .../ScheduledThreadPoolExecutorWithKeepAlive.java  |   9 +-
 .../internal/logging/CoreLoggingExecutors.java     |  77 ++--
 .../internal/logging/CoreLoggingExecutorsTest.java | 460 ++++++++++++++++++++-
 .../internal/executors/LoggingExecutors.java       |   4 +-
 6 files changed, 604 insertions(+), 104 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java
index 7d485b8..8008f91 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java
@@ -157,71 +157,9 @@ public class FunctionExecutionPooledExecutor extends ThreadPoolExecutor {
 
   private static RejectedExecutionHandler newRejectedExecutionHandler(
       BlockingQueue<Runnable> blockingQueue, boolean forFunctionExecution) {
-    if (forFunctionExecution) {
-      return new RejectedExecutionHandler() {
-        @Override
-        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-          if (executor.isShutdown()) {
-            throw new RejectedExecutionException("executor has been shutdown");
-          }
-          if (isBufferConsumer((FunctionExecutionPooledExecutor) executor)) {
-            handleRejectedExecutionForBufferConsumer(r, executor);
-          } else if (isFunctionExecutionThread()) {
-            handleRejectedExecutionForFunctionExecutionThread(r, executor);
-          } else {
-            // In the normal case, add the rejected request to the blocking queue.
-            try {
-              blockingQueue.put(r);
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              // this thread is being shutdown so just return;
-            }
-          }
-        }
 
-        private boolean isBufferConsumer(FunctionExecutionPooledExecutor executor) {
-          return Thread.currentThread() == executor.bufferConsumer;
-        }
-
-        private boolean isFunctionExecutionThread() {
-          return isFunctionExecutionThread.get();
-        }
-
-        /**
-         * Handle rejected execution for the bufferConsumer (the thread that takes from the blocking
-         * queue and offers to the synchronous queue). Spin off a thread directly in this case,
-         * since an offer has already been made to the synchronous queue and failed. This means that
-         * all the function execution threads are in use.
-         */
-        private void handleRejectedExecutionForBufferConsumer(Runnable r,
-            ThreadPoolExecutor executor) {
-          logger.warn("An additional " + FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX
-              + " thread is being launched because all " + executor.getMaximumPoolSize()
-              + " thread pool threads are in use for greater than " + OFFER_TIME_MILLIS + " ms");
-          launchAdditionalThread(r, executor);
-        }
-
-        /**
-         * Handle rejected execution for a function execution thread. Spin off a thread directly in
-         * this case, since that means a function is executing another function. The child function
-         * request shouldn't be in the queue behind the parent request since the parent function is
-         * dependent on the child function executing.
-         */
-        private void handleRejectedExecutionForFunctionExecutionThread(Runnable r,
-            ThreadPoolExecutor executor) {
-          if (logger.isDebugEnabled()) {
-            logger.warn(
-                "An additional {} thread is being launched to prevent slow performance due to nested function executions",
-                FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX);
-          }
-          launchAdditionalThread(r, executor);
-        }
-
-        private void launchAdditionalThread(Runnable r, ThreadPoolExecutor executor) {
-          Thread th = executor.getThreadFactory().newThread(r);
-          th.start();
-        }
-      };
+    if (forFunctionExecution) {
+      return new FunctionExecutionRejectedExecutionHandler(blockingQueue);
     }
 
     if (blockingQueue instanceof SynchronousQueue) {
@@ -297,6 +235,80 @@ public class FunctionExecutionPooledExecutor extends ThreadPoolExecutor {
     return 1;
   }
 
+  @VisibleForTesting
+  public static class FunctionExecutionRejectedExecutionHandler
+      implements RejectedExecutionHandler {
+
+    private final BlockingQueue<Runnable> blockingQueue;
+
+    private FunctionExecutionRejectedExecutionHandler(BlockingQueue<Runnable> blockingQueue) {
+      this.blockingQueue = blockingQueue;
+    }
+
+    @Override
+    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+      if (executor.isShutdown()) {
+        throw new RejectedExecutionException("executor has been shutdown");
+      }
+      if (isBufferConsumer((FunctionExecutionPooledExecutor) executor)) {
+        handleRejectedExecutionForBufferConsumer(r, executor);
+      } else if (isFunctionExecutionThread()) {
+        handleRejectedExecutionForFunctionExecutionThread(r, executor);
+      } else {
+        // In the normal case, add the rejected request to the blocking queue.
+        try {
+          blockingQueue.put(r);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          // this thread is being shutdown so just return;
+        }
+      }
+    }
+
+    private boolean isBufferConsumer(FunctionExecutionPooledExecutor executor) {
+      return Thread.currentThread() == executor.bufferConsumer;
+    }
+
+    private boolean isFunctionExecutionThread() {
+      return isFunctionExecutionThread.get();
+    }
+
+    /**
+     * Handle rejected execution for the bufferConsumer (the thread that takes from the blocking
+     * queue and offers to the synchronous queue). Spin off a thread directly in this case,
+     * since an offer has already been made to the synchronous queue and failed. This means that
+     * all the function execution threads are in use.
+     */
+    private void handleRejectedExecutionForBufferConsumer(Runnable r,
+        ThreadPoolExecutor executor) {
+      logger.warn("An additional " + FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX
+          + " thread is being launched because all " + executor.getMaximumPoolSize()
+          + " thread pool threads are in use for greater than " + OFFER_TIME_MILLIS + " ms");
+      launchAdditionalThread(r, executor);
+    }
+
+    /**
+     * Handle rejected execution for a function execution thread. Spin off a thread directly in
+     * this case, since that means a function is executing another function. The child function
+     * request shouldn't be in the queue behind the parent request since the parent function is
+     * dependent on the child function executing.
+     */
+    private void handleRejectedExecutionForFunctionExecutionThread(Runnable r,
+        ThreadPoolExecutor executor) {
+      if (logger.isDebugEnabled()) {
+        logger.warn(
+            "An additional {} thread is being launched to prevent slow performance due to nested function executions",
+            FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX);
+      }
+      launchAdditionalThread(r, executor);
+    }
+
+    private void launchAdditionalThread(Runnable r, ThreadPoolExecutor executor) {
+      Thread th = executor.getThreadFactory().newThread(r);
+      th.start();
+    }
+  }
+
   /**
    * This handler fronts a synchronous queue, that is owned by the parent ThreadPoolExecutor, with a
    * client supplied BlockingQueue that supports storage (the buffer queue). A dedicated thread is
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/SerialQueuedExecutorWithDMStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/SerialQueuedExecutorWithDMStats.java
index 80aca66..01293e9 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/SerialQueuedExecutorWithDMStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/SerialQueuedExecutorWithDMStats.java
@@ -20,6 +20,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 
 /**
@@ -29,7 +30,7 @@ import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 public class SerialQueuedExecutorWithDMStats extends ThreadPoolExecutor {
 
   private final PoolStatHelper poolStatHelper;
-  private final ThreadsMonitoring threadMonitoring;
+  private final ThreadsMonitoring threadsMonitoring;
 
   public SerialQueuedExecutorWithDMStats(BlockingQueue<Runnable> workQueue,
       ThreadFactory threadFactory, PoolStatHelper poolStatHelper,
@@ -38,7 +39,7 @@ public class SerialQueuedExecutorWithDMStats extends ThreadPoolExecutor {
         new PooledExecutorWithDMStats.BlockHandler());
 
     this.poolStatHelper = poolStatHelper;
-    threadMonitoring = threadsMonitoring;
+    this.threadsMonitoring = threadsMonitoring;
   }
 
   @Override
@@ -46,8 +47,8 @@ public class SerialQueuedExecutorWithDMStats extends ThreadPoolExecutor {
     if (poolStatHelper != null) {
       poolStatHelper.startJob();
     }
-    if (threadMonitoring != null) {
-      threadMonitoring.startMonitor(ThreadsMonitoring.Mode.SerialQueuedExecutor);
+    if (threadsMonitoring != null) {
+      threadsMonitoring.startMonitor(ThreadsMonitoring.Mode.SerialQueuedExecutor);
     }
   }
 
@@ -56,8 +57,13 @@ public class SerialQueuedExecutorWithDMStats extends ThreadPoolExecutor {
     if (poolStatHelper != null) {
       poolStatHelper.endJob();
     }
-    if (threadMonitoring != null) {
-      threadMonitoring.endMonitor();
+    if (threadsMonitoring != null) {
+      threadsMonitoring.endMonitor();
     }
   }
+
+  @VisibleForTesting
+  public PoolStatHelper getPoolStatHelper() {
+    return poolStatHelper;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/ScheduledThreadPoolExecutorWithKeepAlive.java b/geode-core/src/main/java/org/apache/geode/internal/ScheduledThreadPoolExecutorWithKeepAlive.java
index c01dc78..891e607 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/ScheduledThreadPoolExecutorWithKeepAlive.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/ScheduledThreadPoolExecutorWithKeepAlive.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 
 /**
@@ -220,6 +221,11 @@ public class ScheduledThreadPoolExecutorWithKeepAlive
     timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(b);
   }
 
+  @VisibleForTesting
+  public ScheduledThreadPoolExecutor getDelegateExecutor() {
+    return timer;
+  }
+
   /**
    * A Runnable which we put in the timer which simply hands off the contain task for execution in
    * the thread pool when the timer fires.
@@ -317,7 +323,8 @@ public class ScheduledThreadPoolExecutorWithKeepAlive
    * A RejectedExecutionHandler which causes the caller to block until there is space in the queue
    * for the task.
    */
-  protected static class BlockCallerPolicy implements RejectedExecutionHandler {
+  @VisibleForTesting
+  public static class BlockCallerPolicy implements RejectedExecutionHandler {
 
     @Override
     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/logging/CoreLoggingExecutors.java b/geode-core/src/main/java/org/apache/geode/internal/logging/CoreLoggingExecutors.java
index c70eb24..38fcb4f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/logging/CoreLoggingExecutors.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/logging/CoreLoggingExecutors.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.distributed.internal.FunctionExecutionPooledExecutor;
 import org.apache.geode.distributed.internal.OverflowQueueWithDMStats;
 import org.apache.geode.distributed.internal.PoolStatHelper;
@@ -44,13 +45,13 @@ import org.apache.geode.logging.internal.executors.LoggingThreadFactory.CommandW
 import org.apache.geode.logging.internal.executors.LoggingThreadFactory.ThreadInitializer;
 
 /**
- * Utility class that creates instances of ExecutorService
- * whose threads will always log uncaught exceptions.
+ * Utility class that creates instances of ExecutorService whose threads will always log uncaught
+ * exceptions.
  */
 public class CoreLoggingExecutors {
 
   private static final String IDLE_THREAD_TIMEOUT_MILLIS_PROPERTY = "IDLE_THREAD_TIMEOUT";
-  private static final int DEFAULT_IDLE_THREAD_TIMEOUT_MILLIS = 30000 * 60;
+  private static final int DEFAULT_IDLE_THREAD_TIMEOUT_MILLIS = 30_000 * 60;
 
   public static ExecutorService newFixedThreadPoolWithTimeout(int poolSize, long keepAliveTime,
       TimeUnit unit, QueueStatHelper queueStatHelper, String threadName) {
@@ -62,33 +63,32 @@ public class CoreLoggingExecutors {
   public static ExecutorService newFunctionThreadPoolWithFeedStatistics(int poolSize,
       int workQueueSize, QueueStatHelper queueStatHelper, String threadName,
       ThreadInitializer threadInitializer, CommandWrapper commandWrapper,
-      PoolStatHelper poolStatsHelper, ThreadsMonitoring threadsMonitoring) {
+      PoolStatHelper poolStatHelper, ThreadsMonitoring threadsMonitoring) {
     BlockingQueue<Runnable> workQueue =
         createWorkQueueWithStatistics(workQueueSize, queueStatHelper);
     ThreadFactory threadFactory =
         new LoggingThreadFactory(threadName, threadInitializer, commandWrapper);
-    return new FunctionExecutionPooledExecutor(poolSize, workQueue, threadFactory, poolStatsHelper,
+    return new FunctionExecutionPooledExecutor(poolSize, workQueue, threadFactory, poolStatHelper,
         threadsMonitoring);
   }
 
   public static ExecutorService newSerialThreadPool(BlockingQueue<Runnable> workQueue,
       String threadName, ThreadInitializer threadInitializer, CommandWrapper commandWrapper,
-      PoolStatHelper poolStatsHelper, ThreadsMonitoring threadsMonitoring) {
+      PoolStatHelper poolStatHelper, ThreadsMonitoring threadsMonitoring) {
     ThreadFactory threadFactory =
         new LoggingThreadFactory(threadName, threadInitializer, commandWrapper);
-    return new SerialQueuedExecutorWithDMStats(workQueue, threadFactory, poolStatsHelper,
+    return new SerialQueuedExecutorWithDMStats(workQueue, threadFactory, poolStatHelper,
         threadsMonitoring);
   }
 
   public static ExecutorService newSerialThreadPoolWithFeedStatistics(int workQueueSize,
       QueueStatHelper queueStatHelper, String threadName, ThreadInitializer threadInitializer,
-      CommandWrapper commandWrapper, PoolStatHelper poolStatsHelper,
+      CommandWrapper commandWrapper, PoolStatHelper poolStatHelper,
       ThreadsMonitoring threadsMonitoring) {
     BlockingQueue<Runnable> workQueue =
         createWorkQueueWithStatistics(workQueueSize, queueStatHelper);
     return newSerialThreadPool(workQueue, threadName, threadInitializer, commandWrapper,
-        poolStatsHelper,
-        threadsMonitoring);
+        poolStatHelper, threadsMonitoring);
   }
 
   public static ScheduledExecutorService newScheduledThreadPool(int poolSize, long keepAliveTime,
@@ -104,30 +104,30 @@ public class CoreLoggingExecutors {
 
   public static ExecutorService newThreadPool(int poolSize, BlockingQueue<Runnable> workQueue,
       String threadName, ThreadInitializer threadInitializer, CommandWrapper commandWrapper,
-      PoolStatHelper poolStatsHelper, ThreadsMonitoring threadsMonitoring) {
+      PoolStatHelper poolStatHelper, ThreadsMonitoring threadsMonitoring) {
     ThreadFactory threadFactory =
         new LoggingThreadFactory(threadName, threadInitializer, commandWrapper);
     return new PooledExecutorWithDMStats(poolSize, getIdleThreadTimeoutMillis(), MILLISECONDS,
-        workQueue, threadFactory, poolStatsHelper, threadsMonitoring);
+        workQueue, threadFactory, poolStatHelper, threadsMonitoring);
   }
 
   public static ExecutorService newThreadPoolWithFixedFeed(int poolSize, long keepAliveTime,
       TimeUnit unit, int workQueueSize, String threadName, CommandWrapper commandWrapper,
-      PoolStatHelper poolStatsHelper, ThreadsMonitoring threadsMonitoring) {
+      PoolStatHelper poolStatHelper, ThreadsMonitoring threadsMonitoring) {
     ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(workQueueSize);
     ThreadFactory threadFactory = new LoggingThreadFactory(threadName, commandWrapper);
     return new PooledExecutorWithDMStats(poolSize, keepAliveTime, unit, workQueue, threadFactory,
-        poolStatsHelper, threadsMonitoring);
+        poolStatHelper, threadsMonitoring);
   }
 
   public static ExecutorService newThreadPoolWithFeedStatistics(int poolSize, int workQueueSize,
       QueueStatHelper queueStatHelper, String threadName, ThreadInitializer threadInitializer,
-      CommandWrapper commandWrapper, PoolStatHelper poolStatsHelper,
+      CommandWrapper commandWrapper, PoolStatHelper poolStatHelper,
       ThreadsMonitoring threadsMonitoring) {
     BlockingQueue<Runnable> workQueue =
         createWorkQueueWithStatistics(workQueueSize, queueStatHelper);
     return newThreadPool(poolSize, workQueue, threadName, threadInitializer, commandWrapper,
-        poolStatsHelper, threadsMonitoring);
+        poolStatHelper, threadsMonitoring);
   }
 
   public static ExecutorService newThreadPoolWithSynchronousFeed(int poolSize, String threadName,
@@ -140,20 +140,20 @@ public class CoreLoggingExecutors {
 
   public static ExecutorService newThreadPoolWithSynchronousFeed(int poolSize, long keepAliveTime,
       TimeUnit unit, String threadName, CommandWrapper commandWrapper,
-      PoolStatHelper poolStatsHelper, ThreadsMonitoring threadsMonitoring) {
+      PoolStatHelper poolStatHelper, ThreadsMonitoring threadsMonitoring) {
     ThreadFactory threadFactory = new LoggingThreadFactory(threadName, commandWrapper);
     SynchronousQueue<Runnable> workQueue = new SynchronousQueue<>();
     return new PooledExecutorWithDMStats(poolSize, keepAliveTime, unit, workQueue, threadFactory,
-        poolStatsHelper, threadsMonitoring);
+        poolStatHelper, threadsMonitoring);
   }
 
   public static ExecutorService newThreadPoolWithSynchronousFeed(int poolSize, long keepAliveTime,
       TimeUnit unit, String threadName, RejectedExecutionHandler rejectionHandler,
-      PoolStatHelper poolStatsHelper) {
+      PoolStatHelper poolStatHelper) {
     SynchronousQueue<Runnable> workQueue = new SynchronousQueue<>();
     ThreadFactory threadFactory = new LoggingThreadFactory(threadName);
     return new PooledExecutorWithDMStats(poolSize, keepAliveTime, unit, workQueue, threadFactory,
-        rejectionHandler, poolStatsHelper, null);
+        rejectionHandler, poolStatHelper, null);
   }
 
   public static ExecutorService newThreadPoolWithSynchronousFeed(int corePoolSize,
@@ -166,7 +166,9 @@ public class CoreLoggingExecutors {
         threadFactory);
   }
 
-  /** Used for P2P Reader Threads in ConnectionTable */
+  /**
+   * Used for P2P Reader Threads in ConnectionTable
+   */
   public static ExecutorService newThreadPoolWithSynchronousFeed(int corePoolSize,
       int maximumPoolSize, long keepAliveTime, TimeUnit unit, String threadName) {
     return newThreadPoolWithSynchronousFeed(corePoolSize, maximumPoolSize, keepAliveTime, unit,
@@ -177,14 +179,8 @@ public class CoreLoggingExecutors {
       int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, String threadName,
       ThreadInitializer threadInitializer, CommandWrapper commandWrapper) {
     BlockingQueue<Runnable> blockingQueue = new SynchronousQueue<>();
-    RejectedExecutionHandler rejectedExecutionHandler = (r, executor) -> {
-      try {
-        blockingQueue.put(r);
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt(); // preserve the state
-        throw new RejectedExecutionException("interrupted", ex);
-      }
-    };
+    RejectedExecutionHandler rejectedExecutionHandler =
+        new QueuingRejectedExecutionHandler(blockingQueue);
     ThreadFactory threadFactory =
         new LoggingThreadFactory(threadName, threadInitializer, commandWrapper);
     return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
@@ -193,13 +189,13 @@ public class CoreLoggingExecutors {
 
   public static ExecutorService newThreadPoolWithUnlimitedFeed(int poolSize, long keepAliveTime,
       TimeUnit unit, String threadName, ThreadInitializer threadInitializer,
-      CommandWrapper commandWrapper, PoolStatHelper poolStatsHelper,
+      CommandWrapper commandWrapper, PoolStatHelper poolStatHelper,
       ThreadsMonitoring threadsMonitoring) {
     LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
     ThreadFactory threadFactory =
         new LoggingThreadFactory(threadName, threadInitializer, commandWrapper);
     return new PooledExecutorWithDMStats(poolSize, keepAliveTime, unit, workQueue, threadFactory,
-        poolStatsHelper, threadsMonitoring);
+        poolStatHelper, threadsMonitoring);
   }
 
   private CoreLoggingExecutors() {
@@ -222,4 +218,23 @@ public class CoreLoggingExecutors {
         DEFAULT_IDLE_THREAD_TIMEOUT_MILLIS);
   }
 
+  @VisibleForTesting
+  static class QueuingRejectedExecutionHandler implements RejectedExecutionHandler {
+
+    private final BlockingQueue<Runnable> blockingQueue;
+
+    private QueuingRejectedExecutionHandler(BlockingQueue<Runnable> blockingQueue) {
+      this.blockingQueue = blockingQueue;
+    }
+
+    @Override
+    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+      try {
+        blockingQueue.put(r);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt(); // preserve the state
+        throw new RejectedExecutionException("interrupted", ex);
+      }
+    }
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java b/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java
index 8ccf4d0..ebb7c88 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java
@@ -15,21 +15,30 @@
 package org.apache.geode.internal.logging;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.distributed.internal.FunctionExecutionPooledExecutor;
+import org.apache.geode.distributed.internal.FunctionExecutionPooledExecutor.FunctionExecutionRejectedExecutionHandler;
 import org.apache.geode.distributed.internal.OverflowQueueWithDMStats;
 import org.apache.geode.distributed.internal.PoolStatHelper;
+import org.apache.geode.distributed.internal.PooledExecutorWithDMStats;
 import org.apache.geode.distributed.internal.QueueStatHelper;
+import org.apache.geode.distributed.internal.SerialQueuedExecutorWithDMStats;
+import org.apache.geode.internal.ScheduledThreadPoolExecutorWithKeepAlive;
 import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 import org.apache.geode.logging.internal.executors.LoggingThread;
 import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
@@ -55,6 +64,12 @@ public class CoreLoggingExecutorsTest {
     threadsMonitoring = mock(ThreadsMonitoring.class);
   }
 
+  /**
+   * Creates, passes in, and uses OverflowQueueWithDMStats.
+   *
+   * <p>
+   * Uses {@code ThreadPoolExecutor.AbortPolicy}.
+   */
   @Test
   public void newFixedThreadPoolWithTimeout() {
     int poolSize = 5;
@@ -73,6 +88,8 @@ public class CoreLoggingExecutorsTest {
     assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(keepAliveTime);
     assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
     assertThat(executor.getQueue()).isInstanceOf(OverflowQueueWithDMStats.class);
+    assertThat(executor.getRejectedExecutionHandler())
+        .isInstanceOf(ThreadPoolExecutor.AbortPolicy.class);
     assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
 
     OverflowQueueWithDMStats overflowQueueWithDMStats =
@@ -87,6 +104,17 @@ public class CoreLoggingExecutorsTest {
     assertThat(thread.getName()).contains(threadName);
   }
 
+  /**
+   * Creates and passes in OverflowQueueWithDMStats. FunctionExecutionPooledExecutor then creates
+   * and passes up SynchronousQueue, but finally uses OverflowQueueWithDMStats.
+   *
+   * <p>
+   * {@code getBufferQueue()} returns passed-in OverflowQueueWithDMStats. {@code getQueue()} returns
+   * internal SynchronousQueue.
+   *
+   * <p>
+   * Uses {@code ThreadPoolExecutor.AbortPolicy}.
+   */
   @Test
   public void newFunctionThreadPoolWithFeedStatistics() {
     int poolSize = 5;
@@ -101,11 +129,13 @@ public class CoreLoggingExecutorsTest {
 
     FunctionExecutionPooledExecutor executor = (FunctionExecutionPooledExecutor) executorService;
 
+    assertThat(executor.getBufferQueue()).isInstanceOf(OverflowQueueWithDMStats.class);
     assertThat(executor.getCorePoolSize()).isEqualTo(1);
     assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(30);
     assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
-    assertThat(executor.getBufferQueue()).isInstanceOf(OverflowQueueWithDMStats.class);
     assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+    assertThat(executor.getRejectedExecutionHandler())
+        .isInstanceOf(FunctionExecutionRejectedExecutionHandler.class);
     assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
 
     OverflowQueueWithDMStats overflowQueueWithDMStats =
@@ -119,4 +149,432 @@ public class CoreLoggingExecutorsTest {
     assertThat(thread).isInstanceOf(LoggingThread.class);
     assertThat(thread.getName()).contains(threadName);
   }
+
+  /**
+   * Passes in and uses BlockingQueue.
+   *
+   * <p>
+   * Uses {@code PooledExecutorWithDMStats.BlockHandler}.
+   */
+  @Test
+  public void newSerialThreadPool() {
+    BlockingQueue<Runnable> workQueue = mock(BlockingQueue.class);
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors
+        .newSerialThreadPool(workQueue, threadName, threadInitializer, commandWrapper,
+            poolStatHelper, threadsMonitoring);
+
+    assertThat(executorService).isInstanceOf(SerialQueuedExecutorWithDMStats.class);
+
+    SerialQueuedExecutorWithDMStats executor = (SerialQueuedExecutorWithDMStats) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(1);
+    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(1);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(1);
+    assertThat(executor.getPoolStatHelper()).isSameAs(poolStatHelper);
+    assertThat(executor.getQueue()).isSameAs(workQueue);
+    assertThat(executor.getRejectedExecutionHandler())
+        .isInstanceOf(PooledExecutorWithDMStats.BlockHandler.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+
+  /**
+   * Creates, passes in, and uses OverflowQueueWithDMStats.
+   *
+   * <p>
+   * Uses {@code PooledExecutorWithDMStats.BlockHandler}.
+   */
+  @Test
+  public void newSerialThreadPoolWithFeedStatistics() {
+    int workQueueSize = 2;
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors
+        .newSerialThreadPoolWithFeedStatistics(workQueueSize, queueStatHelper, threadName,
+            threadInitializer, commandWrapper, poolStatHelper, threadsMonitoring);
+
+    assertThat(executorService).isInstanceOf(SerialQueuedExecutorWithDMStats.class);
+
+    SerialQueuedExecutorWithDMStats executor = (SerialQueuedExecutorWithDMStats) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(1);
+    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(1);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(1);
+    assertThat(executor.getPoolStatHelper()).isSameAs(poolStatHelper);
+    assertThat(executor.getQueue()).isInstanceOf(OverflowQueueWithDMStats.class);
+    assertThat(executor.getRejectedExecutionHandler())
+        .isInstanceOf(PooledExecutorWithDMStats.BlockHandler.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+
+  /**
+   * Creates, passes up, and uses SynchronousQueue. ScheduledThreadPoolExecutorWithKeepAlive then
+   * creates a ThreadPoolExecutor to delegate to.
+   *
+   * <p>
+   * Uses {@code ScheduledThreadPoolExecutorWithKeepAlive.BlockCallerPolicy}.
+   */
+  @Test
+  public void newScheduledThreadPool() {
+    int poolSize = 10;
+    long keepAliveTime = 5000;
+    TimeUnit unit = SECONDS;
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors
+        .newScheduledThreadPool(poolSize, keepAliveTime, unit, threadName, threadsMonitoring);
+
+    assertThat(executorService).isInstanceOf(ScheduledThreadPoolExecutorWithKeepAlive.class);
+
+    ScheduledThreadPoolExecutorWithKeepAlive executor =
+        (ScheduledThreadPoolExecutorWithKeepAlive) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(1);
+    assertThat(executor.getDelegateExecutor()).isInstanceOf(ScheduledThreadPoolExecutor.class);
+    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
+    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+    assertThat(executor.getRejectedExecutionHandler())
+        .isInstanceOf(ScheduledThreadPoolExecutorWithKeepAlive.BlockCallerPolicy.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    ScheduledThreadPoolExecutor delegate = executor.getDelegateExecutor();
+
+    assertThat(delegate.getContinueExistingPeriodicTasksAfterShutdownPolicy()).isFalse();
+    assertThat(delegate.getExecuteExistingDelayedTasksAfterShutdownPolicy()).isFalse();
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+
+  /**
+   * Creates and passes up BlockingQueue. PooledExecutorWithDMStats then creates and passes up SynchronousQueue
+   */
+  @Test
+  public void newThreadPool() {
+    int poolSize = 10;
+    BlockingQueue<Runnable> workQueue = mock(BlockingQueue.class);
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors
+        .newThreadPool(poolSize, workQueue, threadName, threadInitializer, commandWrapper,
+            poolStatHelper, threadsMonitoring);
+
+    assertThat(executorService).isInstanceOf(PooledExecutorWithDMStats.class);
+
+    PooledExecutorWithDMStats executor = (PooledExecutorWithDMStats) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(1);
+    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(30);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
+    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+    assertThat(executor.getRejectedExecutionHandler())
+        .isInstanceOf(PooledExecutorWithDMStats.BufferHandler.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+
+  /**
+   * Creates ArrayBlockingQueue but then uses SynchronousQueue
+   */
+  @Test
+  public void newThreadPoolWithFixedFeed() {
+    int poolSize = 10;
+    long keepAliveTime = 5000;
+    TimeUnit unit = SECONDS;
+    int workQueueSize = 2;
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors
+        .newThreadPoolWithFixedFeed(poolSize, keepAliveTime, unit, workQueueSize, threadName,
+            commandWrapper, poolStatHelper, threadsMonitoring);
+
+    assertThat(executorService).isInstanceOf(PooledExecutorWithDMStats.class);
+
+    PooledExecutorWithDMStats executor =
+        (PooledExecutorWithDMStats) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(1);
+    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
+    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+    assertThat(executor.getRejectedExecutionHandler())
+        .isInstanceOf(PooledExecutorWithDMStats.BufferHandler.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+
+  /**
+   * Creates and passes in OverflowQueueWithDMStats. PooledExecutorWithDMStats then creates and
+   * uses SynchronousQueue.
+   */
+  @Test
+  public void newThreadPoolWithFeedStatistics() {
+    int poolSize = 10;
+    int workQueueSize = 2;
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors
+        .newThreadPoolWithFeedStatistics(poolSize, workQueueSize, queueStatHelper, threadName,
+            threadInitializer, commandWrapper, poolStatHelper, threadsMonitoring);
+
+    assertThat(executorService).isInstanceOf(PooledExecutorWithDMStats.class);
+
+    PooledExecutorWithDMStats executor =
+        (PooledExecutorWithDMStats) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(1);
+    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(30);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
+    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+    assertThat(executor.getRejectedExecutionHandler())
+        .isInstanceOf(PooledExecutorWithDMStats.BufferHandler.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+
+  @Test
+  public void newThreadPoolWithSynchronousFeed() {
+    int poolSize = 10;
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors.newThreadPoolWithSynchronousFeed(
+        poolSize, threadName, commandWrapper);
+
+    assertThat(executorService).isInstanceOf(PooledExecutorWithDMStats.class);
+
+    PooledExecutorWithDMStats executor =
+        (PooledExecutorWithDMStats) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(1);
+    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(30);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
+    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+    assertThat(executor.getRejectedExecutionHandler())
+        .isInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+
+  @Test
+  public void newThreadPoolWithSynchronousFeed_2() {
+    int poolSize = 10;
+    long keepAliveTime = 5000;
+    TimeUnit unit = SECONDS;
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors.newThreadPoolWithSynchronousFeed(
+        poolSize, keepAliveTime, unit, threadName, commandWrapper, poolStatHelper,
+        threadsMonitoring);
+
+    assertThat(executorService).isInstanceOf(PooledExecutorWithDMStats.class);
+
+    PooledExecutorWithDMStats executor =
+        (PooledExecutorWithDMStats) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(1);
+    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
+    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+    assertThat(executor.getRejectedExecutionHandler())
+        .isInstanceOf(ThreadPoolExecutor.CallerRunsPolicy.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+
+  @Test
+  public void newThreadPoolWithSynchronousFeed_3() {
+    int poolSize = 10;
+    long keepAliveTime = 5000;
+    TimeUnit unit = SECONDS;
+    String threadName = "thread";
+    RejectedExecutionHandler rejectedExecutionHandler = mock(RejectedExecutionHandler.class);
+
+    ExecutorService executorService = CoreLoggingExecutors.newThreadPoolWithSynchronousFeed(
+        poolSize, keepAliveTime, unit, threadName, rejectedExecutionHandler, poolStatHelper);
+
+    assertThat(executorService).isInstanceOf(PooledExecutorWithDMStats.class);
+
+    PooledExecutorWithDMStats executor =
+        (PooledExecutorWithDMStats) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(1);
+    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
+    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+    assertThat(executor.getRejectedExecutionHandler()).isSameAs(rejectedExecutionHandler);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+
+  @Test
+  public void newThreadPoolWithSynchronousFeed_4() {
+    int corePoolSize = 10;
+    int maximumPoolSize = 20;
+    long keepAliveTime = 5000;
+    TimeUnit unit = SECONDS;
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors.newThreadPoolWithSynchronousFeed(
+        corePoolSize, maximumPoolSize, keepAliveTime, unit, threadName, threadInitializer,
+        commandWrapper);
+
+    assertThat(executorService).isInstanceOf(ThreadPoolExecutor.class);
+
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(corePoolSize);
+    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(maximumPoolSize);
+    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+    assertThat(executor.getRejectedExecutionHandler())
+        .isInstanceOf(ThreadPoolExecutor.AbortPolicy.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+
+  /**
+   * Used for P2P Reader Threads in ConnectionTable
+   */
+  @Test
+  public void newThreadPoolWithSynchronousFeed_5() {
+    int corePoolSize = 10;
+    int maximumPoolSize = 20;
+    long keepAliveTime = 5000;
+    TimeUnit unit = SECONDS;
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors.newThreadPoolWithSynchronousFeed(
+        corePoolSize, maximumPoolSize, keepAliveTime, unit, threadName);
+
+    assertThat(executorService).isInstanceOf(ThreadPoolExecutor.class);
+
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(corePoolSize);
+    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(maximumPoolSize);
+    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+    assertThat(executor.getRejectedExecutionHandler())
+        .isInstanceOf(ThreadPoolExecutor.AbortPolicy.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+
+  @Test
+  public void newThreadPoolWithSynchronousFeedThatHandlesRejection() {
+    int corePoolSize = 10;
+    int maximumPoolSize = 20;
+    long keepAliveTime = 5000;
+    TimeUnit unit = SECONDS;
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors
+        .newThreadPoolWithSynchronousFeedThatHandlesRejection(corePoolSize, maximumPoolSize,
+            keepAliveTime, unit, threadName, threadInitializer, commandWrapper);
+
+    assertThat(executorService).isInstanceOf(ThreadPoolExecutor.class);
+
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(corePoolSize);
+    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(maximumPoolSize);
+    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+    assertThat(executor.getRejectedExecutionHandler())
+        .isInstanceOf(CoreLoggingExecutors.QueuingRejectedExecutionHandler.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+
+  @Test
+  public void newThreadPoolWithUnlimitedFeed() {
+    int poolSize = 10;
+    long keepAliveTime = 5000;
+    TimeUnit unit = SECONDS;
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors.newThreadPoolWithUnlimitedFeed(
+        poolSize, keepAliveTime, unit, threadName, threadInitializer, commandWrapper,
+        poolStatHelper, threadsMonitoring);
+
+    assertThat(executorService).isInstanceOf(ThreadPoolExecutor.class);
+
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(1);
+    assertThat(executor.getKeepAliveTime(unit)).isEqualTo(keepAliveTime);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
+    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+    assertThat(executor.getRejectedExecutionHandler())
+        .isInstanceOf(PooledExecutorWithDMStats.BufferHandler.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
 }
diff --git a/geode-logging/src/main/java/org/apache/geode/logging/internal/executors/LoggingExecutors.java b/geode-logging/src/main/java/org/apache/geode/logging/internal/executors/LoggingExecutors.java
index ef3ca66..5151b48 100644
--- a/geode-logging/src/main/java/org/apache/geode/logging/internal/executors/LoggingExecutors.java
+++ b/geode-logging/src/main/java/org/apache/geode/logging/internal/executors/LoggingExecutors.java
@@ -97,7 +97,9 @@ public class LoggingExecutors {
     return newScheduledThreadPool(1, threadName);
   }
 
-  /** Used for P2P Reader Threads in ConnectionTable */
+  /**
+   * Used for P2P Reader Threads in ConnectionTable
+   */
   public static Executor newThreadOnEachExecute(String threadName) {
     return command -> new LoggingThread(threadName, command).start();
   }


[geode] 03/03: Fixup spotless format

Posted by kl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-8521-CoreLoggingExecutors-tests
in repository https://gitbox.apache.org/repos/asf/geode.git

commit f49c8dfef44abefb2b18a4655a31057259fe41e4
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Nov 13 15:04:44 2020 -0800

    Fixup spotless format
---
 .../org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java    | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java b/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java
index ebb7c88..8dadf46 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java
@@ -264,7 +264,8 @@ public class CoreLoggingExecutorsTest {
   }
 
   /**
-   * Creates and passes up BlockingQueue. PooledExecutorWithDMStats then creates and passes up SynchronousQueue
+   * Creates and passes up BlockingQueue. PooledExecutorWithDMStats then creates and passes up
+   * SynchronousQueue
    */
   @Test
   public void newThreadPool() {


[geode] 01/03: Add test for newFunctionThreadPoolWithFeedStatistics

Posted by kl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch GEODE-8521-CoreLoggingExecutors-tests
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a1a7f003b5fc117237059fd560099eff227b2751
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Thu Oct 29 17:29:57 2020 -0700

    Add test for newFunctionThreadPoolWithFeedStatistics
---
 .../internal/FunctionExecutionPooledExecutor.java  |   6 +
 .../internal/OverflowQueueWithDMStats.java         |   7 ++
 .../internal/logging/CoreLoggingExecutorsTest.java | 122 +++++++++++++++++++++
 3 files changed, 135 insertions(+)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java
index 45b8333..7d485b8 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/FunctionExecutionPooledExecutor.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.internal.monitoring.ThreadsMonitoring;
 import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -284,6 +285,11 @@ public class FunctionExecutionPooledExecutor extends ThreadPoolExecutor {
     }
   }
 
+  @VisibleForTesting
+  public BlockingQueue<Runnable> getBufferQueue() {
+    return bufferQueue;
+  }
+
   private static int getCorePoolSize(int maxSize) {
     if (maxSize == Integer.MAX_VALUE) {
       return 0;
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java
index 4958928..7b5bdbd 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/OverflowQueueWithDMStats.java
@@ -19,6 +19,8 @@ import java.util.Collection;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.geode.annotations.VisibleForTesting;
+
 /**
  * A LinkedBlockingQueue that supports stats. Named OverflowQueue for historical reasons.
  *
@@ -183,4 +185,9 @@ public class OverflowQueueWithDMStats<E> extends LinkedBlockingQueue<E> {
   protected void postDrain(Collection<? super E> c) {
     // do nothing in this class. sub-classes can override
   }
+
+  @VisibleForTesting
+  public QueueStatHelper getQueueStatHelper() {
+    return stats;
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java b/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java
new file mode 100644
index 0000000..8ccf4d0
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/logging/CoreLoggingExecutorsTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.logging;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.FunctionExecutionPooledExecutor;
+import org.apache.geode.distributed.internal.OverflowQueueWithDMStats;
+import org.apache.geode.distributed.internal.PoolStatHelper;
+import org.apache.geode.distributed.internal.QueueStatHelper;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.logging.internal.executors.LoggingThread;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory.CommandWrapper;
+import org.apache.geode.logging.internal.executors.LoggingThreadFactory.ThreadInitializer;
+
+public class CoreLoggingExecutorsTest {
+
+  private CommandWrapper commandWrapper;
+  private PoolStatHelper poolStatHelper;
+  private QueueStatHelper queueStatHelper;
+  private Runnable runnable;
+  private ThreadInitializer threadInitializer;
+  private ThreadsMonitoring threadsMonitoring;
+
+  @Before
+  public void setUp() {
+    commandWrapper = mock(CommandWrapper.class);
+    poolStatHelper = mock(PoolStatHelper.class);
+    queueStatHelper = mock(QueueStatHelper.class);
+    runnable = mock(Runnable.class);
+    threadInitializer = mock(ThreadInitializer.class);
+    threadsMonitoring = mock(ThreadsMonitoring.class);
+  }
+
+  @Test
+  public void newFixedThreadPoolWithTimeout() {
+    int poolSize = 5;
+    int keepAliveTime = 2;
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors
+        .newFixedThreadPoolWithTimeout(poolSize, keepAliveTime, MINUTES, queueStatHelper,
+            threadName);
+
+    assertThat(executorService).isInstanceOf(ThreadPoolExecutor.class);
+
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(poolSize);
+    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(keepAliveTime);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
+    assertThat(executor.getQueue()).isInstanceOf(OverflowQueueWithDMStats.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    OverflowQueueWithDMStats overflowQueueWithDMStats =
+        (OverflowQueueWithDMStats) executor.getQueue();
+
+    assertThat(overflowQueueWithDMStats.getQueueStatHelper()).isSameAs(queueStatHelper);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+
+  @Test
+  public void newFunctionThreadPoolWithFeedStatistics() {
+    int poolSize = 5;
+    int workQueueSize = 2;
+    String threadName = "thread";
+
+    ExecutorService executorService = CoreLoggingExecutors
+        .newFunctionThreadPoolWithFeedStatistics(poolSize, workQueueSize, queueStatHelper,
+            threadName, threadInitializer, commandWrapper, poolStatHelper, threadsMonitoring);
+
+    assertThat(executorService).isInstanceOf(FunctionExecutionPooledExecutor.class);
+
+    FunctionExecutionPooledExecutor executor = (FunctionExecutionPooledExecutor) executorService;
+
+    assertThat(executor.getCorePoolSize()).isEqualTo(1);
+    assertThat(executor.getKeepAliveTime(MINUTES)).isEqualTo(30);
+    assertThat(executor.getMaximumPoolSize()).isEqualTo(poolSize);
+    assertThat(executor.getBufferQueue()).isInstanceOf(OverflowQueueWithDMStats.class);
+    assertThat(executor.getQueue()).isInstanceOf(SynchronousQueue.class);
+    assertThat(executor.getThreadFactory()).isInstanceOf(LoggingThreadFactory.class);
+
+    OverflowQueueWithDMStats overflowQueueWithDMStats =
+        (OverflowQueueWithDMStats) executor.getBufferQueue();
+
+    assertThat(overflowQueueWithDMStats.getQueueStatHelper()).isSameAs(queueStatHelper);
+
+    ThreadFactory threadFactory = executor.getThreadFactory();
+    Thread thread = threadFactory.newThread(runnable);
+
+    assertThat(thread).isInstanceOf(LoggingThread.class);
+    assertThat(thread.getName()).contains(threadName);
+  }
+}