You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/04/19 20:19:31 UTC

[05/13] incubator-geode git commit: GEODE-1125: Introduced awaitility to make the receiver wait in order to receive the new puts from the sender.

GEODE-1125: Introduced awaitility to make the receiver wait in order to receive the new puts from the sender.

* Removed the use of helper functions as they hindered the understanding of the test.
* Transformed the test into a linear model rather than mixing sender and receiver activities within one function.
* Introduced an Awaitility clause so that the receiver waits up to 30 seconds for the transmission to complete.

This closes #127


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8f7f4d5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8f7f4d5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8f7f4d5f

Branch: refs/heads/feature/GEODE-1233
Commit: 8f7f4d5f4bbb393623eb7a6fa200d9749acf5a02
Parents: 4a54aa7
Author: nabarun <nn...@pivotal.io>
Authored: Sat Apr 9 18:32:05 2016 -0700
Committer: Jason Huynh <hu...@gmail.com>
Committed: Mon Apr 18 10:23:48 2016 -0700

----------------------------------------------------------------------
 ...allelGatewaySenderOperation_2_DUnitTest.java | 143 ++++++++++++++-----
 1 file changed, 106 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8f7f4d5f/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
index 552da9e..c922314 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
@@ -16,6 +16,8 @@
  */
 package com.gemstone.gemfire.internal.cache.wan.concurrent;
 
+import java.util.concurrent.TimeUnit;
+
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
@@ -28,6 +30,7 @@ import com.gemstone.gemfire.test.dunit.LogWriterUtils;
 import com.gemstone.gemfire.test.dunit.VM;
 import com.gemstone.gemfire.test.dunit.Wait;
 import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.jayway.awaitility.Awaitility;
 
 /**
  *
@@ -56,15 +59,36 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
     Integer nyPort = locatorPorts[1];
 
     try {
-      createAndStartSender(vm4, lnPort, 5, false, true);
+      String regionName = getTestMethodName() + "_PR";
 
-      createReceiverAndDoPutsInPausedSender(nyPort);
+      createCacheInVMs(lnPort, vm4);
+      vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1);
+      vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
+      vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY));
+      vm4.invoke(() -> startSender("ln"));
+      vm4.invoke(() -> pauseSender("ln"));
+
+      createCacheInVMs(nyPort, vm2);
+      vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
+      vm2.invoke(() -> createReceiver(nyPort));
+
+      vm4.invoke(() -> doPuts(regionName, 10));
+      vm4.invoke(() -> validateRegionSize(regionName, 10));
+
+      vm2.invoke(() -> validateRegionSize(regionName, 0));
 
       vm4.invoke(() -> localDestroyRegion(getTestMethodName() + "_PR"));
 
-      recreatePRDoPutsAndValidateRegionSizes(0, true);
+      vm2.invoke(() -> validateRegionSize(regionName, 0));
+
+      vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
+      vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
+
+      vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 0)));
+
+      vm4.invoke(() -> validateRegionSize(regionName, 10));
     } finally {
-      vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+      vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0);
     }
   }
   
@@ -74,19 +98,39 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
     Integer nyPort = locatorPorts[1];
 
     try {
-      createAndStartSender(vm4, lnPort, 4, false, true);
+      String regionName = getTestMethodName() + "_PR";
 
-      createReceiverAndDoPutsInPausedSender(nyPort);
+      createCacheInVMs(lnPort, vm4);
+      vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1);
+      vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
+      vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 4, OrderPolicy.KEY));
+      vm4.invoke(() -> startSender("ln"));
+      vm4.invoke(() -> pauseSender("ln"));
 
-      vm4.invoke(() -> resumeSender("ln"));
+      createCacheInVMs(nyPort, vm2);
+      vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
+      vm2.invoke(() -> createReceiver(nyPort));
 
-      vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
+      vm4.invoke(() -> doPuts(regionName, 10));
+      vm4.invoke(() -> validateRegionSize(regionName, 10));
+
+      vm2.invoke(() -> validateRegionSize(regionName, 0));
 
+      vm4.invoke(() -> resumeSender("ln"));
+      vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
       vm4.invoke(() -> localDestroyRegion(getTestMethodName() + "_PR"));
 
-      recreatePRDoPutsAndValidateRegionSizes(10, false);
+      vm2.invoke(() -> validateRegionSize(regionName, 10));
+
+      vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
+      vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
+
+      vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 20)));
+
+      vm4.invoke(() -> validateRegionSize(regionName, 10));
+
     } finally {
-      vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+      vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0);
     }
   }
 
@@ -94,20 +138,38 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
     Integer[] locatorPorts = createLNAndNYLocators();
     Integer lnPort = locatorPorts[0];
     Integer nyPort = locatorPorts[1];
+    try {
+      String regionName = getTestMethodName() + "_PR";
+      createCacheInVMs(lnPort, vm4);
+      vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1);
+      vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
+      vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 7, OrderPolicy.KEY));
+      vm4.invoke(() -> startSender("ln"));
+      vm4.invoke(() -> pauseSender("ln"));
 
-    createAndStartSender(vm4, lnPort, 7, false, true);
+      createCacheInVMs(nyPort, vm2);
+      vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
+      vm2.invoke(() -> createReceiver(nyPort));
 
-    createReceiverAndDoPutsInPausedSender(nyPort);
+      vm4.invoke(() -> doPuts(regionName, 10));
+      vm4.invoke(() -> validateRegionSize(regionName, 10));
+      vm4.invoke(() -> closeRegion(getTestMethodName() + "_PR"));
+      vm4.invoke(() -> resumeSender("ln"));
 
-    vm4.invoke(() -> closeRegion(getTestMethodName() + "_PR"));
+      Thread.sleep(500);
 
-    vm4.invoke(() -> resumeSender("ln"));
+      vm2.invoke(() -> validateRegionSize(regionName, 0));
 
-    Wait.pause(500); //paused if there is any element which is received on remote site
+      vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
+      vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
 
-    recreatePRDoPutsAndValidateRegionSizes(0, false);
+      vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 10)));
 
-    vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+      vm4.invoke(() -> validateRegionSize(regionName, 10));
+    }
+    finally {
+      vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0);
+    }
   }
 
   public void testParallelGatewaySender_SingleNode_UserPR_Destroy_SimultenuousPut_RecreateRegion() throws Exception {
@@ -201,12 +263,27 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
     Integer nyPort = locatorPorts[1];
 
     try {
-      createAndStartSender(vm4, lnPort, 5, false, true);
+      String regionName = getTestMethodName() + "_PR";
 
-      createReceiverAndDoPutsInPausedSender(nyPort);
+      createCacheInVMs(lnPort, vm4);
+      vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1);
+      vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
+      vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY));
+      vm4.invoke(() -> startSender("ln"));
+      vm4.invoke(() -> pauseSender("ln"));
+
+      createCacheInVMs(nyPort, vm2);
+      vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
+      vm2.invoke(() -> createReceiver(nyPort));
+
+      vm4.invoke(() -> doPuts(regionName, 10));
+      vm4.invoke(() -> validateRegionSize(regionName, 10));
+
+      vm2.invoke(() -> validateRegionSize(regionName, 0));
 
       AsyncInvocation putAsync = vm4.invokeAsync(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", 10, 2000 ));
-      AsyncInvocation localDestroyAsync = vm4.invokeAsync(() -> ConcurrentParallelGatewaySenderOperation_2_DUnitTest.closeRegion( getTestMethodName() + "_PR" ));
+      AsyncInvocation localDestroyAsync = vm4.invokeAsync(() -> ConcurrentParallelGatewaySenderOperation_2_DUnitTest.
+        closeRegion( getTestMethodName() + "_PR" ));
       try {
         putAsync.join();
         localDestroyAsync.join();
@@ -214,10 +291,16 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
         e.printStackTrace();
         fail("Interrupted the async invocation.");
       }
+      vm2.invoke(() -> validateRegionSize(regionName, 0));
 
-      recreatePRDoPutsAndValidateRegionSizes(0, true);
+      vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
+      vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
+
+      vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 0)));
+
+      vm4.invoke(() -> validateRegionSize(regionName, 10));
     } finally {
-      vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+      vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0);
     }
   }
 
@@ -474,12 +557,12 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
 
   protected void createAndStartSender(VM vm, int port, int concurrencyLevel, boolean manualStart, boolean pause) {
     vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port));
+    vm.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
     createSender(vm, concurrencyLevel, manualStart);
     vm.invoke(() -> startSender("ln"));
     if (pause) {
       vm.invoke(() -> pauseSender("ln"));
     }
-    vm.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
     LogWriterUtils.getLogWriter().info("Created PRs on local site");
   }
 
@@ -495,20 +578,6 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
     vm2.invoke(() -> validateRegionSize(regionName, 0));
   }
 
-  protected void recreatePRDoPutsAndValidateRegionSizes(int expectedRegionSize, boolean resumeSender) {
-    // Note: This is a test-specific method used by several test to recreate a partitioned region,
-    // do puts and validate region sizes in vm2 and vm4.
-    // since shadowPR is locally destroyed, so no data to dispatch
-    String regionName = getTestMethodName() + "_PR";
-    vm2.invoke(() -> validateRegionSize(regionName, expectedRegionSize));
-    if (resumeSender) {
-      vm4.invoke(() -> resumeSender("ln"));
-    }
-    vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
-    vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
-    validateRegionSizes(regionName, 10, vm4, vm2);
-  }
-
   protected void createAndStartTwoSenders(VM vm, int port, int concurrencyLevel) {
     // Note: This is a test-specific method used to create and start 2 senders.
     vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port));