You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2016/04/01 19:32:11 UTC
[3/7] incubator-geode git commit: GEODE-1062: Refactor of WANTestBase
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
index fd1f0ee..8139dca 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -86,13 +86,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, true, null, true ));
@@ -112,10 +109,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
getTestMethodName(), null, 1, 100, isOffHeap() ));
@@ -131,15 +125,6 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
}
- protected SerializableCallableIF<Integer> createReceiverRunnable(
- Integer nyPort) {
- return () -> WANTestBase.createReceiver( nyPort );
- }
-
- protected SerializableRunnableIF createCacheRunnable(Integer lnPort) {
- return () -> WANTestBase.createCache( lnPort );
- }
-
/**
* Enable persistence for the GatewaySender but not the region
*/
@@ -147,15 +132,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
LogWriterUtils.getLogWriter().info("Created remote receivers");
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Created local site cache");
@@ -176,13 +157,9 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
vm7.invoke(createPartitionedRegionRunnable());
LogWriterUtils.getLogWriter().info("Created local site persistent PR");
-
- vm4.invoke(startSenderRunnable());
- LogWriterUtils.getLogWriter().info("Started sender on vm4");
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
-
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Started the senders");
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
@@ -203,11 +180,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
getTestMethodName(), "ln", 1, 100, isOffHeap() );
}
- protected SerializableRunnableIF startSenderRunnable() {
- return () -> WANTestBase.startSender( "ln" );
- }
-
-
+
+
/**
* Enable persistence for GatewaySender.
@@ -223,14 +197,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
//create receiver on remote site
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
//create cache in local site
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
//create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -253,10 +224,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
vm7.invoke(createPartitionedRegionRunnable());
//start the senders on local site
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
//wait for senders to become running
vm4.invoke(waitForSenderRunnable());
@@ -284,11 +252,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
LogWriterUtils.getLogWriter().info("Killed all the senders.");
//restart the vm
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
-
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Created back the cache");
//create senders with disk store
@@ -318,10 +283,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//start the senders in async mode. This will ensure that the
//node of shadow PR that went down last will come up first
- vm4.invokeAsync(startSenderRunnable());
- vm5.invokeAsync(startSenderRunnable());
- vm6.invokeAsync(startSenderRunnable());
- vm7.invokeAsync(startSenderRunnable());
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
//wait for senders running
@@ -366,14 +328,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
//create receiver on remote site
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
//create cache in local site
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
//create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -400,11 +359,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
//start the senders on local site
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
-
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
//wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
@@ -431,11 +387,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
LogWriterUtils.getLogWriter().info("Killed all the senders.");
//restart the vm
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
-
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Created back the cache");
//create senders with disk store
@@ -469,11 +422,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//start the senders in async mode. This will ensure that the
//node of shadow PR that went down last will come up first
- vm4.invokeAsync(startSenderRunnable());
- vm5.invokeAsync(startSenderRunnable());
- vm6.invokeAsync(startSenderRunnable());
- vm7.invokeAsync(startSenderRunnable());
-
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
//wait for senders running
vm4.invoke(waitForSenderRunnable());
@@ -505,10 +455,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
//create cache in local site
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
//create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, false ));
@@ -529,10 +476,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
//start the senders on local site
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
//wait for senders to become running
vm4.invoke(waitForSenderRunnable());
@@ -560,11 +504,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
LogWriterUtils.getLogWriter().info("Killed all the senders.");
//restart the vm
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
-
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Created back the cache");
//create senders with disk store
@@ -596,18 +537,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
}
LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
-
- vm4.invoke(() -> WANTestBase.unsetRemoveFromQueueOnException( "ln" ));
- vm5.invoke(() -> WANTestBase.unsetRemoveFromQueueOnException( "ln" ));
- vm6.invoke(() -> WANTestBase.unsetRemoveFromQueueOnException( "ln" ));
- vm7.invoke(() -> WANTestBase.unsetRemoveFromQueueOnException( "ln" ));
//start the senders in async mode. This will ensure that the
//node of shadow PR that went down last will come up first
- vm4.invokeAsync(startSenderRunnable());
- vm5.invokeAsync(startSenderRunnable());
- vm6.invokeAsync(startSenderRunnable());
- vm7.invokeAsync(startSenderRunnable());
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
//wait for senders running
@@ -619,8 +552,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
LogWriterUtils.getLogWriter().info("Creating the receiver.");
//create receiver on remote site
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
+ createCacheInVMs(nyPort, vm2, vm3);
//create PR on remote site
LogWriterUtils.getLogWriter().info("Creating the partitioned region at receiver. ");
@@ -628,6 +560,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
getTestMethodName(), null, 1, 100, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
getTestMethodName(), null, 1, 100, isOffHeap() ));
+ createReceiverInVMs(nyPort, vm2, vm3);
+
vm4.invoke(pauseSenderRunnable());
vm5.invoke(pauseSenderRunnable());
vm6.invoke(pauseSenderRunnable());
@@ -665,14 +599,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
//create receiver on remote site
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
- //create cache in local site
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
//create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -699,11 +629,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
//start the senders on local site
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
-
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
//wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
@@ -730,11 +657,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
LogWriterUtils.getLogWriter().info("Killed all the senders.");
//restart the vm
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
-
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Created back the cache");
//create senders with disk store
@@ -769,10 +693,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//start the senders in async mode. This will ensure that the
//node of shadow PR that went down last will come up first
- vm4.invokeAsync(startSenderRunnable());
- vm5.invokeAsync(startSenderRunnable());
- vm6.invokeAsync(startSenderRunnable());
- vm7.invokeAsync(startSenderRunnable());
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
//wait for senders running
@@ -813,14 +734,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
//create receiver on remote site
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
//create cache in local site
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
//create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -847,10 +765,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
//start the senders on local site
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
//wait for senders to become running
vm4.invoke(waitForSenderRunnable());
@@ -878,11 +793,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
LogWriterUtils.getLogWriter().info("Killed all the senders.");
//restart the vm
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
-
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Created back the cache");
//create senders with disk store
@@ -917,11 +829,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//start the senders in async mode. This will ensure that the
//node of shadow PR that went down last will come up first
- vm4.invokeAsync(startSenderRunnable());
- vm5.invokeAsync(startSenderRunnable());
- vm6.invokeAsync(startSenderRunnable());
- vm7.invokeAsync(startSenderRunnable());
-
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
//wait for senders running
vm4.invoke(waitForSenderRunnable());
@@ -953,14 +862,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
//create receiver on remote site
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
- //create cache in local site
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
//create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -987,11 +892,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
//start the senders on local site
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
-
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
//wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
@@ -1018,11 +920,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
LogWriterUtils.getLogWriter().info("Killed all the senders.");
//restart the vm
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
-
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Created back the cache");
//create senders with disk store
@@ -1043,11 +942,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//start the senders in async mode. This will ensure that the
//node of shadow PR that went down last will come up first
- vm4.invokeAsync(startSenderRunnable());
- vm5.invokeAsync(startSenderRunnable());
- vm6.invokeAsync(startSenderRunnable());
- vm7.invokeAsync(startSenderRunnable());
-
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
//wait for senders running
vm4.invoke(waitForSenderRunnable());
@@ -1089,10 +985,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
// create cache in local site
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
// create PR on local site
vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
@@ -1116,10 +1009,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
LogWriterUtils.getLogWriter().info("Killed all the senders.");
// restart the vm
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
LogWriterUtils.getLogWriter().info("Created back the cache");
@@ -1182,14 +1072,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
//create receiver on remote site
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
//create cache in local site
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
//create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -1216,11 +1103,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
//start the senders on local site
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
-
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
//wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
@@ -1247,11 +1131,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
LogWriterUtils.getLogWriter().info("Killed all the senders.");
//restart the vm
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
-
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Created back the cache");
@@ -1265,11 +1146,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//start the senders. NOTE that the senders are not associated with partitioned region
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
-
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Started the senders.");
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
@@ -1302,14 +1180,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
//create receiver on remote site
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
//create cache in local site
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
//create senders with disk store
String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -1332,11 +1207,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
vm7.invoke(createPartitionedRegionRunnable());
//start the senders on local site
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
-
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
//wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
@@ -1362,11 +1234,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
LogWriterUtils.getLogWriter().info("Killed all the senders. The local site has been brought down.");
//restart the vm
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
-
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Created back the cache");
//create senders with disk store
@@ -1386,11 +1255,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
//start the senders
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
-
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Started the senders.");
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
@@ -1426,14 +1292,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
//create receiver on remote site
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
//create cache in local site
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -1461,11 +1324,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
//start the senders on local site
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
-
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
//wait for senders to become running
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
@@ -1492,11 +1352,8 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
LogWriterUtils.getLogWriter().info("Killed all the senders.");
//restart the vm
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
-
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Created back the cache");
//create back the senders
@@ -1511,17 +1368,9 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
LogWriterUtils.getLogWriter().info("Created the senders again");
- vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
- vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
- vm6.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
- vm7.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
-
//start the senders
- vm4.invokeAsync(startSenderRunnable());
- vm5.invokeAsync(startSenderRunnable());
- vm6.invokeAsync(startSenderRunnable());
- vm7.invokeAsync(startSenderRunnable());
-
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
LogWriterUtils.getLogWriter().info("Started the senders.");
LogWriterUtils.getLogWriter().info("Waiting for senders running.");
@@ -1581,14 +1430,11 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
//create receiver on remote site
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
//create cache in local site
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, true, null, true ));
@@ -1612,12 +1458,9 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
-
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
//start puts in region on local site
vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
LogWriterUtils.getLogWriter().info("Completed puts in the region");
@@ -1640,13 +1483,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, true, null, true ));
@@ -1666,10 +1506,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
getTestMethodName(), null, 1, 100, isOffHeap() ));
@@ -1694,10 +1531,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
vm6.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue" , new Object[] { true});
vm7.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue" , new Object[] { true});
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
getTestMethodName(), null, 1, 100, isOffHeap() ));
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
index 594514b..52169ba 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
@@ -63,12 +63,9 @@ public class ParallelWANPropagationClientServerDUnitTest extends WANTestBase {
vm7.invoke(() -> WANTestBase.createClientWithLocator(
lnPort, "localhost", getTestMethodName() + "_PR" ));
-
- AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.startSender( "ln" ));
- AsyncInvocation inv2 = vm6.invokeAsync(() -> WANTestBase.startSender( "ln" ));
- inv1.join();
- inv2.join();
+ startSenderInVMsAsync("ln", vm5, vm6);
+
// before doing any puts, let the senders be running in order to ensure that
// not a single event will be lost
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
index 8ae5d0b..2c0d693 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
@@ -41,11 +41,10 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
- vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+ createCacheInVMs(lnPort, vm4, vm5);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -56,10 +55,9 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
getTestMethodName() + "_PR", "ln", 1, 1, isOffHeap() ));
vm5.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", "ln", 1, 1, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
- vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-
+
+ startSenderInVMs("ln", vm4, vm5);
+
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 1, 1, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createPartitionedRegion(
@@ -107,11 +105,10 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
- vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+ createCacheInVMs(lnPort, vm4, vm5);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -122,10 +119,9 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap() ));
vm5.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
- vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-
+
+ startSenderInVMs("ln", vm4, vm5);
+
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 1, 10, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createPartitionedRegion(
@@ -154,17 +150,15 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 3, 4, isOffHeap() ));
vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR"));
-
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
- vm5.invoke(() -> WANTestBase.createCache( lnPort ));
- vm6.invoke(() -> WANTestBase.createCache( lnPort ));
- vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -183,11 +177,8 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
vm7.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
- vm5.invoke(() -> WANTestBase.startSender( "ln" ));
- vm6.invoke(() -> WANTestBase.startSender( "ln" ));
- vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.addQueueListener( "ln", true));
vm5.invoke(() -> WANTestBase.addQueueListener( "ln", true));
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
index d8d2585..6d4b03a 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
@@ -82,11 +82,8 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
public void testParallelPropagation_withoutRemoteSite() throws Exception {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
//keep a larger batch to minimize number of exception occurrences in the log
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -103,15 +100,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm6.invoke(createPartitionedRegionRedundancy1Runnable());
vm7.invoke(createPartitionedRegionRedundancy1Runnable());
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
-
- vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false ));
- vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false));
- vm6.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false ));
- vm7.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", false ));
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
//make sure all the senders are running before doing any puts
vm4.invoke(waitForSenderRunnable());
@@ -122,12 +111,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
1000 ));
-
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
-
+ createCacheInVMs(nyPort, vm2, vm3);
vm2.invoke(createReceiverPartitionedRegionRedundancy1());
vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+ createReceiverInVMs(nyPort, vm2, vm3);
//verify all buckets drained on all sender nodes.
vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
@@ -142,10 +129,6 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm2.invoke(() -> WANTestBase.validateRegionSize(
getTestMethodName() + "_PR", 1000 ));
}
-
- protected SerializableRunnableIF createCacheRunnable(Integer lnPort) {
- return () -> WANTestBase.createCache( lnPort );
- }
/**
* Normal happy scenario test case.
@@ -155,13 +138,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -177,10 +156,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm6.invoke(createPartitionedRegionRedundancy1Runnable());
vm7.invoke(createPartitionedRegionRedundancy1Runnable());
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm2.invoke(createReceiverPartitionedRegionRedundancy1());
vm3.invoke(createReceiverPartitionedRegionRedundancy1());
@@ -215,10 +191,6 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() );
}
- protected SerializableRunnableIF startSenderRunnable() {
- return () -> WANTestBase.startSender( "ln" );
- }
-
protected SerializableRunnableIF waitForSenderRunnable() {
return () -> WANTestBase.waitForSenderRunningState( "ln" );
}
@@ -227,13 +199,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, false ));
@@ -280,13 +248,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -304,21 +268,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
1000 ));
-
- AsyncInvocation inv1 = vm4.invokeAsync(startSenderRunnable());
- AsyncInvocation inv2 = vm5.invokeAsync(startSenderRunnable());
- AsyncInvocation inv3 = vm6.invokeAsync(startSenderRunnable());
- AsyncInvocation inv4 = vm7.invokeAsync(startSenderRunnable());
- try{
- inv1.join();
- inv2.join();
- inv3.join();
- inv4.join();
- }
- catch(InterruptedException ie) {
- fail("Caught interrupted exception");
- }
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
vm2.invoke(createReceiverPartitionedRegionRedundancy1());
vm3.invoke(createReceiverPartitionedRegionRedundancy1());
@@ -349,13 +301,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -371,10 +319,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm6.invoke(createPartitionedRegionRedundancy1Runnable());
vm7.invoke(createPartitionedRegionRedundancy1Runnable());
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm2.invoke(createReceiverPartitionedRegionRedundancy1());
vm3.invoke(createReceiverPartitionedRegionRedundancy1());
@@ -400,11 +345,8 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer regionSize =
(Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_PR" ));
LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize);
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -415,20 +357,12 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
- vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
- vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
- vm6.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
- vm7.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
-
vm4.invoke(createPartitionedRegionRedundancy1Runnable());
vm5.invoke(createPartitionedRegionRedundancy1Runnable());
vm6.invoke(createPartitionedRegionRedundancy1Runnable());
vm7.invoke(createPartitionedRegionRedundancy1Runnable());
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
@@ -458,13 +392,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -480,10 +410,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() ));
@@ -512,13 +439,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -534,10 +457,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), null, 1, 100, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), null, 1, 100, isOffHeap() ));
@@ -565,13 +485,13 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ createReceiverInVMs(nyPort, vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -591,15 +511,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm7.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), null, 1, 100, isOffHeap() ));
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
//let all the senders start before doing any puts to ensure that none of the events is lost
vm4.invoke(waitForSenderRunnable());
@@ -625,13 +537,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "lnSerial",
2, false, 100, 10, false, false, null, true ));
@@ -673,13 +581,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm7.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() ));
- vm4.invoke(() -> WANTestBase.startSender( "lnSerial" ));
- vm5.invoke(() -> WANTestBase.startSender( "lnSerial" ));
+ startSenderInVMs("lnSerial", vm4, vm5);
- vm4.invoke(() -> WANTestBase.startSender( "lnParallel" ));
- vm5.invoke(() -> WANTestBase.startSender( "lnParallel" ));
- vm6.invoke(() -> WANTestBase.startSender( "lnParallel" ));
- vm7.invoke(() -> WANTestBase.startSender( "lnParallel" ));
+ startSenderInVMs("lnParallel", vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
1000 ));
@@ -704,13 +608,15 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(tkPort));
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ createReceiverInVMs(nyPort, vm2);
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(tkPort, vm3);
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+ createReceiverInVMs(tkPort, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "lnParallel1",
2, true, 100, 10, false, false, null, true ));
@@ -739,18 +645,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm7.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() ));
- vm4.invoke(() -> WANTestBase.startSender( "lnParallel1" ));
- vm5.invoke(() -> WANTestBase.startSender( "lnParallel1" ));
- vm6.invoke(() -> WANTestBase.startSender( "lnParallel1" ));
- vm7.invoke(() -> WANTestBase.startSender( "lnParallel1" ));
+ startSenderInVMs("lnParallel1", vm4, vm5, vm6, vm7);
- vm4.invoke(() -> WANTestBase.startSender( "lnParallel2" ));
- vm5.invoke(() -> WANTestBase.startSender( "lnParallel2" ));
- vm6.invoke(() -> WANTestBase.startSender( "lnParallel2" ));
- vm7.invoke(() -> WANTestBase.startSender( "lnParallel2" ));
+ startSenderInVMs("lnParallel2", vm4, vm5, vm6, vm7);
- vm2.invoke(createReceiverPartitionedRegionRedundancy1());
- vm3.invoke(createReceiverPartitionedRegionRedundancy1());
//before doing puts, make sure that the senders are started.
//this will ensure that not a single event is lost
@@ -793,13 +691,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -810,10 +704,6 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
- vm4.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
- vm5.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
- vm6.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
- vm7.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
vm4.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() ));
@@ -824,11 +714,8 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm7.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() ));
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
-
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
vm2.invoke(createReceiverPartitionedRegionRedundancy1());
vm3.invoke(createReceiverPartitionedRegionRedundancy1());
@@ -862,13 +749,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false,
@@ -892,10 +775,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm7.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName(), null, 1, 100, isOffHeap() ));
@@ -921,13 +801,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -943,10 +819,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm6.invoke(createPartitionedRegionRedundancy1Runnable());
vm7.invoke(createPartitionedRegionRedundancy1Runnable());
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm2.invoke(createReceiverPartitionedRegionRedundancy1());
vm3.invoke(createReceiverPartitionedRegionRedundancy1());
@@ -983,13 +856,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
- vm6.invoke(createCacheRunnable(lnPort));
- vm7.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 100, false, false, null, true ));
@@ -1005,10 +874,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm6.invoke(createPartitionedRegionRedundancy1Runnable());
vm7.invoke(createPartitionedRegionRedundancy1Runnable());
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
- vm6.invoke(startSenderRunnable());
- vm7.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm2.invoke(createReceiverPartitionedRegionRedundancy1());
vm3.invoke(createReceiverPartitionedRegionRedundancy1());
@@ -1064,11 +930,9 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(createReceiverRunnable(nyPort));
- vm3.invoke(createReceiverRunnable(nyPort));
-
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5);
//vm6.invoke(() -> WANTestBase.createCache( lnPort ));
//vm7.invoke(() -> WANTestBase.createCache( lnPort ));
@@ -1088,8 +952,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
// vm7.invoke(() -> WANTestBase.createPartitionedRegion(
// testName + "_PR", "ln", true, 1, 100, isOffHeap() ));
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5);
// vm6.invoke(() -> WANTestBase.startSender( "ln" ));
// vm7.invoke(() -> WANTestBase.startSender( "ln" ));
@@ -1118,11 +981,11 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
public void disable_testParallelGatewaySenderQueueLocalSize() {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- vm2.invoke(createReceiverRunnable(nyPort));
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2);
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(nyPort, vm2);
+ createCacheInVMs(lnPort, vm4, vm5);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -1132,8 +995,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm4.invoke(createPartitionedRegionRedundancy1Runnable());
vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5);
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
@@ -1171,11 +1033,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
IgnoredException.addIgnoredException("Unexpected IOException");
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- vm2.invoke(createReceiverRunnable(nyPort));
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(nyPort, vm2);
+ createCacheInVMs(lnPort, vm4, vm5);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -1185,8 +1046,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm4.invoke(createPartitionedRegionRedundancy1Runnable());
vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm4, vm5);
vm4.invoke(waitForSenderRunnable());
vm5.invoke(waitForSenderRunnable());
@@ -1225,22 +1085,22 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
* Added for defect #50364 Can't colocate region that has AEQ with a region that does not have that same AEQ
*/
public void testParallelSenderAttachedToChildRegionButNotToParentRegion() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- //create cache and receiver on site2
- vm2.invoke(createReceiverRunnable(nyPort));
-
- //create cache on site1
- vm3.invoke(createCacheRunnable(lnPort));
-
- //create sender on site1
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ //create cache and receiver on site2
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(nyPort, vm2);
+ //create cache on site1
+ createCacheInVMs(lnPort, vm3);
+
+ //create sender on site1
vm3.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
//start sender on site1
- vm3.invoke(startSenderRunnable());
-
+ startSenderInVMs("ln", vm3);
+
//create leader (parent) PR on site1
vm3.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "PARENT_PR", null, 0, 100, isOffHeap() ));
@@ -1269,13 +1129,10 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm6.invoke(createReceiverRunnable(nyPort));
- vm7.invoke(createReceiverRunnable(nyPort));
+ createCacheInVMs(nyPort, vm6, vm7);
+ createReceiverInVMs(nyPort, vm6, vm7);
- vm2.invoke(createCacheRunnable(lnPort));
- vm3.invoke(createCacheRunnable(lnPort));
- vm4.invoke(createCacheRunnable(lnPort));
- vm5.invoke(createCacheRunnable(lnPort));
+ createCacheInVMs(lnPort, vm2, vm3, vm4, vm5);
vm2.invoke(() -> WANTestBase.createSender( "ln", 2, true,
100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true ));
@@ -1295,10 +1152,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm5.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm2.invoke(startSenderRunnable());
- vm3.invoke(startSenderRunnable());
- vm4.invoke(startSenderRunnable());
- vm5.invoke(startSenderRunnable());
+ startSenderInVMs("ln", vm2, vm3, vm4, vm5);
vm6.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName(), null, 1, 100, isOffHeap() ));
@@ -1347,8 +1201,4 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
}
- protected SerializableCallableIF<Integer> createReceiverRunnable(
- Integer nyPort) {
- return () -> WANTestBase.createReceiver( nyPort );
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
index b921c88..7adba41 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
@@ -42,15 +42,11 @@ public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
//create receiver on site1 and site2
+ createCacheInVMs(lnPort, vm2, vm4, vm5);
vm2.invoke(() -> WANTestBase.createReceiver( lnPort ));
+ createCacheInVMs(nyPort, vm3, vm6, vm7);
vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
- //create cache on site1 and site2
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
- vm5.invoke(() -> WANTestBase.createCache( lnPort ));
- vm6.invoke(() -> WANTestBase.createCache( nyPort ));
- vm7.invoke(() -> WANTestBase.createCache( nyPort ));
-
//create senders on site1
vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -84,15 +80,13 @@ public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() ));
//start sender on site1
- vm2.invoke(() -> WANTestBase.startSender( "ln" ));
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
- vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-
+ startSenderInVMs("ln", vm2, vm4, vm5);
+
+
//start sender on site2
- vm3.invoke(() -> WANTestBase.startSender( "ny" ));
- vm6.invoke(() -> WANTestBase.startSender( "ny" ));
- vm7.invoke(() -> WANTestBase.startSender( "ny" ));
-
+ startSenderInVMs("ny", vm3, vm6, vm7);
+
+
//pause senders on site1
vm2.invoke(() -> WANTestBase.pauseSender( "ln" ));
vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
@@ -162,13 +156,14 @@ public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
//create cache and receivers on all the 3 sites
- vm3.invoke(() -> WANTestBase.createReceiver( lnPort ));
- vm4.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm5.invoke(() -> WANTestBase.createReceiver( tkPort ));
-
- vm6.invoke(() -> WANTestBase.createCache( lnPort ));
- vm7.invoke(() -> WANTestBase.createCache( nyPort ));
-
+ createCacheInVMs(lnPort, vm3, vm6);
+ createReceiverInVMs(lnPort, vm3, vm6);
+ createCacheInVMs(nyPort, vm4, vm7);
+ createReceiverInVMs(nyPort, vm4, vm7);
+ createCacheInVMs(tkPort, vm5);
+ createReceiverInVMs(tkPort, vm5);
+
+
//create senders on all the 3 sites
vm3.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false, null, true ));
@@ -198,12 +193,10 @@ public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
getTestMethodName() + "_PR", "tk", 0, 100, isOffHeap() ));
//start senders on all the sites
- vm3.invoke(() -> WANTestBase.startSender( "ln" ));
- vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-
- vm4.invoke(() -> WANTestBase.startSender( "ny" ));
- vm7.invoke(() -> WANTestBase.startSender( "ny" ));
-
+ startSenderInVMs("ln", vm3, vm6);
+
+ startSenderInVMs("ny", vm4, vm7);
+
vm5.invoke(() -> WANTestBase.startSender( "tk" ));
//pause senders on site1 and site3. Site2 has the sender running to pass along events
@@ -265,14 +258,14 @@ public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
-
+
+ createCacheInVMs(lnPort, vm3, vm6);
+ createCacheInVMs(nyPort, vm4, vm7);
+ createCacheInVMs(tkPort, vm5);
vm3.invoke(() -> WANTestBase.createReceiver( lnPort ));
vm4.invoke(() -> WANTestBase.createReceiver( nyPort ));
vm5.invoke(() -> WANTestBase.createReceiver( tkPort ));
-
- vm6.invoke(() -> WANTestBase.createCache( lnPort ));
- vm7.invoke(() -> WANTestBase.createCache( nyPort ));
-
+
//site1
vm3.invoke(() -> WANTestBase.createSender( "ln1", 2,
true, 100, 10, false, false, null, true ));
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 55fd5ad..9d9c074 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -47,8 +47,8 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- createReceiver(vm2, nyPort);
- createReceiver(vm3, nyPort);
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
createSendersWithConflation(lnPort);
@@ -79,16 +79,16 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- createReceiver(vm2, nyPort);
-
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(nyPort, vm2);
createSenders(lnPort);
createReceiverPR(vm2, 0);
createSenderPRs(0);
-
- startSenders();
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.doPuts( testName,
NUM_PUTS ));
@@ -116,15 +116,16 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- createReceiver(vm2, nyPort);
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(nyPort, vm2);
createSenders(lnPort);
createReceiverPR(vm2, 0);
createSenderPRs(3);
-
- startSenders();
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.doPuts( testName,
NUM_PUTS ));
@@ -152,8 +153,10 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
- createReceiver(vm2, nyPort);
- createReceiver(vm3, tkPort);
+ createCacheInVMs(nyPort, vm2);
+ createCacheInVMs(tkPort, vm3);
+ createReceiverInVMs(nyPort, vm2);
+ createReceiverInVMs(tkPort, vm3);
vm4.invoke(() -> WANTestBase.createCache(lnPort ));
@@ -207,16 +210,16 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- createReceiver(vm2, nyPort);
-
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(nyPort, vm2);
createSenders(lnPort);
createReceiverPR(vm2, 0);
createSenderPRs(3);
-
- startSenders();
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( testName, 1000 ));
pause(200);
@@ -256,7 +259,8 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- createReceiver(vm2, nyPort);
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(nyPort, vm2);
createSenders(lnPort);
@@ -267,7 +271,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
createSenderPRs(0);
- startSenders();
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
//start puts in RR_1 in another thread
vm4.invoke(() -> WANTestBase.doPuts( testName, 2000 ));
@@ -291,12 +295,10 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort ));
- createReceiver(vm2, nyPort);
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(nyPort, vm2);
- vm4.invoke(() -> WANTestBase.createCache(lnPort ));
- vm5.invoke(() -> WANTestBase.createCache(lnPort ));
- vm6.invoke(() -> WANTestBase.createCache(lnPort ));
- vm7.invoke(() -> WANTestBase.createCache(lnPort ));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, false, false,
@@ -313,7 +315,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
createSenderPRs(0);
- startSenders();
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
createReceiverPR(vm2, 1);
@@ -342,7 +344,8 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- createReceiver(vm2, nyPort);
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(nyPort, vm2);
createSendersWithConflation(lnPort);
@@ -433,7 +436,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
}
protected void startPausedSenders() {
- startSenders();
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
vm4.invoke(() ->pauseSender( "ln" ));
vm5.invoke(() ->pauseSender( "ln" ));
@@ -441,22 +444,8 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
vm7.invoke(() ->pauseSender( "ln" ));
}
- protected void createReceiver(VM vm, Integer nyPort) {
- vm.invoke(() -> WANTestBase.createReceiver( nyPort ));
- }
-
- protected void startSenders() {
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
- vm5.invoke(() -> WANTestBase.startSender( "ln" ));
- vm6.invoke(() -> WANTestBase.startSender( "ln" ));
- vm7.invoke(() -> WANTestBase.startSender( "ln" ));
- }
-
protected void createSendersWithConflation(Integer lnPort) {
- vm4.invoke(() -> WANTestBase.createCache(lnPort ));
- vm5.invoke(() -> WANTestBase.createCache(lnPort ));
- vm6.invoke(() -> WANTestBase.createCache(lnPort ));
- vm7.invoke(() -> WANTestBase.createCache(lnPort ));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
true, 100, 10, true, false, null, true ));
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
index 7a05644..7f69904 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
@@ -58,8 +58,7 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
vm5.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
false, 100, 10, false, false, null, false, true));
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
- vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+ startSenderInVMs("ln", vm4, vm5);
vm4.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", "ln", isOffHeap() ));
@@ -98,13 +97,11 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm4.invoke(() -> WANTestBase.createCache(lnPort ));
- vm5.invoke(() -> WANTestBase.createCache(lnPort ));
- vm6.invoke(() -> WANTestBase.createCache(lnPort ));
- vm7.invoke(() -> WANTestBase.createCache(lnPort ));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
false, 100, 10, false, false, null, false, true));
@@ -116,8 +113,7 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
vm3.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
- vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+ startSenderInVMs("ln", vm4, vm5);
vm4.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", "ln", isOffHeap() ));
@@ -158,13 +154,10 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
- vm5.invoke(() -> WANTestBase.createCache( lnPort ));
- vm6.invoke(() -> WANTestBase.createCache( lnPort ));
- vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
false, 100, 10, false, false, null, true, true));
@@ -176,8 +169,7 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
vm3.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
- vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+ startSenderInVMs("ln", vm4, vm5);
vm4.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", "ln", isOffHeap() ));
@@ -213,10 +205,10 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort ));
- vm2.invoke(() -> WANTestBase.createReceiver(nyPort ));
- vm3.invoke(() -> WANTestBase.createReceiver(nyPort ));
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(nyPort, vm2, vm3);
- vm4.invoke(() -> WANTestBase.createCache(lnPort ));
+ createCacheInVMs(lnPort, vm4);
vm4.invoke(() -> WANTestBase.createSenderWithListener( "ln", 2,
false, 100, 10, false, false, null, false, false ));