You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/01 21:55:42 UTC
[25/50] [abbrv] incubator-geode git commit: GEODE-875: CI failure:
ParallelGatewaySenderOperationsOffHeapDUnitTest.testParallelGatewaySender_SingleNode_UserPR_localDestroy_SimultenuousPut_RecreateRegion
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c6e0d2d9/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 19f6c4b..68614a0 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -18,14 +18,12 @@ package com.gemstone.gemfire.internal.cache.wan.parallel;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionDestroyedException;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
import com.gemstone.gemfire.test.dunit.DistributedTestCase;
import com.gemstone.gemfire.test.dunit.RMIException;
-import com.gemstone.gemfire.test.dunit.DistributedTestCase.ExpectedException;
/**
* DUnit test for operations on ParallelGatewaySender
@@ -46,41 +44,11 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
}
public void testParallelGatewaySenderWithoutStarting() {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
+ Integer[] locatorPorts = createLNAndNYLocators();
+ Integer lnPort = locatorPorts[0];
+ Integer nyPort = locatorPorts[1];
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
+ createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, false);
vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 1000 });
@@ -89,68 +57,30 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
vm6.invoke(WANTestBase.class, "verifySenderStoppedState", new Object[] { "ln" });
vm7.invoke(WANTestBase.class, "verifySenderStoppedState", new Object[] { "ln" });
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {testName + "_PR", 0 });
- vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {testName + "_PR", 0 });
+ validateRegionSizes(testName + "_PR", 0, vm2, vm3);
}
/**
* Defect 44323 (ParallelGatewaySender should not be started on Accessor node)
*/
public void testParallelGatewaySenderStartOnAccessorNode() {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
+ Integer[] locatorPorts = createLNAndNYLocators();
+ Integer lnPort = locatorPorts[0];
+ Integer nyPort = locatorPorts[1];
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegionAsAccessor", new Object[] {
- testName + "_PR", "ln", 1, 100 });
- vm7.invoke(WANTestBase.class, "createPartitionedRegionAsAccessor", new Object[] {
- testName + "_PR", "ln", 1, 100 });
+ createSendersReceiversAndPartitionedRegion(lnPort, nyPort, true, true);
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
-
- //start the senders
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
pause(2000);
vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 1000 });
+ vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 10 });
vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
-
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {testName + "_PR", 1000 });
- vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {testName + "_PR", 1000 });
+
+ validateRegionSizes(testName + "_PR", 10, vm2, vm3);
}
@@ -159,52 +89,14 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
* @throws Exception
*/
public void testParallelPropagationSenderPause() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ Integer[] locatorPorts = createLNAndNYLocators();
+ Integer lnPort = locatorPorts[0];
+ Integer nyPort = locatorPorts[1];
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
+ createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
//make sure all the senders are running before doing any puts
- vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ waitForSendersRunning();
//FIRST RUN: now, the senders are started. So, start the puts
vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 100 });
@@ -227,52 +119,14 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
* @throws Exception
*/
public void testParallelPropagationSenderResume() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ Integer[] locatorPorts = createLNAndNYLocators();
+ Integer lnPort = locatorPorts[0];
+ Integer nyPort = locatorPorts[1];
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
+ createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
//make sure all the senders are running before doing any puts
- vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ waitForSendersRunning();
//now, the senders are started. So, start the puts
vm4.invokeAsync(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 1000 });
@@ -293,17 +147,14 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
pause(2000);
-
- vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
- vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
- vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
- vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+
+ validateParallelSenderQueueAllBucketsDrained();
//find the region size on remote vm
vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {testName + "_PR", 1000 });
}
-
+
/**
* Negative scenario in which a sender that is stopped (and not paused) is resumed.
* Expected: resume is only valid for pause. If a sender which is stopped is resumed,
@@ -312,10 +163,9 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
* @throws Exception
*/
public void testParallelPropagationSenderResumeNegativeScenario() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
+ Integer[] locatorPorts = createLNAndNYLocators();
+ Integer lnPort = locatorPorts[0];
+ Integer nyPort = locatorPorts[1];
vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
@@ -373,61 +223,20 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
*/
public void testParallelPropagationSenderStop() throws Exception {
addExpectedException("Broken pipe");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ Integer[] locatorPorts = createLNAndNYLocators();
+ Integer lnPort = locatorPorts[0];
+ Integer nyPort = locatorPorts[1];
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
+ createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
//make sure all the senders are running before doing any puts
- vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ waitForSendersRunning();
//FIRST RUN: now, the senders are started. So, do some of the puts
vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 100 });
//now, stop all of the senders
- vm4.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
+ stopSenders();
//SECOND RUN: keep one thread doing puts
vm4.invokeAsync(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 1000 });
@@ -442,61 +251,20 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
*/
public void testParallelPropagationSenderStartAfterStop() throws Exception {
addExpectedException("Broken pipe");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
+ Integer[] locatorPorts = createLNAndNYLocators();
+ Integer lnPort = locatorPorts[0];
+ Integer nyPort = locatorPorts[1];
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
+ createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
//make sure all the senders are running before doing any puts
- vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ waitForSendersRunning();
//FIRST RUN: now, the senders are started. So, do some of the puts
vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 200 });
//now, stop all of the senders
- vm4.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
+ stopSenders();
pause(2000);
@@ -507,10 +275,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
vm2.invoke(WANTestBase.class, "validateRegionSizeRemainsSame", new Object[] {testName + "_PR", 200 });
//start the senders again
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ startSenders();
//Region size on remote site should remain same and below the number of puts done in the FIRST RUN
vm2.invoke(WANTestBase.class, "validateRegionSizeRemainsSame", new Object[] {testName + "_PR", 200 });
@@ -522,10 +287,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
pause(2000);
//verify all the buckets on all the sender nodes are drained
- vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
- vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
- vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
- vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ validateParallelSenderQueueAllBucketsDrained();
//verify the events propagate to remote site
vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {testName + "_PR", 1000 });
@@ -544,52 +306,14 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
*/
public void testParallelPropagationSenderStartAfterStop_Scenario2() throws Exception {
addExpectedException("Broken pipe");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
+ Integer[] locatorPorts = createLNAndNYLocators();
+ Integer lnPort = locatorPorts[0];
+ Integer nyPort = locatorPorts[1];
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
+ createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
//make sure all the senders are running before doing any puts
- vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ waitForSendersRunning();
getLogWriter().info("All the senders are now started");
@@ -599,10 +323,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
getLogWriter().info("Done few puts");
//now, stop all of the senders
- vm4.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
+ stopSenders();
getLogWriter().info("All the senders are stopped");
pause(2000);
@@ -632,10 +353,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
pause(2000);
//verify all the buckets on all the sender nodes are drained
- vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
- vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
- vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
- vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ validateParallelSenderQueueAllBucketsDrained();
//verify that the queue size ultimately becomes zero. That means all the events propagate to remote site.
vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln", 0 });
@@ -649,62 +367,20 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
addExpectedException("Broken pipe");
addExpectedException("Connection reset");
addExpectedException("Unexpected IOException");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegionAsAccessor", new Object[] {
- testName + "_PR", "ln", 1, 100 });
- vm7.invoke(WANTestBase.class, "createPartitionedRegionAsAccessor", new Object[] {
- testName + "_PR", "ln", 1, 100 });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ Integer[] locatorPorts = createLNAndNYLocators();
+ Integer lnPort = locatorPorts[0];
+ Integer nyPort = locatorPorts[1];
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
+ createSendersReceiversAndPartitionedRegion(lnPort, nyPort, true, true);
//make sure all the senders are not running on accessor nodes and running on non-accessor nodes
- vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
-
- vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ waitForSendersRunning();
//FIRST RUN: now, the senders are started. So, do some of the puts
vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 200 });
//now, stop all of the senders
- vm4.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
+ stopSenders();
pause(2000);
@@ -715,10 +391,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
vm2.invoke(WANTestBase.class, "validateRegionSizeRemainsSame", new Object[] {testName + "_PR", 200 });
//start the senders again
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ startSenders();
//Region size on remote site should remain same and below the number of puts done in the FIRST RUN
vm2.invoke(WANTestBase.class, "validateRegionSizeRemainsSame", new Object[] {testName + "_PR", 200 });
@@ -736,57 +409,19 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {testName + "_PR", 1000 });
}
-
/**
* Normal scenario in which a combinations of start, pause, resume operations
* is tested
*/
public void testStartPauseResumeParallelGatewaySender() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ Integer[] locatorPorts = createLNAndNYLocators();
+ Integer lnPort = locatorPorts[0];
+ Integer nyPort = locatorPorts[1];
- vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
- getLogWriter().info("Created cache on local site");
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
-
- getLogWriter().info("Created senders on local site");
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
-
- getLogWriter().info("Created PRs on local site");
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
- getLogWriter().info("Created PRs on remote site");
-
- vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 1000 });
- getLogWriter().info("Done 1000 puts on local site");
+ vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 1000 });
+ getLogWriter().info("Done 1000 puts on local site");
//Since puts are already done on userPR, it will have the buckets created.
//During sender start, it will wait until those buckets are created for shadowPR as well.
@@ -796,11 +431,8 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
vm5.invokeAsync(WANTestBase.class, "startSender", new Object[] { "ln" });
vm6.invokeAsync(WANTestBase.class, "startSender", new Object[] { "ln" });
vm7.invokeAsync(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+
+ waitForSendersRunning();
getLogWriter().info("Started senders on local site");
@@ -841,999 +473,24 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
}
//verify all buckets drained on all sender nodes.
- vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
- vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
- vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
- vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
-
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 5000 });
- vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 5000 });
- }
-
- // to test that when userPR is locally destroyed, shadow Pr is also locally
- // destroyed and on recreation usrePr , shadow Pr is also recreated.
- public void testParallelGatewaySender_SingleNode_UserPR_localDestroy_RecreateRegion() throws Exception {
- addExpectedException("RegionDestroyedException");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- try {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
+ validateParallelSenderQueueAllBucketsDrained();
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, false });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
-
- getLogWriter().info("Created PRs on local site");
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 10, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
- 10 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- // since sender is paused, no dispatching
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 0 });
-
- vm4.invoke(WANTestBase.class, "localDestroyRegion",
- new Object[] { testName + "_PR" });
-
- // since shodowPR is locally destroyed, so no data to dispatch
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 0 });
-
- vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
- testName + "_PR", 10, 20 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
- } finally {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- }
+ validateRegionSizes(testName + "_PR", 5000, vm2, vm3);
}
-
- public void testParallelGatewaySender_SingleNode_UserPR_Destroy_RecreateRegion() throws Exception {
- addExpectedException("RegionDestroyedException");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- try {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, false });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
-
- getLogWriter().info("Created PRs on local site");
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 10, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
- 10 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- // since resume is paused, no dispatching
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 0 });
-
- vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "destroyRegion", new Object[] { testName
- + "_PR" });
-
- // before destoy, there is wait for queue to drain, so data will be
- // dispatched
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
- testName + "_PR", 10, 20 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 20 });
-
- } finally {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- }
- }
-
-
- public void testParallelGatewaySender_SingleNode_UserPR_Close_RecreateRegion() throws Exception {
- ExpectedException exp = addExpectedException(RegionDestroyedException.class
- .getName());
- try {
- Integer lnPort = (Integer) vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer) vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- try {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
-
- getLogWriter().info("Created cache on local site");
-
- getLogWriter().info("Created senders on local site");
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, false });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
-
- getLogWriter().info("Created PRs on local site");
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 10, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPuts", new Object[] {
- testName + "_PR", 10 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- // since resume is paused, no dispatching
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 0 });
-
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "closeRegion", new Object[] { testName + "_PR" });
-
- vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-
- pause(500); // paused if there is any element which is received on
- // remote
- // site
-
- // before close, there is wait for queue to drain, so data will be
- // dispatched
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 0 });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
- testName + "_PR", 10, 20 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
- } finally {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- }
- } finally {
- exp.remove();
- }
- }
-
- //to test that while localDestroy is in progress, put operation does not successed
- public void testParallelGatewaySender_SingleNode_UserPR_localDestroy_SimultenuousPut_RecreateRegion() throws Exception {
- addExpectedException("RegionDestroyedException");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- try {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
-
- getLogWriter().info("Created cache on local site");
-
- getLogWriter().info("Created senders on local site");
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, false });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
-
- getLogWriter().info("Created PRs on local site");
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 10, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
- 10 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- // since resume is paused, no dispatching
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 0 });
-
- AsyncInvocation putAsync = vm4.invokeAsync(WANTestBase.class,
- "doPutsFrom", new Object[] { testName + "_PR", 100, 2000 });
- AsyncInvocation localDestroyAsync = vm4.invokeAsync(WANTestBase.class,
- "localDestroyRegion", new Object[] { testName + "_PR" });
- try {
- putAsync.join();
- localDestroyAsync.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail("Interrupted the async invocation.");
- }
-
- if (localDestroyAsync.getException() != null) {
- fail("Not Expected Exception got", putAsync.getException());
- }
-
- if (putAsync.getException() != null
- && !(putAsync.getException() instanceof RegionDestroyedException)) {
- fail("Expected RegionDestroyedException but got",
- putAsync.getException());
- }
-
- vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-
- pause(500); // paused if there is any element which is received on remote
- // site
-
- // since shodowPR is locally destroyed, so no data to dispatch
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 0 });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
- testName + "_PR", 10, 20 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- } finally {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- }
- }
-
- public void testParallelGatewaySender_SingleNode_UserPR_Destroy_SimultenuousPut_RecreateRegion() throws Exception {
- addExpectedException("RegionDestroyedException");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- try {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, false });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "addCacheListenerAndDestroyRegion", new Object[] {
- testName + "_PR"});
-
- getLogWriter().info("Created PRs on local site");
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 10, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
- 10 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- // since resume is paused, no dispatching
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 0 });
-
- vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-
- AsyncInvocation putAsync = vm4.invokeAsync(WANTestBase.class,
- "doPutsFrom", new Object[] { testName + "_PR", 10, 101 });
- try {
- putAsync.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail("Interrupted the async invocation.");
- }
-
- if (putAsync.getException() != null
- && !(putAsync.getException() instanceof RegionDestroyedException)) {
- fail("Expected RegionDestroyedException but got",
- putAsync.getException());
- }
-
- // before destroy, there is wait for queue to drain, so data will be
- // dispatched
- vm2.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "validateRegionSizeWithinRange", new Object[] { testName + "_PR", 10,
- 101 }); // possible size is more than 10
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
- testName + "_PR", 10, 20 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- vm2.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "validateRegionSizeWithinRange", new Object[] { testName + "_PR", 20,
- 101 });// possible size is more than 20
-
- } finally {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- }
- }
-
- public void testParallelGatewaySender_SingleNode_UserPR_Destroy_NodeDown() throws Exception {
- addExpectedException("RegionDestroyedException");
- addExpectedException("Connection reset");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- try {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
- vm5.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
- vm6.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, false });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, false });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, false });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
-
- getLogWriter().info("Created PRs on local site");
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 10, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
- 10000 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10000 });
-
- // since resume is paused, no dispatching
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 0 });
-
- vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-
- pause(200);
- AsyncInvocation localDestroyAsync = vm4.invokeAsync(WANTestBase.class,
- "destroyRegion", new Object[] { testName + "_PR" });
-
- AsyncInvocation closeAsync = vm4.invokeAsync(WANTestBase.class,
- "closeCache");
- try {
- localDestroyAsync.join();
- closeAsync.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail("Interrupted the async invocation.");
- }
-
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10000 });// possible size is more than 20
-
- } finally {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- vm5.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- vm6.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- }
- }
-
- public void testParallelGatewaySender_SingleNode_UserPR_Close_SimultenuousPut_RecreateRegion() throws Exception {
- addExpectedException("RegionDestroyedException");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- try {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
-
- getLogWriter().info("Created cache on local site");
-
- getLogWriter().info("Created senders on local site");
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, false });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
-
- getLogWriter().info("Created PRs on local site");
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 10, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
- 10 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- // since resume is paused, no dispatching
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 0 });
-
- AsyncInvocation putAsync = vm4.invokeAsync(WANTestBase.class,
- "doPutsFrom", new Object[] { testName + "_PR", 10, 2000 });
- AsyncInvocation localDestroyAsync = vm4.invokeAsync(
- ParallelGatewaySenderOperationsDUnitTest.class, "closeRegion",
- new Object[] { testName + "_PR" });
- try {
- putAsync.join();
- localDestroyAsync.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail("Interrupted the async invocation.");
- }
-
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 0 });
-
- vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 10, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
- testName + "_PR", 10, 20 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
- } finally {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- }
- }
-
- public void testParallelGatewaySenders_SingleNode_UserPR_localDestroy_RecreateRegion()
- throws Exception {
- addExpectedException("RegionDestroyedException");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
- Integer tkPort = (Integer)vm2.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 3, lnPort });
- Integer pnPort = (Integer)vm3.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 4, lnPort });
-
- vm4.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm5.invoke(WANTestBase.class, "createReceiver", new Object[] { tkPort });
- vm6.invoke(WANTestBase.class, "createReceiver", new Object[] { pnPort });
-
- try {
- vm7.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
-
- getLogWriter().info("Created cache on local site");
-
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
- true, 100, 10, false, false, null, true });
-
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln2", 3,
- true, 100, 10, false, false, null, true });
-
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln3", 4,
- true, 100, 10, false, false, null, true });
-
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln2" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln3" });
-
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln1,ln2,ln3", 1, 10, isOffHeap() });
-
- getLogWriter().info("Created PRs on local site");
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 10, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 10, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 10, isOffHeap() });
-
- vm7.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
- 10 });
-
- vm7.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
- vm7.invoke(WANTestBase.class, "validateQueueContents",
- new Object[] { "ln1", 0 });
- vm7.invoke(WANTestBase.class, "validateQueueContents",
- new Object[] { "ln2", 0 });
- vm7.invoke(WANTestBase.class, "validateQueueContents",
- new Object[] { "ln3", 0 });
-
- vm7.invoke(WANTestBase.class, "localDestroyRegion",
- new Object[] { testName + "_PR" });
-
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln1,ln2,ln3", 1, 10, isOffHeap() });
-
- vm7.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
- testName + "_PR", 10, 20 });
-
- vm7.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10 });
-
- vm7.invoke(WANTestBase.class, "validateQueueContents",
- new Object[] { "ln1", 0 });
- vm7.invoke(WANTestBase.class, "validateQueueContents",
- new Object[] { "ln2", 0 });
- vm7.invoke(WANTestBase.class, "validateQueueContents",
- new Object[] { "ln3", 0 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 20 });
- vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 20 });
- vm6.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 20 });
- } finally {
- vm7.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- }
- }
-
- public void testParallelGatewaySender_MultipleNode_UserPR_localDestroy_Recreate() throws Exception {
- addExpectedException("RegionDestroyedException");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- try {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
- vm5.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- getLogWriter().info("Created PRs on local site");
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
-
- AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts",
- new Object[] { testName + "_PR", 1000 });
- pause(1000);
- vm5.invoke(WANTestBase.class, "localDestroyRegion",
- new Object[] { testName + "_PR" });
-
- try {
- inv1.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- fail("Interrupted the async invocation.");
- }
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 1000 });
-
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 1000 });
-
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
- testName + "_PR", 1000, 2000 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 2000 });
-
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 2000 });
- } finally {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- vm5.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- }
- }
-
- public void testParallelGatewaySenders_MultiplNode_UserPR_localDestroy_Recreate()
- throws Exception {
- addExpectedException("RegionDestroyedException");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
- Integer tkPort = (Integer)vm2.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 3, lnPort });
-
- vm6.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm7.invoke(WANTestBase.class, "createReceiver", new Object[] { tkPort });
- try {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
- vm5.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln1,ln2", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln1,ln2", 1, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
- true, 100, 10, false, false, null, true });
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln2", 3,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln2", 3,
- true, 100, 10, false, false, null, true });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln2" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln2" });
-
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
-
- AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts",
- new Object[] { testName + "_PR", 1000 });
- pause(1000);
- vm5.invoke(WANTestBase.class, "localDestroyRegion",
- new Object[] { testName + "_PR" });
-
- try {
- inv1.join();
- } catch (InterruptedException ex) {
- ex.printStackTrace();
- fail("Interrupted the async invocation.");
- }
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 1000 });
-
- vm6.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 1000 });
- vm7.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 1000 });
-
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln1,ln2", 1, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "doPutsFrom", new Object[] {
- testName + "_PR", 1000, 2000 });
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 2000 });
-
- vm6.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 2000 });
- vm7.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 2000 });
- } finally {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- vm5.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- }
- }
-
- public void testParallelGatewaySender_ColocatedPartitionedRegions_localDestroy() throws Exception {
- addExpectedException("RegionDestroyedException");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- try {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
- vm5.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
- "ln", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
- "ln", 1, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- getLogWriter().info("Created PRs on local site");
-
- vm2.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
- null, 1, 100, isOffHeap() });
-
- AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class,
- "putcolocatedPartitionedRegion", new Object[] { 2000 });
- pause(1000);
-
- try {
- vm5.invoke(WANTestBase.class, "localDestroyRegion",
- new Object[] { customerRegionName });
- } catch (Exception ex) {
- assertTrue(ex.getCause() instanceof UnsupportedOperationException);
- }
-
- try {
- inv1.join();
- } catch (Exception e) {
- fail("Unexpected exception", e);
- }
-
- vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- customerRegionName, 2000 });
- vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- customerRegionName, 2000 });
- vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- customerRegionName, 2000 });
- } finally {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- vm5.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- }
-
- }
-
- public void testParallelGatewaySender_ColocatedPartitionedRegions_destroy() throws Exception {
- addExpectedException("RegionDestroyedException");
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- try {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
- vm5.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME",
- new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
- "ln", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
- "ln", 1, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- getLogWriter().info("Created PRs on local site");
-
- vm2.invoke(WANTestBase.class,
- "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
- null, 1, 100, isOffHeap() });
-
- AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class,
- "putcolocatedPartitionedRegion", new Object[] { 2000 });
- pause(1000);
-
- try {
- vm5.invoke(WANTestBase.class, "destroyRegion",
- new Object[] { customerRegionName });
- } catch (Exception ex) {
- assertTrue(ex.getCause() instanceof IllegalStateException);
- return;
- }
- fail("Excpeted UnsupportedOperationException");
- } finally {
- vm4.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- vm5.invoke(ParallelGatewaySenderOperationsDUnitTest.class,
- "clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME");
- }
- }
-
/**
* Since the sender is attached to a region and in use, it can not be
* destroyed. Hence, exception is thrown by the sender API.
*/
public void testDestroyParallelGatewaySenderExceptionScenario() {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
+ Integer[] locatorPorts = createLNAndNYLocators();
+ Integer lnPort = locatorPorts[0];
+ Integer nyPort = locatorPorts[1];
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true,
- 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true,
- 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true,
- 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true,
- 100, 10, false, false, null, true });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
+ createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
// make sure all the senders are running before doing any puts
- vm4.invoke(WANTestBase.class, "waitForSenderRunningState",
- new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "waitForSenderRunningState",
- new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "waitForSenderRunningState",
- new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "waitForSenderRunningState",
- new Object[] { "ln" });
+ waitForSendersRunning();
vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
1000 });
@@ -1859,56 +516,14 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
}
public void testDestroyParallelGatewaySender() {
- Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
- "createFirstLocatorWithDSId", new Object[] { 1 });
- Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
- "createFirstRemoteLocator", new Object[] { 2, lnPort });
+ Integer[] locatorPorts = createLNAndNYLocators();
+ Integer lnPort = locatorPorts[0];
+ Integer nyPort = locatorPorts[1];
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true,
- 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true,
- 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true,
- 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true,
- 100, 10, false, false, null, true });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 1, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 1, 100, isOffHeap() });
+ createSendersReceiversAndPartitionedRegion(lnPort, nyPort, false, true);
// make sure all the senders are running
- vm4.invoke(WANTestBase.class, "waitForSenderRunningState",
- new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "waitForSenderRunningState",
- new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "waitForSenderRunningState",
- new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "waitForSenderRunningState",
- new Object[] { "ln" });
+ waitForSendersRunning();
vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
1000 });
@@ -1916,10 +531,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
pause(2000);
//stop the sender and remove from region before calling destroy on it
- vm4.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "stopSender", new Object[] { "ln" });
+ stopSenders();
vm4.invoke(WANTestBase.class, "removeSenderFromTheRegion", new Object[] {
"ln", testName + "_PR" });
@@ -1945,40 +557,80 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
new Object[] { "ln", true });
}
-
- public static void createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(
- Integer locPort) {
- createCache(false, locPort);
- AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = -1;
+ private void createSendersReceiversAndPartitionedRegion(Integer lnPort, Integer nyPort, boolean createAccessors,
+ boolean startSenders) {
+ // Note: This is a test-specific method used by several tests to create
+ // receivers, senders and partitioned regions.
+ createSendersAndReceivers(lnPort, nyPort);
+
+ createPartitionedRegions(createAccessors);
+
+ if (startSenders) {
+ startSenders();
+ }
}
- public static void clear_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME() {
- AbstractGatewaySender.MAXIMUM_SHUTDOWN_WAIT_TIME = 0;
+ private void createSendersAndReceivers(Integer lnPort, Integer nyPort) {
+ // Note: This is a test-specific method used by several tests to create
+ // receivers and senders.
+ vm2.invoke(() -> createReceiver(nyPort));
+ vm3.invoke(() -> createReceiver(nyPort));
+
+ vm4.invoke(() -> createCache(lnPort));
+ vm5.invoke(() -> createCache(lnPort));
+ vm6.invoke(() -> createCache(lnPort));
+ vm7.invoke(() -> createCache(lnPort));
+
+ vm4.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
+ vm5.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
+ vm6.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
+ vm7.invoke(() -> createSender("ln", 2, true, 100, 10, false, false, null, true));
}
-
- public static void closeRegion(String regionName) {
- Region r = cache.getRegion(Region.SEPARATOR + regio
<TRUNCATED>