You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/04/19 20:19:31 UTC
[05/13] incubator-geode git commit: GEODE-1125: Introduced awaitility
to make the receiver wait in order to receive the new puts from the sender.
GEODE-1125: Introduced awaitility to make the receiver wait in order to receive the new puts from the sender.
* Removed the use of helper functions as they hindered the understanding of the test.
* Transformed the test into a linear model rather than mixing sender and receiver activities within one function.
* Introduced an Awaitility clause so that the receiver waits up to 30 seconds for the transmission to complete.
This closes #127
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8f7f4d5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8f7f4d5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8f7f4d5f
Branch: refs/heads/feature/GEODE-1233
Commit: 8f7f4d5f4bbb393623eb7a6fa200d9749acf5a02
Parents: 4a54aa7
Author: nabarun <nn...@pivotal.io>
Authored: Sat Apr 9 18:32:05 2016 -0700
Committer: Jason Huynh <hu...@gmail.com>
Committed: Mon Apr 18 10:23:48 2016 -0700
----------------------------------------------------------------------
...allelGatewaySenderOperation_2_DUnitTest.java | 143 ++++++++++++++-----
1 file changed, 106 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8f7f4d5f/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
index 552da9e..c922314 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
@@ -16,6 +16,8 @@
*/
package com.gemstone.gemfire.internal.cache.wan.concurrent;
+import java.util.concurrent.TimeUnit;
+
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
@@ -28,6 +30,7 @@ import com.gemstone.gemfire.test.dunit.LogWriterUtils;
import com.gemstone.gemfire.test.dunit.VM;
import com.gemstone.gemfire.test.dunit.Wait;
import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.jayway.awaitility.Awaitility;
/**
*
@@ -56,15 +59,36 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
Integer nyPort = locatorPorts[1];
try {
- createAndStartSender(vm4, lnPort, 5, false, true);
+ String regionName = getTestMethodName() + "_PR";
- createReceiverAndDoPutsInPausedSender(nyPort);
+ createCacheInVMs(lnPort, vm4);
+ vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1);
+ vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
+ vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY));
+ vm4.invoke(() -> startSender("ln"));
+ vm4.invoke(() -> pauseSender("ln"));
+
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
+ vm2.invoke(() -> createReceiver(nyPort));
+
+ vm4.invoke(() -> doPuts(regionName, 10));
+ vm4.invoke(() -> validateRegionSize(regionName, 10));
+
+ vm2.invoke(() -> validateRegionSize(regionName, 0));
vm4.invoke(() -> localDestroyRegion(getTestMethodName() + "_PR"));
- recreatePRDoPutsAndValidateRegionSizes(0, true);
+ vm2.invoke(() -> validateRegionSize(regionName, 0));
+
+ vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
+ vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
+
+ vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 0)));
+
+ vm4.invoke(() -> validateRegionSize(regionName, 10));
} finally {
- vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+ vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0);
}
}
@@ -74,19 +98,39 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
Integer nyPort = locatorPorts[1];
try {
- createAndStartSender(vm4, lnPort, 4, false, true);
+ String regionName = getTestMethodName() + "_PR";
- createReceiverAndDoPutsInPausedSender(nyPort);
+ createCacheInVMs(lnPort, vm4);
+ vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1);
+ vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
+ vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 4, OrderPolicy.KEY));
+ vm4.invoke(() -> startSender("ln"));
+ vm4.invoke(() -> pauseSender("ln"));
- vm4.invoke(() -> resumeSender("ln"));
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
+ vm2.invoke(() -> createReceiver(nyPort));
- vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm4.invoke(() -> doPuts(regionName, 10));
+ vm4.invoke(() -> validateRegionSize(regionName, 10));
+
+ vm2.invoke(() -> validateRegionSize(regionName, 0));
+ vm4.invoke(() -> resumeSender("ln"));
+ vm4.invoke(() -> validateParallelSenderQueueAllBucketsDrained("ln"));
vm4.invoke(() -> localDestroyRegion(getTestMethodName() + "_PR"));
- recreatePRDoPutsAndValidateRegionSizes(10, false);
+ vm2.invoke(() -> validateRegionSize(regionName, 10));
+
+ vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
+ vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
+
+ vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 20)));
+
+ vm4.invoke(() -> validateRegionSize(regionName, 10));
+
} finally {
- vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+ vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0);
}
}
@@ -94,20 +138,38 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
Integer[] locatorPorts = createLNAndNYLocators();
Integer lnPort = locatorPorts[0];
Integer nyPort = locatorPorts[1];
+ try {
+ String regionName = getTestMethodName() + "_PR";
+ createCacheInVMs(lnPort, vm4);
+ vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1);
+ vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
+ vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 7, OrderPolicy.KEY));
+ vm4.invoke(() -> startSender("ln"));
+ vm4.invoke(() -> pauseSender("ln"));
- createAndStartSender(vm4, lnPort, 7, false, true);
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
+ vm2.invoke(() -> createReceiver(nyPort));
- createReceiverAndDoPutsInPausedSender(nyPort);
+ vm4.invoke(() -> doPuts(regionName, 10));
+ vm4.invoke(() -> validateRegionSize(regionName, 10));
+ vm4.invoke(() -> closeRegion(getTestMethodName() + "_PR"));
+ vm4.invoke(() -> resumeSender("ln"));
- vm4.invoke(() -> closeRegion(getTestMethodName() + "_PR"));
+ Thread.sleep(500);
- vm4.invoke(() -> resumeSender("ln"));
+ vm2.invoke(() -> validateRegionSize(regionName, 0));
- Wait.pause(500); //paused if there is any element which is received on remote site
+ vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
+ vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
- recreatePRDoPutsAndValidateRegionSizes(0, false);
+ vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 10)));
- vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+ vm4.invoke(() -> validateRegionSize(regionName, 10));
+ }
+ finally {
+ vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0);
+ }
}
public void testParallelGatewaySender_SingleNode_UserPR_Destroy_SimultenuousPut_RecreateRegion() throws Exception {
@@ -201,12 +263,27 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
Integer nyPort = locatorPorts[1];
try {
- createAndStartSender(vm4, lnPort, 5, false, true);
+ String regionName = getTestMethodName() + "_PR";
- createReceiverAndDoPutsInPausedSender(nyPort);
+ createCacheInVMs(lnPort, vm4);
+ vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1);
+ vm4.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
+ vm4.invoke(() -> createConcurrentSender("ln", 2, true, 100, 10, false, false, null, false, 5, OrderPolicy.KEY));
+ vm4.invoke(() -> startSender("ln"));
+ vm4.invoke(() -> pauseSender("ln"));
+
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
+ vm2.invoke(() -> createReceiver(nyPort));
+
+ vm4.invoke(() -> doPuts(regionName, 10));
+ vm4.invoke(() -> validateRegionSize(regionName, 10));
+
+ vm2.invoke(() -> validateRegionSize(regionName, 0));
AsyncInvocation putAsync = vm4.invokeAsync(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", 10, 2000 ));
- AsyncInvocation localDestroyAsync = vm4.invokeAsync(() -> ConcurrentParallelGatewaySenderOperation_2_DUnitTest.closeRegion( getTestMethodName() + "_PR" ));
+ AsyncInvocation localDestroyAsync = vm4.invokeAsync(() -> ConcurrentParallelGatewaySenderOperation_2_DUnitTest.
+ closeRegion( getTestMethodName() + "_PR" ));
try {
putAsync.join();
localDestroyAsync.join();
@@ -214,10 +291,16 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
e.printStackTrace();
fail("Interrupted the async invocation.");
}
+ vm2.invoke(() -> validateRegionSize(regionName, 0));
- recreatePRDoPutsAndValidateRegionSizes(0, true);
+ vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
+ vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
+
+ vm2.invoke(() -> Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> validateRegionSize(regionName, 0)));
+
+ vm4.invoke(() -> validateRegionSize(regionName, 10));
} finally {
- vm4.invoke(() -> clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME());
+ vm4.invoke(() -> AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0);
}
}
@@ -474,12 +557,12 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
protected void createAndStartSender(VM vm, int port, int concurrencyLevel, boolean manualStart, boolean pause) {
vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port));
+ vm.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
createSender(vm, concurrencyLevel, manualStart);
vm.invoke(() -> startSender("ln"));
if (pause) {
vm.invoke(() -> pauseSender("ln"));
}
- vm.invoke(() -> createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap()));
LogWriterUtils.getLogWriter().info("Created PRs on local site");
}
@@ -495,20 +578,6 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
vm2.invoke(() -> validateRegionSize(regionName, 0));
}
- protected void recreatePRDoPutsAndValidateRegionSizes(int expectedRegionSize, boolean resumeSender) {
- // Note: This is a test-specific method used by several test to recreate a partitioned region,
- // do puts and validate region sizes in vm2 and vm4.
- // since shadowPR is locally destroyed, so no data to dispatch
- String regionName = getTestMethodName() + "_PR";
- vm2.invoke(() -> validateRegionSize(regionName, expectedRegionSize));
- if (resumeSender) {
- vm4.invoke(() -> resumeSender("ln"));
- }
- vm4.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 10, isOffHeap()));
- vm4.invoke(() -> doPutsFrom(regionName, 10, 20));
- validateRegionSizes(regionName, 10, vm4, vm2);
- }
-
protected void createAndStartTwoSenders(VM vm, int port, int concurrencyLevel) {
// Note: This is a test-specific method used to create and start 2 senders.
vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port));