You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2018/08/15 23:05:36 UTC

[geode] branch develop updated: GEODE-5583: Use the ConcurrencyRule to run threads (#2330)

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new cfc30eb  GEODE-5583: Use the ConcurrencyRule to run threads (#2330)
cfc30eb is described below

commit cfc30ebef382165c94e80533054a5cc8b8cc946d
Author: Helena Bales <hb...@pivotal.io>
AuthorDate: Wed Aug 15 16:05:30 2018 -0700

    GEODE-5583: Use the ConcurrencyRule to run threads (#2330)
    
    Use the concurrency rule to run threads instead of providing a custom
    implementation that may or may not be correct.
---
 .../internal/beans/ManagementAdapterTest.java      | 113 ++++++++-------------
 1 file changed, 40 insertions(+), 73 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/beans/ManagementAdapterTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/beans/ManagementAdapterTest.java
index 80f5c83..66927f0 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/beans/ManagementAdapterTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/beans/ManagementAdapterTest.java
@@ -14,103 +14,93 @@
  */
 package org.apache.geode.management.internal.beans;
 
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.powermock.api.mockito.PowerMockito.doReturn;
 
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
+import java.time.Duration;
 import java.util.Scanner;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ResourceEvent;
 import org.apache.geode.internal.cache.DiskStoreImpl;
 import org.apache.geode.internal.cache.DiskStoreStats;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.junit.rules.ConcurrencyRule;
 import org.apache.geode.test.junit.rules.ServerStarterRule;
 
 public class ManagementAdapterTest {
 
   private InternalCache cache = null;
   private DiskStoreImpl diskStore = mock(DiskStoreImpl.class);
-  private volatile boolean race = false;
+  private AtomicBoolean raceConditionFound = new AtomicBoolean(false);
 
   @Rule
   public ServerStarterRule serverRule =
       new ServerStarterRule().withWorkingDir().withLogFile().withAutoStart();
 
+  @Rule
+  public ConcurrencyRule concurrencyRule = new ConcurrencyRule();
+
   @Before
   public void before() {
     cache = serverRule.getCache();
-    doReturn(new DiskStoreStats(cache.getInternalDistributedSystem(), "disk-stats")).when(diskStore)
-        .getStats();
+    doReturn(new DiskStoreStats(cache.getInternalDistributedSystem(), "disk-stats"))
+        .when(diskStore).getStats();
     doReturn(new File[] {}).when(diskStore).getDiskDirs();
   }
 
   @Test
-  public void testHandlingNotificationsConcurrently() throws InterruptedException {
-    /*
-     * Tests to see if there are any concurrency issues handling resource lifecycle events.
-     *
-     * There are three runnables with specific tasks as below:
-     * r1 - continuously send cache creation/removal notifications, thread modifying the state
-     * r2 - continuously send disk creation/removal, thread relying on state
-     * r3 - monitors log to see if there is a null pointer due race'
-     *
-     * Test runs at most 2 seconds or until a race.
-     */
-
-    Runnable r1 = () -> {
-      while (!race) {
+  public void testHandlingNotificationsConcurrently() {
+    // continuously send cache creation/removal notifications, thread modifying the state
+    Callable<Void> cacheNotifications = () -> {
+      if (raceConditionFound.get() == Boolean.FALSE) {
         try {
-          cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.CACHE_REMOVE,
-              cache);
+          InternalDistributedSystem ids = cache.getInternalDistributedSystem();
+          ids.handleResourceEvent(ResourceEvent.CACHE_REMOVE, cache);
           Thread.sleep(10);
-          cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.CACHE_CREATE,
-              cache);
+          ids.handleResourceEvent(ResourceEvent.CACHE_CREATE, cache);
           Thread.sleep(10);
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
       }
+      return null;
     };
 
-    Runnable r2 = () -> {
-      while (!race) {
+    // continuously send disk creation/removal, thread relying on state
+    Callable<Void> diskNotifications = () -> {
+      if (raceConditionFound.get() == Boolean.FALSE) {
         try {
-          cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.DISKSTORE_CREATE,
-              diskStore);
+          InternalDistributedSystem ids = cache.getInternalDistributedSystem();
+          ids.handleResourceEvent(ResourceEvent.DISKSTORE_CREATE, diskStore);
           Thread.sleep(5);
-          cache.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE,
-              diskStore);
+          ids.handleResourceEvent(ResourceEvent.DISKSTORE_REMOVE, diskStore);
           Thread.sleep(5);
         } catch (InterruptedException e) {
           e.printStackTrace();
         }
       }
+      return null;
     };
 
-    // r3 scans server log to see if there is null pointer due to caused by cache removal.
-    Runnable r3 = () -> {
-      while (!race) {
+    // scans server log to see if there is null pointer due to caused by cache removal.
+    Callable<Boolean> scanLogs = () -> {
+      if (raceConditionFound.get() == Boolean.FALSE) {
         try {
-          File logFile = new File(serverRule.getWorkingDir() + "/server.log");
+          File logFile = new File("server.log");
           Scanner scanner = new Scanner(logFile);
           while (scanner.hasNextLine()) {
             final String lineFromFile = scanner.nextLine();
             if (lineFromFile.contains("java.lang.NullPointerException")) {
-              race = true;
+              raceConditionFound.set(Boolean.TRUE);
               break;
             }
           }
@@ -118,40 +108,17 @@ public class ManagementAdapterTest {
           // ignore this exception as the temp file might have been deleted after timeout
         }
       }
+      return raceConditionFound.get();
     };
 
-    List<Runnable> runnables = Arrays.asList(r1, r2, r3);
+    Duration notificationRunDuration = Duration.ofSeconds(3);
+    Duration scannerRunDuration = Duration.ofSeconds(4);
+    Duration timeout = Duration.ofSeconds(10);
 
-    final int numThreads = runnables.size();
-    final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
-    final ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
-    try {
-      final CountDownLatch allExecutorThreadsReady = new CountDownLatch(numThreads);
-      final CountDownLatch afterInitBlocker = new CountDownLatch(1);
-      final CountDownLatch allDone = new CountDownLatch(numThreads);
-      for (final Runnable submittedTestRunnable : runnables) {
-        threadPool.submit(() -> {
-          allExecutorThreadsReady.countDown();
-          try {
-            afterInitBlocker.await();
-            submittedTestRunnable.run();
-          } catch (final Throwable e) {
-            exceptions.add(e);
-          } finally {
-            allDone.countDown();
-          }
-        });
-      }
-      // wait until all threads are ready
-      allExecutorThreadsReady.await(runnables.size() * 10, TimeUnit.MILLISECONDS);
-      // start all test runners
-      afterInitBlocker.countDown();
-      // wait until all done or timeout
-      allDone.await(2, TimeUnit.SECONDS);
-    } finally {
-      threadPool.shutdownNow();
-    }
-    assertThat(exceptions).as("failed with exception(s)" + exceptions).isEmpty();
-    assertThat(race).as("is service to be null due to race").isEqualTo(false);
+    concurrencyRule.setTimeout(timeout);
+    concurrencyRule.add(cacheNotifications).repeatForDuration(notificationRunDuration);
+    concurrencyRule.add(diskNotifications).repeatForDuration(notificationRunDuration);
+    concurrencyRule.add(scanLogs).repeatForDuration(scannerRunDuration).expectValue(Boolean.FALSE);
+    concurrencyRule.executeInParallel();
   }
 }