You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2018/08/30 18:35:41 UTC

[geode] branch develop updated: GEODE-5652: use unlimited ThreadPoolExecutor in ExecutorServiceRule

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 52a6e42  GEODE-5652: use unlimited ThreadPoolExecutor in ExecutorServiceRule
52a6e42 is described below

commit 52a6e4279b74f6c374061b9132df42fa8cb7f89c
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Tue Aug 28 14:57:52 2018 -0700

    GEODE-5652: use unlimited ThreadPoolExecutor in ExecutorServiceRule
---
 .../cache/PartitionedRegionCreationDUnitTest.java  |   1 -
 .../internal/locks/DLockServiceLeakTest.java       |   5 +-
 .../internal/cache/InterruptDiskJUnitTest.java     | 120 +++++++--------------
 .../rules/ExecutorServiceRuleIntegrationTest.java  |  10 +-
 .../test/junit/rules/ExecutorServiceRule.java      |  68 ++----------
 5 files changed, 59 insertions(+), 145 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java
index 5c4a546..e28df99 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java
@@ -51,7 +51,6 @@ import org.apache.geode.test.junit.rules.ExecutorServiceRule;
  * will verify the functionality under distributed scenario.
  */
 @SuppressWarnings("serial")
-
 public class PartitionedRegionCreationDUnitTest extends CacheTestCase {
 
   private VM vm0;
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java
index f95f8e7..61fc97c 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java
@@ -44,15 +44,14 @@ import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.test.junit.categories.DLockTest;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
-@Category({DLockTest.class})
+@Category(DLockTest.class)
 public class DLockServiceLeakTest {
 
   private Cache cache;
   private DistributedRegion testRegion;
 
   @Rule
-  public ExecutorServiceRule executorServiceRule =
-      ExecutorServiceRule.builder().threadCount(5).build();
+  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
 
   @Before
   public void setUp() {
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java
index f68e615..bdc2f33 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java
@@ -14,125 +14,87 @@
  */
 package org.apache.geode.internal.cache;
 
-import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_TIME_STATISTICS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE_PERSISTENT;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_ARCHIVE_FILE;
-import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Properties;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 /**
  * Test of interrupting threads doing disk writes to see the effect.
- *
  */
 public class InterruptDiskJUnitTest {
 
-  private static volatile Thread puttingThread;
-  private static final long MAX_WAIT = 60 * 1000;
-  private DistributedSystem ds;
+  private final AtomicInteger nextValue = new AtomicInteger();
+  private final AtomicReference<Thread> puttingThread = new AtomicReference<>();
+
   private Cache cache;
   private Region<Object, Object> region;
-  private AtomicLong nextValue = new AtomicLong();
 
   @Rule
   public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
 
-  @Test
-  @Ignore
-  public void testLoop() throws Throwable {
-    for (int i = 0; i < 100; i++) {
-      System.err.println("i=" + i);
-      System.out.println("i=" + i);
-      testDRPutWithInterrupt();
-      tearDown();
-      setUp();
-    }
-  }
-
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
   @Before
   public void setUp() {
-    Properties props = new Properties();
-    props.setProperty(MCAST_PORT, "0");
-    props.setProperty(LOCATORS, "");
-    props.setProperty(LOG_LEVEL, "config"); // to keep diskPerf logs smaller
-    props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
-    props.setProperty(ENABLE_TIME_STATISTICS, "true");
-    props.setProperty(STATISTIC_ARCHIVE_FILE, "stats.gfs");
-    ds = DistributedSystem.connect(props);
-    cache = CacheFactory.create(ds);
-    File diskStore = new File("diskStore");
-    diskStore.mkdir();
-    cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {diskStore})
-        .create("store");
-    region = cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT)
-        .setDiskStoreName("store").create("region");
-  }
+    String diskStoreName = getClass().getSimpleName() + "_diskStore";
+    String regionName = getClass().getSimpleName() + "_region";
 
+    Properties config = new Properties();
+    config.setProperty(MCAST_PORT, "0");
+    config.setProperty(LOCATORS, "");
 
-  @After
-  public void tearDown() {
-    ds.disconnect();
-  }
+    File diskDir = temporaryFolder.getRoot();
 
+    cache = new CacheFactory(config).create();
 
-  @Test
-  public void testDRPutWithInterrupt() throws Throwable {
-    Callable doPuts = new Callable() {
-
-      @Override
-      public Object call() {
-        puttingThread = Thread.currentThread();
-        long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(MAX_WAIT);
-        while (!Thread.currentThread().isInterrupted()) {
-          region.put(0, nextValue.incrementAndGet());
-          if (System.nanoTime() > end) {
-            fail("Did not get interrupted in 60 seconds");
-          }
-        }
-        return null;
-      }
-    };
+    cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {diskDir})
+        .create(diskStoreName);
 
-    Future result = executorServiceRule.submit(doPuts);
+    region = cache.createRegionFactory(REPLICATE_PERSISTENT).setDiskStoreName(diskStoreName)
+        .create(regionName);
+  }
 
+  @After
+  public void tearDown() {
+    cache.close();
+  }
 
-    Thread.sleep(50);
-    long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(MAX_WAIT);
-    while (puttingThread == null) {
-      Thread.sleep(50);
-      if (System.nanoTime() > end) {
-        fail("Putting thread not set in 60 seconds");
+  @Test
+  public void testDRPutWithInterrupt() throws Exception {
+    Future<Void> doPutWhileNotInterrupted = executorServiceRule.runAsync(() -> {
+      puttingThread.set(Thread.currentThread());
+      while (!Thread.currentThread().isInterrupted()) {
+        region.put(0, nextValue.incrementAndGet());
       }
-    }
-
-    puttingThread.interrupt();
-
-    result.get(60, TimeUnit.SECONDS);
+    });
 
-    assertEquals(nextValue.get(), region.get(0));
+    await().atMost(2, MINUTES).untilAsserted(() -> assertThat(puttingThread).isNotNull());
+    Thread.sleep(Duration.ofSeconds(1).toMillis());
+    puttingThread.get().interrupt();
 
+    doPutWhileNotInterrupted.get(2, MINUTES);
+    assertThat(region.get(0)).isEqualTo(nextValue.get());
   }
 }
diff --git a/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java b/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java
index d2b2d6b..05e3bd6 100644
--- a/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java
+++ b/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java
@@ -34,10 +34,10 @@ import org.apache.geode.test.junit.runners.TestRunner;
 
 public class ExecutorServiceRuleIntegrationTest {
 
-  static volatile CountDownLatch hangLatch;
-  static volatile CountDownLatch terminateLatch;
-  static volatile ExecutorService executorService;
-  static Awaits.Invocations invocations;
+  private static volatile CountDownLatch hangLatch;
+  private static volatile CountDownLatch terminateLatch;
+  private static volatile ExecutorService executorService;
+  private static Awaits.Invocations invocations;
 
   @Before
   public void setUp() throws Exception {
@@ -64,7 +64,7 @@ public class ExecutorServiceRuleIntegrationTest {
   }
 
   @Test
-  public void awaitTermination() throws Exception {
+  public void awaitTermination() {
     Result result = TestRunner.runTest(Awaits.class);
     assertThat(result.wasSuccessful()).isTrue();
 
diff --git a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
index 934c3c1..f79d874 100644
--- a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
+++ b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
@@ -14,8 +14,6 @@
  */
 package org.apache.geode.test.junit.rules;
 
-import static org.assertj.core.api.Assertions.assertThat;
-
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -32,22 +30,15 @@ import org.apache.geode.test.junit.rules.serializable.SerializableExternalResour
  * creates an {@code ExecutorService} which is terminated after the scope of the {@code Rule}. This
  * {@code Rule} can be used in tests for hangs, deadlocks, and infinite loops.
  *
- * <p>
- * By default, the {@code ExecutorService} is single-threaded. You can specify the thread count by
- * using {@link Builder#threadCount(int)} or {@link #ExecutorServiceRule(int)}.
- *
- * <p>
- * Example with default configuration (single-threaded and does not assert that tasks are done):
- *
  * <pre>
  * private CountDownLatch hangLatch = new CountDownLatch(1);
  *
  * {@literal @}Rule
- * public AsynchronousRule asynchronousRule = new AsynchronousRule();
+ * public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
  *
  * {@literal @}Test
  * public void doTest() throws Exception {
- *   Future<Void> result = asynchronousRule.runAsync(() -> {
+ *   Future<Void> result = executorServiceRule.runAsync(() -> {
  *     try {
  *       hangLatch.await();
  *     } catch (InterruptedException e) {
@@ -73,17 +64,13 @@ import org.apache.geode.test.junit.rules.serializable.SerializableExternalResour
  * private CountDownLatch hangLatch = new CountDownLatch(1);
  *
  * {@literal @}Rule
- * public ExecutorServiceRule asynchronousRule = ExecutorServiceRule.builder().threadCount(10).awaitTermination(10, MILLISECONDS).build();
+ * public ExecutorServiceRule executorServiceRule = ExecutorServiceRule.builder().awaitTermination(10, SECONDS).build();
  *
  * {@literal @}Test
  * public void doTest() throws Exception {
  *   for (int i = 0; i < 10; i++) {
- *     asynchronousRule.runAsync(() -> {
- *       try {
- *         hangLatch.await();
- *       } catch (InterruptedException e) {
- *         // do nothing
- *       }
+ *     executorServiceRule.runAsync(() -> {
+ *       hangLatch.await();
  *     });
  *   }
  * }
@@ -92,7 +79,6 @@ import org.apache.geode.test.junit.rules.serializable.SerializableExternalResour
 @SuppressWarnings("unused")
 public class ExecutorServiceRule extends SerializableExternalResource {
 
-  protected final int threadCount;
   protected final boolean enableAwaitTermination;
   protected final long awaitTerminationTimeout;
   protected final TimeUnit awaitTerminationTimeUnit;
@@ -110,7 +96,6 @@ public class ExecutorServiceRule extends SerializableExternalResource {
   }
 
   protected ExecutorServiceRule(Builder builder) {
-    threadCount = builder.threadCount;
     enableAwaitTermination = builder.enableAwaitTermination;
     awaitTerminationTimeout = builder.awaitTerminationTimeout;
     awaitTerminationTimeUnit = builder.awaitTerminationTimeUnit;
@@ -120,25 +105,10 @@ public class ExecutorServiceRule extends SerializableExternalResource {
   }
 
   /**
-   * Constructs a new single-threaded {@code ExecutorServiceRule} which invokes
-   * {@code ExecutorService.shutdownNow()} during {@code tearDown}.
+   * Constructs an {@code ExecutorServiceRule} which invokes {@code ExecutorService.shutdownNow()}
+   * during {@code tearDown}.
    */
   public ExecutorServiceRule() {
-    threadCount = 1;
-    enableAwaitTermination = false;
-    awaitTerminationTimeout = 0;
-    awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
-    awaitTerminationBeforeShutdown = false;
-    useShutdown = false;
-    useShutdownNow = true;
-  }
-
-  /**
-   * Constructs a new multi-threaded {@code ExecutorServiceRule} which invokes
-   * {@code ExecutorService.shutdownNow()} during {@code tearDown}.
-   */
-  public ExecutorServiceRule(int threadCount) {
-    this.threadCount = threadCount;
     enableAwaitTermination = false;
     awaitTerminationTimeout = 0;
     awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
@@ -149,11 +119,7 @@ public class ExecutorServiceRule extends SerializableExternalResource {
 
   @Override
   public void before() {
-    if (threadCount > 1) {
-      executor = Executors.newFixedThreadPool(threadCount);
-    } else {
-      executor = Executors.newSingleThreadExecutor();
-    }
+    executor = Executors.newCachedThreadPool();
   }
 
   @Override
@@ -272,12 +238,11 @@ public class ExecutorServiceRule extends SerializableExternalResource {
 
   public static class Builder {
 
-    protected int threadCount = 1;
-    protected boolean enableAwaitTermination = false;
-    protected long awaitTerminationTimeout = 0;
+    protected boolean enableAwaitTermination;
+    protected long awaitTerminationTimeout;
     protected TimeUnit awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
     protected boolean awaitTerminationBeforeShutdown = true;
-    protected boolean useShutdown = false;
+    protected boolean useShutdown;
     protected boolean useShutdownNow = true;
 
     protected Builder() {
@@ -285,16 +250,6 @@ public class ExecutorServiceRule extends SerializableExternalResource {
     }
 
     /**
-     * Configures the number of threads. Default is one thread.
-     *
-     * @param threadCount the number of threads in the pool
-     */
-    public Builder threadCount(int threadCount) {
-      this.threadCount = threadCount;
-      return this;
-    }
-
-    /**
      * Enables invocation of {@code awaitTermination} during {@code tearDown}. Default is disabled.
      *
      * @param timeout the maximum time to wait
@@ -348,7 +303,6 @@ public class ExecutorServiceRule extends SerializableExternalResource {
      * Builds the instance of {@code ExecutorServiceRule}.
      */
     public ExecutorServiceRule build() {
-      assertThat(threadCount).isGreaterThan(0);
       return new ExecutorServiceRule(this);
     }
   }