You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2018/08/15 23:05:36 UTC
[geode] branch develop updated: GEODE-5583: Use the ConcurrencyRule
to run threads (#2330)
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new cfc30eb GEODE-5583: Use the ConcurrencyRule to run threads (#2330)
cfc30eb is described below
commit cfc30ebef382165c94e80533054a5cc8b8cc946d
Author: Helena Bales <hb...@pivotal.io>
AuthorDate: Wed Aug 15 16:05:30 2018 -0700
GEODE-5583: Use the ConcurrencyRule to run threads (#2330)
Use the concurrency rule to run threads instead of providing a custom
implementation that may or may not be correct.
---
.../internal/beans/ManagementAdapterTest.java | 113 ++++++++-------------
1 file changed, 40 insertions(+), 73 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/beans/ManagementAdapterTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/beans/ManagementAdapterTest.java
index 80f5c83..66927f0 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/beans/ManagementAdapterTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/beans/ManagementAdapterTest.java
@@ -14,103 +14,93 @@
*/
package org.apache.geode.management.internal.beans;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.doReturn;
import java.io.File;
import java.io.FileNotFoundException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
+import java.time.Duration;
import java.util.Scanner;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.internal.cache.DiskStoreImpl;
import org.apache.geode.internal.cache.DiskStoreStats;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.junit.rules.ConcurrencyRule;
import org.apache.geode.test.junit.rules.ServerStarterRule;
public class ManagementAdapterTest {
private InternalCache cache = null;
private DiskStoreImpl diskStore = mock(DiskStoreImpl.class);
- private volatile boolean race = false;
+ private AtomicBoolean raceConditionFound = new AtomicBoolean(false);
@Rule
public ServerStarterRule serverRule =
new ServerStarterRule().withWorkingDir().withLogFile().withAutoStart();
+ @Rule
+ public ConcurrencyRule concurrencyRule = new ConcurrencyRule();
+
@Before
public void before() {
cache = serverRule.getCache();
- doReturn(new DiskStoreStats(cache.getInternalDistributedSystem(), "disk-stats")).when(diskStore)
- .getStats();
+ doReturn(new DiskStoreStats(cache.getInternalDistributedSystem(), "disk-stats"))
+ .when(diskStore).getStats();
doReturn(new File[] {}).when(diskStore).getDiskDirs();
}
@Test
- public void testHandlingNotificationsConcurrently() throws InterruptedException {
- /*
- * Tests to see if there are any concurrency issues handling resource lifecycle events.
- *
- * There are three runnables with specific tasks as below:
- * r1 - continuously send cache creation/removal notifications, thread modifying the state
- * r2 - continuously send disk creation/removal, thread relying on state
- * r3 - monitors log to see if there is a null pointer due race'
- *
- * Test runs at most 2 seconds or until a race.
- */
-
- Runnable r1 = () -> {
- while (!race) {
+ public void testHandlingNotificationsConcurrently() {
+ // continuously send cache creation/removal notifications, thread modifying the state
+ Callable<Void> cacheNotifications = () -> {
+ if (raceConditionFound.get() == Boolean.FALSE) {
try {
- cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.CACHE_REMOVE,
- cache);
+ InternalDistributedSystem ids = cache.getInternalDistributedSystem();
+ ids.handleResourceEvent(ResourceEvent.CACHE_REMOVE, cache);
Thread.sleep(10);
- cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.CACHE_CREATE,
- cache);
+ ids.handleResourceEvent(ResourceEvent.CACHE_CREATE, cache);
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
+ return null;
};
- Runnable r2 = () -> {
- while (!race) {
+ // continuously send disk creation/removal, thread relying on state
+ Callable<Void> diskNotifications = () -> {
+ if (raceConditionFound.get() == Boolean.FALSE) {
try {
- cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.DISKSTORE_CREATE,
- diskStore);
+ InternalDistributedSystem ids = cache.getInternalDistributedSystem();
+ ids.handleResourceEvent(ResourceEvent.DISKSTORE_CREATE, diskStore);
Thread.sleep(5);
- cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE,
- diskStore);
+ ids.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, diskStore);
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
+ return null;
};
- // r3 scans server log to see if there is null pointer due to caused by cache removal.
- Runnable r3 = () -> {
- while (!race) {
+ // scans server log to see if there is null pointer due to caused by cache removal.
+ Callable<Boolean> scanLogs = () -> {
+ if (raceConditionFound.get() == Boolean.FALSE) {
try {
- File logFile = new File(serverRule.getWorkingDir() + "/server.log");
+ File logFile = new File("server.log");
Scanner scanner = new Scanner(logFile);
while (scanner.hasNextLine()) {
final String lineFromFile = scanner.nextLine();
if (lineFromFile.contains("java.lang.NullPointerException")) {
- race = true;
+ raceConditionFound.set(Boolean.TRUE);
break;
}
}
@@ -118,40 +108,17 @@ public class ManagementAdapterTest {
// ignore this exception as the temp file might have been deleted after timeout
}
}
+ return raceConditionFound.get();
};
- List<Runnable> runnables = Arrays.asList(r1, r2, r3);
+ Duration notificationRunDuration = Duration.ofSeconds(3);
+ Duration scannerRunDuration = Duration.ofSeconds(4);
+ Duration timeout = Duration.ofSeconds(10);
- final int numThreads = runnables.size();
- final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
- final ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
- try {
- final CountDownLatch allExecutorThreadsReady = new CountDownLatch(numThreads);
- final CountDownLatch afterInitBlocker = new CountDownLatch(1);
- final CountDownLatch allDone = new CountDownLatch(numThreads);
- for (final Runnable submittedTestRunnable : runnables) {
- threadPool.submit(() -> {
- allExecutorThreadsReady.countDown();
- try {
- afterInitBlocker.await();
- submittedTestRunnable.run();
- } catch (final Throwable e) {
- exceptions.add(e);
- } finally {
- allDone.countDown();
- }
- });
- }
- // wait until all threads are ready
- allExecutorThreadsReady.await(runnables.size() * 10, TimeUnit.MILLISECONDS);
- // start all test runners
- afterInitBlocker.countDown();
- // wait until all done or timeout
- allDone.await(2, TimeUnit.SECONDS);
- } finally {
- threadPool.shutdownNow();
- }
- assertThat(exceptions).as("failed with exception(s)" + exceptions).isEmpty();
- assertThat(race).as("is service to be null due to race").isEqualTo(false);
+ concurrencyRule.setTimeout(timeout);
+ concurrencyRule.add(cacheNotifications).repeatForDuration(notificationRunDuration);
+ concurrencyRule.add(diskNotifications).repeatForDuration(notificationRunDuration);
+ concurrencyRule.add(scanLogs).repeatForDuration(scannerRunDuration).expectValue(Boolean.FALSE);
+ concurrencyRule.executeInParallel();
}
}