You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2021/11/25 00:34:02 UTC

[geode] 01/02: GEODE-9713: Support thread count in ExecutorService rules (#7002)

This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git

commit d6d9a9108150e2396ebe3ca91ee62c1e895cf040
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Mon Oct 18 17:16:32 2021 -0700

    GEODE-9713: Support thread count in ExecutorService rules (#7002)
    
    Restores thread count support to ExecutorServiceRule, and adds it to
    DistributedExecutorServiceRule.
    
    PROBLEM
    
    ExecutorService rules currently create a newCachedThreadPool which
    creates new threads as needed.
    
    Some usages would benefit from the option of specifying a threadCount
    limit which would create a newFixedThreadPool that reuses a fixed
    number of threads.
    
    SOLUTION
    
    Add optional threadCount creation parameter to both ExecutorServiceRule
    and DistributedExecutorServiceRule.
    
    Creating an ExecutorService rule without a threadCount will still create a
    newCachedThreadPool. Using a threadCount will now create a
    newFixedThreadPool.
    
    (cherry picked from commit 636bea3fd14c634d2568ed49eba3b13f1797d1ff)
---
 ...dExecutorServiceRuleLimitedThreadCountTest.java | 103 +++++++++++++
 ...butedExecutorServiceRuleLimitedVmCountTest.java |  37 +++++
 ...xecutorServiceRuleUnlimitedThreadCountTest.java |  75 ++++++++++
 .../rules/DistributedExecutorServiceRule.java      | 165 ++++++++++++++++++++-
 .../sanctioned-geode-dunit-serializables.txt       |   2 +-
 .../test/junit/rules/ExecutorServiceRule.java      |  69 +++++++--
 .../sanctioned-geode-junit-serializables.txt       |   2 +-
 7 files changed, 434 insertions(+), 19 deletions(-)

diff --git a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedThreadCountTest.java b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedThreadCountTest.java
new file mode 100644
index 0000000..9328d5e
--- /dev/null
+++ b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedThreadCountTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.test.dunit.rules.tests;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.apache.geode.test.dunit.VM.getController;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule.builder;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+
+@SuppressWarnings("serial")
+public class DistributedExecutorServiceRuleLimitedThreadCountTest implements Serializable {
+
+  private static final int THREAD_COUNT = 2;
+  private static final long TIMEOUT = getTimeout().toMinutes();
+  private static final TimeUnit UNIT = TimeUnit.MINUTES;
+  private static final AtomicInteger STARTED_TASKS = new AtomicInteger();
+  private static final AtomicInteger COMPLETED_TASKS = new AtomicInteger();
+  private static final AtomicReference<CountDownLatch> LATCH = new AtomicReference<>();
+
+  @Rule
+  public DistributedExecutorServiceRule executorServiceRule = builder()
+      .threadCount(THREAD_COUNT).vmCount(1).build();
+
+  @Before
+  public void setUp() {
+    Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
+      STARTED_TASKS.set(0);
+      COMPLETED_TASKS.set(0);
+      LATCH.set(new CountDownLatch(1));
+    }));
+  }
+
+  @Test
+  public void limitsRunningTasksToThreadCount() {
+    // start THREAD_COUNT threads to use up the executor's thread pool
+    Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
+      for (int i = 1; i <= THREAD_COUNT; i++) {
+        executorServiceRule.submit(() -> {
+          // increment count of started tasks and use a LATCH to keep it running
+          STARTED_TASKS.incrementAndGet();
+          assertThat(LATCH.get().await(TIMEOUT, UNIT)).isTrue();
+          COMPLETED_TASKS.incrementAndGet();
+        });
+      }
+
+      // count of started tasks should be the same as THREAD_COUNT
+      await().untilAsserted(() -> {
+        assertThat(STARTED_TASKS.get()).isEqualTo(THREAD_COUNT);
+        assertThat(COMPLETED_TASKS.get()).isZero();
+      });
+
+      // try to start one more task, but it should end up queued instead of started
+      executorServiceRule.submit(() -> {
+        STARTED_TASKS.incrementAndGet();
+        assertThat(LATCH.get().await(TIMEOUT, UNIT)).isTrue();
+        COMPLETED_TASKS.incrementAndGet();
+      });
+
+      // started tasks should still be the same as THREAD_COUNT
+      assertThat(STARTED_TASKS.get()).isEqualTo(THREAD_COUNT);
+      assertThat(COMPLETED_TASKS.get()).isZero();
+
+      // number of threads running in executor should also be the same as THREAD_COUNT
+      assertThat(executorServiceRule.getThreads()).hasSize(THREAD_COUNT);
+
+      // open latch to let started tasks complete, and queued task should also start and finish
+      LATCH.get().countDown();
+
+      // all tasks should eventually complete as the executor threads finish tasks
+      await().untilAsserted(() -> {
+        assertThat(STARTED_TASKS.get()).isEqualTo(THREAD_COUNT + 1);
+        assertThat(COMPLETED_TASKS.get()).isEqualTo(THREAD_COUNT + 1);
+      });
+    }));
+  }
+}
diff --git a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedVmCountTest.java b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedVmCountTest.java
new file mode 100644
index 0000000..434b409
--- /dev/null
+++ b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleLimitedVmCountTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.test.dunit.rules.tests;
+
+import static org.apache.geode.test.dunit.VM.getVMCount;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+
+public class DistributedExecutorServiceRuleLimitedVmCountTest {
+
+  private static final int VM_COUNT = 2;
+
+  @Rule
+  public DistributedExecutorServiceRule executorServiceRule =
+      new DistributedExecutorServiceRule(0, VM_COUNT);
+
+  @Test
+  public void limitsVmCount() {
+    assertThat(getVMCount()).isEqualTo(VM_COUNT);
+  }
+}
diff --git a/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleUnlimitedThreadCountTest.java b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleUnlimitedThreadCountTest.java
new file mode 100644
index 0000000..c76cafc
--- /dev/null
+++ b/geode-dunit/src/distributedTest/java/org/apache/geode/test/dunit/rules/tests/DistributedExecutorServiceRuleUnlimitedThreadCountTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.test.dunit.rules.tests;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
+import static org.apache.geode.test.dunit.VM.getController;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.test.dunit.rules.DistributedExecutorServiceRule;
+
+@SuppressWarnings("serial")
+public class DistributedExecutorServiceRuleUnlimitedThreadCountTest implements Serializable {
+
+  private static final int PARALLEL_TASK_COUNT = 4;
+  private static final long TIMEOUT = getTimeout().toMinutes();
+  private static final TimeUnit UNIT = TimeUnit.MINUTES;
+  private static final AtomicBoolean COMPLETED = new AtomicBoolean();
+  private static final AtomicReference<CyclicBarrier> BARRIER = new AtomicReference<>();
+
+  @Rule
+  public DistributedExecutorServiceRule executorServiceRule =
+      new DistributedExecutorServiceRule(0, 1);
+
+  @Before
+  public void setUp() {
+    Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
+      COMPLETED.set(false);
+      BARRIER.set(new CyclicBarrier(PARALLEL_TASK_COUNT, () -> COMPLETED.set(true)));
+    }));
+  }
+
+  @Test
+  public void doesNotLimitThreadCount() {
+    Stream.of(getController(), getVM(0)).forEach(vm -> vm.invoke(() -> {
+      Collection<Future<Void>> tasks = new ArrayList<>();
+      for (int i = 1; i <= PARALLEL_TASK_COUNT; i++) {
+        tasks.add(executorServiceRule.submit(() -> {
+          BARRIER.get().await(TIMEOUT, UNIT);
+        }));
+      }
+      await().untilAsserted(() -> assertThat(COMPLETED.get()).isTrue());
+      for (Future<Void> task : tasks) {
+        assertThat(task).isDone();
+      }
+    }));
+  }
+}
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java
index 44f7de5..1fff402 100644
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule.java
@@ -14,12 +14,15 @@
  */
 package org.apache.geode.test.dunit.rules;
 
+import static org.apache.geode.test.dunit.VM.DEFAULT_VM_COUNT;
+
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
@@ -27,17 +30,76 @@ import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule.ThrowingRunnable;
 
-@SuppressWarnings("unused")
+/**
+ * Every DUnit VM, including the JUnit Controller VM, has its own {@code ExecutorService}.
+ */
+@SuppressWarnings({"serial", "unused"})
 public class DistributedExecutorServiceRule extends AbstractDistributedRule {
 
   private static final AtomicReference<ExecutorServiceRule> delegate = new AtomicReference<>();
 
+  private final boolean enableAwaitTermination;
+  private final long awaitTerminationTimeout;
+  private final TimeUnit awaitTerminationTimeUnit;
+  private final boolean awaitTerminationBeforeShutdown;
+  private final boolean useShutdown;
+  private final boolean useShutdownNow;
+  private final int threadCount;
+
+  /**
+   * Returns a {@code Builder} to configure a new {@code DistributedExecutorServiceRule}.
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Constructs a {@code DistributedExecutorServiceRule} which performs {@code shutdownNow} and
+   * thread dump of its threads if any were left running during {@code tearDown}.
+   */
   public DistributedExecutorServiceRule() {
-    // default vmCount
+    this(new Builder().threadCount(0).vmCount(DEFAULT_VM_COUNT));
   }
 
-  public DistributedExecutorServiceRule(int vmCount) {
+  /**
+   * Constructs a {@code DistributedExecutorServiceRule} which performs {@code shutdownNow} and
+   * thread dump of its threads if any were left running during {@code tearDown}.
+   *
+   * @param threadCount The number of threads in the pool. Creates fixed thread pool if > 0; else
+   *        creates cached thread pool.
+   * @param vmCount specified number of VMs
+   */
+  public DistributedExecutorServiceRule(int threadCount, int vmCount) {
+    this(new Builder().threadCount(threadCount).vmCount(vmCount));
+  }
+
+  private DistributedExecutorServiceRule(Builder builder) {
+    this(builder.enableAwaitTermination,
+        builder.awaitTerminationTimeout,
+        builder.awaitTerminationTimeUnit,
+        builder.awaitTerminationBeforeShutdown,
+        builder.useShutdown,
+        builder.useShutdownNow,
+        builder.threadCount,
+        builder.vmCount);
+  }
+
+  private DistributedExecutorServiceRule(boolean enableAwaitTermination,
+      long awaitTerminationTimeout,
+      TimeUnit awaitTerminationTimeUnit,
+      boolean awaitTerminationBeforeShutdown,
+      boolean useShutdown,
+      boolean useShutdownNow,
+      int threadCount,
+      int vmCount) {
     super(vmCount);
+    this.enableAwaitTermination = enableAwaitTermination;
+    this.awaitTerminationTimeout = awaitTerminationTimeout;
+    this.awaitTerminationTimeUnit = awaitTerminationTimeUnit;
+    this.awaitTerminationBeforeShutdown = awaitTerminationBeforeShutdown;
+    this.useShutdown = useShutdown;
+    this.useShutdownNow = useShutdownNow;
+    this.threadCount = threadCount;
   }
 
   public ExecutorService getExecutorService() {
@@ -173,7 +235,9 @@ public class DistributedExecutorServiceRule extends AbstractDistributedRule {
 
   private void invokeBefore() throws Exception {
     try {
-      delegate.set(new ExecutorServiceRule());
+      delegate.set(new ExecutorServiceRule(enableAwaitTermination, awaitTerminationTimeout,
+          awaitTerminationTimeUnit, awaitTerminationBeforeShutdown, useShutdown, useShutdownNow,
+          threadCount));
       delegate.get().before();
     } catch (Throwable throwable) {
       if (throwable instanceof Exception) {
@@ -186,4 +250,97 @@ public class DistributedExecutorServiceRule extends AbstractDistributedRule {
   private void invokeAfter() {
     delegate.get().after();
   }
+
+  public static class Builder {
+
+    private boolean enableAwaitTermination;
+    private long awaitTerminationTimeout;
+    private TimeUnit awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
+    private boolean awaitTerminationBeforeShutdown = true;
+    private boolean useShutdown;
+    private boolean useShutdownNow = true;
+    private int threadCount;
+    private int vmCount;
+
+    protected Builder() {
+      // nothing
+    }
+
+    /**
+     * Enables invocation of {@code awaitTermination} during {@code tearDown}. Default is disabled.
+     *
+     * @param timeout the maximum time to wait
+     * @param unit the time unit of the timeout argument
+     */
+    public Builder awaitTermination(long timeout, TimeUnit unit) {
+      enableAwaitTermination = true;
+      awaitTerminationTimeout = timeout;
+      awaitTerminationTimeUnit = unit;
+      return this;
+    }
+
+    /**
+     * Enables invocation of {@code shutdown} during {@code tearDown}. Default is disabled.
+     */
+    public Builder useShutdown() {
+      useShutdown = true;
+      useShutdownNow = false;
+      return this;
+    }
+
+    /**
+     * Enables invocation of {@code shutdownNow} during {@code tearDown}. Default is enabled.
+     */
+    public Builder useShutdownNow() {
+      useShutdown = false;
+      useShutdownNow = true;
+      return this;
+    }
+
+    /**
+     * Specifies invocation of {@code awaitTermination} before {@code shutdown} or
+     * {@code shutdownNow}. This is the default.
+     */
+    public Builder awaitTerminationBeforeShutdown() {
+      awaitTerminationBeforeShutdown = true;
+      return this;
+    }
+
+    /**
+     * Specifies invocation of {@code awaitTermination} after {@code shutdown} or
+     * {@code shutdownNow}.
+     */
+    public Builder awaitTerminationAfterShutdown() {
+      awaitTerminationBeforeShutdown = false;
+      return this;
+    }
+
+    /**
+     * Specifies the number of threads in the pool. Creates fixed thread pool if > 0. Default is 0
+     * which means (non-fixed) cached thread pool.
+     *
+     * @param threadCount the number of threads in the pool
+     */
+    public Builder threadCount(int threadCount) {
+      this.threadCount = threadCount;
+      return this;
+    }
+
+    /**
+     * Specifies the number of DUnit VMs to startup. Builder default is 0 if never set.
+     *
+     * @param vmCount the number of DUnit VMs to startup
+     */
+    public Builder vmCount(int vmCount) {
+      this.vmCount = vmCount;
+      return this;
+    }
+
+    /**
+     * Builds the instance of {@code DistributedExecutorServiceRule}.
+     */
+    public DistributedExecutorServiceRule build() {
+      return new DistributedExecutorServiceRule(this);
+    }
+  }
 }
diff --git a/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt b/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
index 40f1f5b..6081d20 100644
--- a/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
+++ b/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
@@ -165,7 +165,7 @@ org/apache/geode/test/dunit/rules/DistributedCounters,false,beforeBounceCounters
 org/apache/geode/test/dunit/rules/DistributedDiskDirRule,false,invoker:org/apache/geode/test/dunit/rules/RemoteInvoker,temporaryFolder:org/apache/geode/test/junit/rules/serializable/SerializableTemporaryFolder,testClassName:java/lang/String,testName:org/apache/geode/test/junit/rules/serializable/SerializableTestName,vmCount:int,vmEventListener:org/apache/geode/test/dunit/VMEventListener
 org/apache/geode/test/dunit/rules/DistributedDiskDirRule$InternalVMEventListener,false,this$0:org/apache/geode/test/dunit/rules/DistributedDiskDirRule
 org/apache/geode/test/dunit/rules/DistributedErrorCollector,false,beforeBounceErrors:java/util/Map
-org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule,false
+org/apache/geode/test/dunit/rules/DistributedExecutorServiceRule,false,awaitTerminationBeforeShutdown:boolean,awaitTerminationTimeUnit:java/util/concurrent/TimeUnit,awaitTerminationTimeout:long,enableAwaitTermination:boolean,threadCount:int,useShutdown:boolean,useShutdownNow:boolean
 org/apache/geode/test/dunit/rules/DistributedExternalResource,false,invoker:org/apache/geode/test/dunit/rules/RemoteInvoker
 org/apache/geode/test/dunit/rules/DistributedMap,false,controller:java/util/concurrent/atomic/AtomicReference,identity:int,initialEntries:java/util/Map
 org/apache/geode/test/dunit/rules/DistributedReference,false,autoClose:java/util/concurrent/atomic/AtomicBoolean,identity:int
diff --git a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
index e3a6a67..abc9e42 100644
--- a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
+++ b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
@@ -94,6 +94,7 @@ public class ExecutorServiceRule extends SerializableExternalResource {
   protected final boolean awaitTerminationBeforeShutdown;
   protected final boolean useShutdown;
   protected final boolean useShutdownNow;
+  protected final int threadCount;
 
   protected transient volatile DedicatedThreadFactory threadFactory;
   protected transient volatile ExecutorService executor;
@@ -106,12 +107,13 @@ public class ExecutorServiceRule extends SerializableExternalResource {
   }
 
   protected ExecutorServiceRule(Builder builder) {
-    enableAwaitTermination = builder.enableAwaitTermination;
-    awaitTerminationTimeout = builder.awaitTerminationTimeout;
-    awaitTerminationTimeUnit = builder.awaitTerminationTimeUnit;
-    awaitTerminationBeforeShutdown = builder.awaitTerminationBeforeShutdown;
-    useShutdown = builder.useShutdown;
-    useShutdownNow = builder.useShutdownNow;
+    this(builder.enableAwaitTermination,
+        builder.awaitTerminationTimeout,
+        builder.awaitTerminationTimeUnit,
+        builder.awaitTerminationBeforeShutdown,
+        builder.useShutdown,
+        builder.useShutdownNow,
+        builder.threadCount);
   }
 
   /**
@@ -119,18 +121,47 @@ public class ExecutorServiceRule extends SerializableExternalResource {
    * during {@code tearDown}.
    */
   public ExecutorServiceRule() {
-    enableAwaitTermination = false;
-    awaitTerminationTimeout = 0;
-    awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
-    awaitTerminationBeforeShutdown = false;
-    useShutdown = false;
-    useShutdownNow = true;
+    this(false, 0, TimeUnit.NANOSECONDS, false, false, true, 0);
+  }
+
+  /**
+   * Constructs an {@code ExecutorServiceRule} which invokes {@code ExecutorService.shutdownNow()}
+   * during {@code tearDown}.
+   *
+   * @param threadCount The number of threads in the pool. Creates fixed thread pool if > 0; else
+   *        creates cached thread pool.
+   */
+  public ExecutorServiceRule(int threadCount) {
+    this(false, 0, TimeUnit.NANOSECONDS, false, false, true, threadCount);
+  }
+
+  /**
+   * For invocation by {@code DistributedExecutorServiceRule} which needs to subclass another class.
+   */
+  public ExecutorServiceRule(boolean enableAwaitTermination,
+      long awaitTerminationTimeout,
+      TimeUnit awaitTerminationTimeUnit,
+      boolean awaitTerminationBeforeShutdown,
+      boolean useShutdown,
+      boolean useShutdownNow,
+      int threadCount) {
+    this.enableAwaitTermination = enableAwaitTermination;
+    this.awaitTerminationTimeout = awaitTerminationTimeout;
+    this.awaitTerminationTimeUnit = awaitTerminationTimeUnit;
+    this.awaitTerminationBeforeShutdown = awaitTerminationBeforeShutdown;
+    this.useShutdown = useShutdown;
+    this.useShutdownNow = useShutdownNow;
+    this.threadCount = threadCount;
   }
 
   @Override
   public void before() {
     threadFactory = new DedicatedThreadFactory();
-    executor = Executors.newCachedThreadPool(threadFactory);
+    if (threadCount > 0) {
+      executor = Executors.newFixedThreadPool(threadCount, threadFactory);
+    } else {
+      executor = Executors.newCachedThreadPool(threadFactory);
+    }
   }
 
   @Override
@@ -375,6 +406,7 @@ public class ExecutorServiceRule extends SerializableExternalResource {
     protected boolean awaitTerminationBeforeShutdown = true;
     protected boolean useShutdown;
     protected boolean useShutdownNow = true;
+    protected int threadCount;
 
     protected Builder() {
       // nothing
@@ -430,6 +462,17 @@ public class ExecutorServiceRule extends SerializableExternalResource {
     }
 
     /**
+     * Specifies the number of threads in the pool. Creates fixed thread pool if > 0. Default is 0
+     * which means (non-fixed) cached thread pool.
+     *
+     * @param threadCount the number of threads in the pool
+     */
+    public Builder threadCount(int threadCount) {
+      this.threadCount = threadCount;
+      return this;
+    }
+
+    /**
      * Builds the instance of {@code ExecutorServiceRule}.
      */
     public ExecutorServiceRule build() {
diff --git a/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt b/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
index 89eb8c1..b0be3e0 100644
--- a/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
+++ b/geode-junit/src/main/resources/org/apache/geode/test/junit/internal/sanctioned-geode-junit-serializables.txt
@@ -75,7 +75,7 @@ org/apache/geode/test/compiler/ClassBuilder,false,classPath:java/lang/String
 org/apache/geode/test/concurrent/FileBasedCountDownLatch,false,dataFile:java/io/File,lockFile:java/io/File
 org/apache/geode/test/junit/rules/CloseableReference,false,autoClose:java/util/concurrent/atomic/AtomicBoolean
 org/apache/geode/test/junit/rules/ConditionalIgnoreRule,false
-org/apache/geode/test/junit/rules/ExecutorServiceRule,false,awaitTerminationBeforeShutdown:boolean,awaitTerminationTimeUnit:java/util/concurrent/TimeUnit,awaitTerminationTimeout:long,enableAwaitTermination:boolean,useShutdown:boolean,useShutdownNow:boolean
+org/apache/geode/test/junit/rules/ExecutorServiceRule,false,awaitTerminationBeforeShutdown:boolean,awaitTerminationTimeUnit:java/util/concurrent/TimeUnit,awaitTerminationTimeout:long,enableAwaitTermination:boolean,threadCount:int,useShutdown:boolean,useShutdownNow:boolean
 org/apache/geode/test/junit/rules/IgnoreOnWindowsRule,false
 org/apache/geode/test/junit/rules/IgnoreUntilRule,false
 org/apache/geode/test/junit/rules/JarFileRule,false,className:java/lang/String,jarFile:java/io/File,jarName:java/lang/String,makeJarLarge:boolean