You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2018/08/30 18:35:41 UTC
[geode] branch develop updated: GEODE-5652: use unlimited
ThreadPoolExecutor in ExecutorServiceRule
This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 52a6e42 GEODE-5652: use unlimited ThreadPoolExecutor in ExecutorServiceRule
52a6e42 is described below
commit 52a6e4279b74f6c374061b9132df42fa8cb7f89c
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Tue Aug 28 14:57:52 2018 -0700
GEODE-5652: use unlimited ThreadPoolExecutor in ExecutorServiceRule
---
.../cache/PartitionedRegionCreationDUnitTest.java | 1 -
.../internal/locks/DLockServiceLeakTest.java | 5 +-
.../internal/cache/InterruptDiskJUnitTest.java | 120 +++++++--------------
.../rules/ExecutorServiceRuleIntegrationTest.java | 10 +-
.../test/junit/rules/ExecutorServiceRule.java | 68 ++----------
5 files changed, 59 insertions(+), 145 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java
index 5c4a546..e28df99 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java
@@ -51,7 +51,6 @@ import org.apache.geode.test.junit.rules.ExecutorServiceRule;
* will verify the functionality under distributed scenario.
*/
@SuppressWarnings("serial")
-
public class PartitionedRegionCreationDUnitTest extends CacheTestCase {
private VM vm0;
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java
index f95f8e7..61fc97c 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java
@@ -44,15 +44,14 @@ import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.test.junit.categories.DLockTest;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
-@Category({DLockTest.class})
+@Category(DLockTest.class)
public class DLockServiceLeakTest {
private Cache cache;
private DistributedRegion testRegion;
@Rule
- public ExecutorServiceRule executorServiceRule =
- ExecutorServiceRule.builder().threadCount(5).build();
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
@Before
public void setUp() {
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java
index f68e615..bdc2f33 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java
@@ -14,125 +14,87 @@
*/
package org.apache.geode.internal.cache;
-import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_TIME_STATISTICS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE_PERSISTENT;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
-import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_ARCHIVE_FILE;
-import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
import java.io.File;
+import java.time.Duration;
import java.util.Properties;
-import java.util.concurrent.Callable;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
/**
* Test of interrupting threads doing disk writes to see the effect.
- *
*/
public class InterruptDiskJUnitTest {
- private static volatile Thread puttingThread;
- private static final long MAX_WAIT = 60 * 1000;
- private DistributedSystem ds;
+ private final AtomicInteger nextValue = new AtomicInteger();
+ private final AtomicReference<Thread> puttingThread = new AtomicReference<>();
+
private Cache cache;
private Region<Object, Object> region;
- private AtomicLong nextValue = new AtomicLong();
@Rule
public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
- @Test
- @Ignore
- public void testLoop() throws Throwable {
- for (int i = 0; i < 100; i++) {
- System.err.println("i=" + i);
- System.out.println("i=" + i);
- testDRPutWithInterrupt();
- tearDown();
- setUp();
- }
- }
-
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Before
public void setUp() {
- Properties props = new Properties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(LOCATORS, "");
- props.setProperty(LOG_LEVEL, "config"); // to keep diskPerf logs smaller
- props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
- props.setProperty(ENABLE_TIME_STATISTICS, "true");
- props.setProperty(STATISTIC_ARCHIVE_FILE, "stats.gfs");
- ds = DistributedSystem.connect(props);
- cache = CacheFactory.create(ds);
- File diskStore = new File("diskStore");
- diskStore.mkdir();
- cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {diskStore})
- .create("store");
- region = cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT)
- .setDiskStoreName("store").create("region");
- }
+ String diskStoreName = getClass().getSimpleName() + "_diskStore";
+ String regionName = getClass().getSimpleName() + "_region";
+ Properties config = new Properties();
+ config.setProperty(MCAST_PORT, "0");
+ config.setProperty(LOCATORS, "");
- @After
- public void tearDown() {
- ds.disconnect();
- }
+ File diskDir = temporaryFolder.getRoot();
+ cache = new CacheFactory(config).create();
- @Test
- public void testDRPutWithInterrupt() throws Throwable {
- Callable doPuts = new Callable() {
-
- @Override
- public Object call() {
- puttingThread = Thread.currentThread();
- long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(MAX_WAIT);
- while (!Thread.currentThread().isInterrupted()) {
- region.put(0, nextValue.incrementAndGet());
- if (System.nanoTime() > end) {
- fail("Did not get interrupted in 60 seconds");
- }
- }
- return null;
- }
- };
+ cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {diskDir})
+ .create(diskStoreName);
- Future result = executorServiceRule.submit(doPuts);
+ region = cache.createRegionFactory(REPLICATE_PERSISTENT).setDiskStoreName(diskStoreName)
+ .create(regionName);
+ }
+ @After
+ public void tearDown() {
+ cache.close();
+ }
- Thread.sleep(50);
- long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(MAX_WAIT);
- while (puttingThread == null) {
- Thread.sleep(50);
- if (System.nanoTime() > end) {
- fail("Putting thread not set in 60 seconds");
+ @Test
+ public void testDRPutWithInterrupt() throws Exception {
+ Future<Void> doPutWhileNotInterrupted = executorServiceRule.runAsync(() -> {
+ puttingThread.set(Thread.currentThread());
+ while (!Thread.currentThread().isInterrupted()) {
+ region.put(0, nextValue.incrementAndGet());
}
- }
-
- puttingThread.interrupt();
-
- result.get(60, TimeUnit.SECONDS);
+ });
- assertEquals(nextValue.get(), region.get(0));
+ await().atMost(2, MINUTES).untilAsserted(() -> assertThat(puttingThread).isNotNull());
+ Thread.sleep(Duration.ofSeconds(1).toMillis());
+ puttingThread.get().interrupt();
+ doPutWhileNotInterrupted.get(2, MINUTES);
+ assertThat(region.get(0)).isEqualTo(nextValue.get());
}
}
diff --git a/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java b/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java
index d2b2d6b..05e3bd6 100644
--- a/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java
+++ b/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java
@@ -34,10 +34,10 @@ import org.apache.geode.test.junit.runners.TestRunner;
public class ExecutorServiceRuleIntegrationTest {
- static volatile CountDownLatch hangLatch;
- static volatile CountDownLatch terminateLatch;
- static volatile ExecutorService executorService;
- static Awaits.Invocations invocations;
+ private static volatile CountDownLatch hangLatch;
+ private static volatile CountDownLatch terminateLatch;
+ private static volatile ExecutorService executorService;
+ private static Awaits.Invocations invocations;
@Before
public void setUp() throws Exception {
@@ -64,7 +64,7 @@ public class ExecutorServiceRuleIntegrationTest {
}
@Test
- public void awaitTermination() throws Exception {
+ public void awaitTermination() {
Result result = TestRunner.runTest(Awaits.class);
assertThat(result.wasSuccessful()).isTrue();
diff --git a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
index 934c3c1..f79d874 100644
--- a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
+++ b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java
@@ -14,8 +14,6 @@
*/
package org.apache.geode.test.junit.rules;
-import static org.assertj.core.api.Assertions.assertThat;
-
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -32,22 +30,15 @@ import org.apache.geode.test.junit.rules.serializable.SerializableExternalResour
* creates an {@code ExecutorService} which is terminated after the scope of the {@code Rule}. This
* {@code Rule} can be used in tests for hangs, deadlocks, and infinite loops.
*
- * <p>
- * By default, the {@code ExecutorService} is single-threaded. You can specify the thread count by
- * using {@link Builder#threadCount(int)} or {@link #ExecutorServiceRule(int)}.
- *
- * <p>
- * Example with default configuration (single-threaded and does not assert that tasks are done):
- *
* <pre>
* private CountDownLatch hangLatch = new CountDownLatch(1);
*
* {@literal @}Rule
- * public AsynchronousRule asynchronousRule = new AsynchronousRule();
+ * public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
*
* {@literal @}Test
* public void doTest() throws Exception {
- * Future<Void> result = asynchronousRule.runAsync(() -> {
+ * Future<Void> result = executorServiceRule.runAsync(() -> {
* try {
* hangLatch.await();
* } catch (InterruptedException e) {
@@ -73,17 +64,13 @@ import org.apache.geode.test.junit.rules.serializable.SerializableExternalResour
* private CountDownLatch hangLatch = new CountDownLatch(1);
*
* {@literal @}Rule
- * public ExecutorServiceRule asynchronousRule = ExecutorServiceRule.builder().threadCount(10).awaitTermination(10, MILLISECONDS).build();
+ * public ExecutorServiceRule executorServiceRule = ExecutorServiceRule.builder().awaitTermination(10, SECONDS).build();
*
* {@literal @}Test
* public void doTest() throws Exception {
* for (int i = 0; i < 10; i++) {
- * asynchronousRule.runAsync(() -> {
- * try {
- * hangLatch.await();
- * } catch (InterruptedException e) {
- * // do nothing
- * }
+ * executorServiceRule.runAsync(() -> {
+ * hangLatch.await();
* });
* }
* }
@@ -92,7 +79,6 @@ import org.apache.geode.test.junit.rules.serializable.SerializableExternalResour
@SuppressWarnings("unused")
public class ExecutorServiceRule extends SerializableExternalResource {
- protected final int threadCount;
protected final boolean enableAwaitTermination;
protected final long awaitTerminationTimeout;
protected final TimeUnit awaitTerminationTimeUnit;
@@ -110,7 +96,6 @@ public class ExecutorServiceRule extends SerializableExternalResource {
}
protected ExecutorServiceRule(Builder builder) {
- threadCount = builder.threadCount;
enableAwaitTermination = builder.enableAwaitTermination;
awaitTerminationTimeout = builder.awaitTerminationTimeout;
awaitTerminationTimeUnit = builder.awaitTerminationTimeUnit;
@@ -120,25 +105,10 @@ public class ExecutorServiceRule extends SerializableExternalResource {
}
/**
- * Constructs a new single-threaded {@code ExecutorServiceRule} which invokes
- * {@code ExecutorService.shutdownNow()} during {@code tearDown}.
+ * Constructs a {@code ExecutorServiceRule} which invokes {@code ExecutorService.shutdownNow()}
+ * during {@code tearDown}.
*/
public ExecutorServiceRule() {
- threadCount = 1;
- enableAwaitTermination = false;
- awaitTerminationTimeout = 0;
- awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
- awaitTerminationBeforeShutdown = false;
- useShutdown = false;
- useShutdownNow = true;
- }
-
- /**
- * Constructs a new multi-threaded {@code ExecutorServiceRule} which invokes
- * {@code ExecutorService.shutdownNow()} during {@code tearDown}.
- */
- public ExecutorServiceRule(int threadCount) {
- this.threadCount = threadCount;
enableAwaitTermination = false;
awaitTerminationTimeout = 0;
awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
@@ -149,11 +119,7 @@ public class ExecutorServiceRule extends SerializableExternalResource {
@Override
public void before() {
- if (threadCount > 1) {
- executor = Executors.newFixedThreadPool(threadCount);
- } else {
- executor = Executors.newSingleThreadExecutor();
- }
+ executor = Executors.newCachedThreadPool();
}
@Override
@@ -272,12 +238,11 @@ public class ExecutorServiceRule extends SerializableExternalResource {
public static class Builder {
- protected int threadCount = 1;
- protected boolean enableAwaitTermination = false;
- protected long awaitTerminationTimeout = 0;
+ protected boolean enableAwaitTermination;
+ protected long awaitTerminationTimeout;
protected TimeUnit awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
protected boolean awaitTerminationBeforeShutdown = true;
- protected boolean useShutdown = false;
+ protected boolean useShutdown;
protected boolean useShutdownNow = true;
protected Builder() {
@@ -285,16 +250,6 @@ public class ExecutorServiceRule extends SerializableExternalResource {
}
/**
- * Configures the number of threads. Default is one thread.
- *
- * @param threadCount the number of threads in the pool
- */
- public Builder threadCount(int threadCount) {
- this.threadCount = threadCount;
- return this;
- }
-
- /**
* Enables invocation of {@code awaitTermination} during {@code tearDown}. Default is disabled.
*
* @param timeout the maximum time to wait
@@ -348,7 +303,6 @@ public class ExecutorServiceRule extends SerializableExternalResource {
* Builds the instance of {@code ExecutorServiceRule}.
*/
public ExecutorServiceRule build() {
- assertThat(threadCount).isGreaterThan(0);
return new ExecutorServiceRule(this);
}
}