You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/02/19 19:37:34 UTC

[57/63] [abbrv] incubator-geode git commit: Extracting common lambdas from methods to avoid "Code too large" error

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0804a139/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
index e52486f..a29a210 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -29,6 +29,8 @@ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 import com.gemstone.gemfire.test.dunit.AsyncInvocation;
 import com.gemstone.gemfire.test.dunit.IgnoredException;
 import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.SerializableCallableIF;
+import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
 
 public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     WANTestBase {
@@ -84,13 +86,13 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    vm2.invoke(createReceiverRunnable(nyPort));
+    vm3.invoke(createReceiverRunnable(nyPort));
 
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, true, null, true ));
@@ -110,10 +112,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
         getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm4.invoke(startSenderRunnable());
+    vm5.invoke(startSenderRunnable());
+    vm6.invoke(startSenderRunnable());
+    vm7.invoke(startSenderRunnable());
 
     vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
         getTestMethodName(), null, 1, 100, isOffHeap() ));
@@ -129,6 +131,15 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
 
   }
 
+  protected SerializableCallableIF<Integer> createReceiverRunnable(
+      Integer nyPort) {
+    return () -> WANTestBase.createReceiver( nyPort );
+  }
+
+  protected SerializableRunnableIF createCacheRunnable(Integer lnPort) {
+    return () -> WANTestBase.createCache( lnPort );
+  }
+
   /**
    * Enable persistence for the GatewaySender but not the region
    */
@@ -136,15 +147,15 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    vm2.invoke(createReceiverRunnable(nyPort));
+    vm3.invoke(createReceiverRunnable(nyPort));
     
     LogWriterUtils.getLogWriter().info("Created remote receivers");
     
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
     
     LogWriterUtils.getLogWriter().info("Created local site cache");
 
@@ -159,22 +170,18 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
   
     LogWriterUtils.getLogWriter().info("Created local site senders");
 
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-        getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm4.invoke(createPartitionedRegionRunnable());
+    vm5.invoke(createPartitionedRegionRunnable());
+    vm6.invoke(createPartitionedRegionRunnable());
+    vm7.invoke(createPartitionedRegionRunnable());
 
     LogWriterUtils.getLogWriter().info("Created local site persistent PR");
     
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm4.invoke(startSenderRunnable());
     LogWriterUtils.getLogWriter().info("Started sender on vm4");
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm5.invoke(startSenderRunnable());
+    vm6.invoke(startSenderRunnable());
+    vm7.invoke(startSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Started the senders");
 
@@ -190,6 +197,15 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     vm3.invoke(() -> WANTestBase.validateRegionSize(
       getTestMethodName(), 1000 ));
   }
+
+  protected SerializableRunnableIF createPartitionedRegionRunnable() {
+    return () -> WANTestBase.createPartitionedRegion(
+        getTestMethodName(), "ln", 1, 100, isOffHeap() );
+  }
+
+  protected SerializableRunnableIF startSenderRunnable() {
+    return () -> WANTestBase.startSender( "ln" );
+  }
   
   
   
@@ -207,14 +223,14 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    vm2.invoke(createReceiverRunnable(nyPort));
+    vm3.invoke(createReceiverRunnable(nyPort));
 
     //create cache in local site
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -231,32 +247,28 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), null, 1, 100, isOffHeap() ));
     
     //create PR on local site
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm4.invoke(createPartitionedRegionRunnable());
+    vm5.invoke(createPartitionedRegionRunnable());
+    vm6.invoke(createPartitionedRegionRunnable());
+    vm7.invoke(createPartitionedRegionRunnable());
 
     //start the senders on local site
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm4.invoke(startSenderRunnable());
+    vm5.invoke(startSenderRunnable());
+    vm6.invoke(startSenderRunnable());
+    vm7.invoke(startSenderRunnable());
     
     //wait for senders to become running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     //pause the senders
-    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
     
     //start puts in region on local site
     vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 3000 ));
@@ -264,18 +276,18 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //--------------------close and rebuild local site -------------------------------------------------
     //kill the senders
-    vm4.invoke(() -> WANTestBase.killSender());
-    vm5.invoke(() -> WANTestBase.killSender());
-    vm6.invoke(() -> WANTestBase.killSender());
-    vm7.invoke(() -> WANTestBase.killSender());
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
     
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
@@ -287,14 +299,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
     //create PR on local site
-    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv1 = vm4.invokeAsync(createPartitionedRegionRunnable());
+    AsyncInvocation inv2 = vm5.invokeAsync(createPartitionedRegionRunnable());
+    AsyncInvocation inv3 = vm6.invokeAsync(createPartitionedRegionRunnable());
+    AsyncInvocation inv4 = vm7.invokeAsync(createPartitionedRegionRunnable());
     
     try {
       inv1.join();
@@ -310,17 +318,17 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //start the senders in async mode. This will ensure that the 
     //node of shadow PR that went down last will come up first
-    vm4.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm5.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm6.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm7.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+    vm4.invokeAsync(startSenderRunnable());
+    vm5.invokeAsync(startSenderRunnable());
+    vm6.invokeAsync(startSenderRunnable());
+    vm7.invokeAsync(startSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
     //wait for senders running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("All the senders are now running...");
     
@@ -331,6 +339,18 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     vm3.invoke(() -> WANTestBase.validateRegionSize(
       getTestMethodName(), 3000 ));
   }
+
+  protected SerializableRunnableIF killSenderRunnable() {
+    return () -> WANTestBase.killSender();
+  }
+
+  protected SerializableRunnableIF pauseSenderRunnable() {
+    return () -> WANTestBase.pauseSender( "ln" );
+  }
+
+  protected SerializableRunnableIF waitForSenderRunnable() {
+    return () -> WANTestBase.waitForSenderRunningState( "ln" );
+  }
   
   /**
    * Enable persistence for PR and GatewaySender.
@@ -346,14 +366,14 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    vm2.invoke(createReceiverRunnable(nyPort));
+    vm3.invoke(createReceiverRunnable(nyPort));
 
     //create cache in local site
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -380,22 +400,22 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
     //start the senders on local site
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm4.invoke(startSenderRunnable());
+    vm5.invoke(startSenderRunnable());
+    vm6.invoke(startSenderRunnable());
+    vm7.invoke(startSenderRunnable());
     
     //wait for senders to become running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     //pause the senders
-    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
     
     //start puts in region on local site
     vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 3000 ));
@@ -403,18 +423,18 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //--------------------close and rebuild local site -------------------------------------------------
     //kill the senders
-    vm4.invoke(() -> WANTestBase.killSender());
-    vm5.invoke(() -> WANTestBase.killSender());
-    vm6.invoke(() -> WANTestBase.killSender());
-    vm7.invoke(() -> WANTestBase.killSender());
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
     
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
@@ -449,17 +469,17 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //start the senders in async mode. This will ensure that the 
     //node of shadow PR that went down last will come up first
-    vm4.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm5.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm6.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm7.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+    vm4.invokeAsync(startSenderRunnable());
+    vm5.invokeAsync(startSenderRunnable());
+    vm6.invokeAsync(startSenderRunnable());
+    vm7.invokeAsync(startSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
     //wait for senders running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("All the senders are now running...");
     
@@ -485,10 +505,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create cache in local site
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, false ));
@@ -509,22 +529,22 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
     //start the senders on local site
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm4.invoke(startSenderRunnable());
+    vm5.invoke(startSenderRunnable());
+    vm6.invoke(startSenderRunnable());
+    vm7.invoke(startSenderRunnable());
     
     //wait for senders to become running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     //pause the senders
-    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
     
     //start puts in region on local site
     vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 300 ));
@@ -532,18 +552,18 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //--------------------close and rebuild local site -------------------------------------------------
     //kill the senders
-    vm4.invoke(() -> WANTestBase.killSender());
-    vm5.invoke(() -> WANTestBase.killSender());
-    vm6.invoke(() -> WANTestBase.killSender());
-    vm7.invoke(() -> WANTestBase.killSender());
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
     
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
@@ -584,23 +604,23 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
 
     //start the senders in async mode. This will ensure that the 
     //node of shadow PR that went down last will come up first
-    vm4.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm5.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm6.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm7.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+    vm4.invokeAsync(startSenderRunnable());
+    vm5.invokeAsync(startSenderRunnable());
+    vm6.invokeAsync(startSenderRunnable());
+    vm7.invokeAsync(startSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
     //wait for senders running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
 
 
     LogWriterUtils.getLogWriter().info("Creating the receiver.");
     //create receiver on remote site
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    vm2.invoke(createReceiverRunnable(nyPort));
+    vm3.invoke(createReceiverRunnable(nyPort));
     //create PR on remote site
     
     LogWriterUtils.getLogWriter().info("Creating the partitioned region at receiver. ");
@@ -608,10 +628,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), null, 1, 100, isOffHeap() ));
     vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
       getTestMethodName(), null, 1, 100, isOffHeap() ));
-    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Doing some extra puts. ");
     //start puts in region on local site
@@ -645,14 +665,14 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    vm2.invoke(createReceiverRunnable(nyPort));
+    vm3.invoke(createReceiverRunnable(nyPort));
 
     //create cache in local site
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -679,22 +699,22 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
     //start the senders on local site
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm4.invoke(startSenderRunnable());
+    vm5.invoke(startSenderRunnable());
+    vm6.invoke(startSenderRunnable());
+    vm7.invoke(startSenderRunnable());
     
     //wait for senders to become running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     //pause the senders
-    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
     
     //start puts in region on local site
     vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 3000 ));
@@ -702,18 +722,18 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //--------------------close and rebuild local site -------------------------------------------------
     //kill the senders
-    vm4.invoke(() -> WANTestBase.killSender());
-    vm5.invoke(() -> WANTestBase.killSender());
-    vm6.invoke(() -> WANTestBase.killSender());
-    vm7.invoke(() -> WANTestBase.killSender());
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
     
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
@@ -749,17 +769,17 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //start the senders in async mode. This will ensure that the 
     //node of shadow PR that went down last will come up first
-    vm4.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm5.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm6.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm7.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+    vm4.invokeAsync(startSenderRunnable());
+    vm5.invokeAsync(startSenderRunnable());
+    vm6.invokeAsync(startSenderRunnable());
+    vm7.invokeAsync(startSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
     //wait for senders running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("All the senders are now running...");
     
@@ -793,14 +813,14 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    vm2.invoke(createReceiverRunnable(nyPort));
+    vm3.invoke(createReceiverRunnable(nyPort));
 
     //create cache in local site
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -827,22 +847,22 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
     //start the senders on local site
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm4.invoke(startSenderRunnable());
+    vm5.invoke(startSenderRunnable());
+    vm6.invoke(startSenderRunnable());
+    vm7.invoke(startSenderRunnable());
     
     //wait for senders to become running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     //pause the senders
-    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
     
     //start puts in region on local site
     vm4.invoke(() -> WANTestBase.doPutsWithKeyAsString( getTestMethodName(), 1000 ));
@@ -850,18 +870,18 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //--------------------close and rebuild local site -------------------------------------------------
     //kill the senders
-    vm4.invoke(() -> WANTestBase.killSender());
-    vm5.invoke(() -> WANTestBase.killSender());
-    vm6.invoke(() -> WANTestBase.killSender());
-    vm7.invoke(() -> WANTestBase.killSender());
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
     
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
@@ -897,17 +917,17 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //start the senders in async mode. This will ensure that the 
     //node of shadow PR that went down last will come up first
-    vm4.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm5.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm6.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm7.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+    vm4.invokeAsync(startSenderRunnable());
+    vm5.invokeAsync(startSenderRunnable());
+    vm6.invokeAsync(startSenderRunnable());
+    vm7.invokeAsync(startSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
     //wait for senders running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("All the senders are now running...");
     
@@ -933,14 +953,14 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    vm2.invoke(createReceiverRunnable(nyPort));
+    vm3.invoke(createReceiverRunnable(nyPort));
 
     //create cache in local site
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -967,22 +987,22 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
     //start the senders on local site
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm4.invoke(startSenderRunnable());
+    vm5.invoke(startSenderRunnable());
+    vm6.invoke(startSenderRunnable());
+    vm7.invoke(startSenderRunnable());
     
     //wait for senders to become running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     //pause the senders
-    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
     
     //start puts in region on local site
     vm4.invoke(() -> WANTestBase.doPutsWithKeyAsString( getTestMethodName(), 1000 ));
@@ -990,18 +1010,18 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //--------------------close and rebuild local site -------------------------------------------------
     //kill the senders
-    vm4.invoke(() -> WANTestBase.killSender());
-    vm5.invoke(() -> WANTestBase.killSender());
-    vm6.invoke(() -> WANTestBase.killSender());
-    vm7.invoke(() -> WANTestBase.killSender());
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
     
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
@@ -1023,17 +1043,17 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //start the senders in async mode. This will ensure that the 
     //node of shadow PR that went down last will come up first
-    vm4.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm5.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm6.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm7.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+    vm4.invokeAsync(startSenderRunnable());
+    vm5.invokeAsync(startSenderRunnable());
+    vm6.invokeAsync(startSenderRunnable());
+    vm7.invokeAsync(startSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
     //wait for senders running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("All the senders are now running...");
     
@@ -1069,10 +1089,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
 
     // create cache in local site
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
 
     // create PR on local site
     vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
@@ -1088,18 +1108,18 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     // --------------------close and rebuild local site
     // -------------------------------------------------
     // kill the senders
-    vm4.invoke(() -> WANTestBase.killSender());
-    vm5.invoke(() -> WANTestBase.killSender());
-    vm6.invoke(() -> WANTestBase.killSender());
-    vm7.invoke(() -> WANTestBase.killSender());
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
 
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
 
     // restart the vm
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
 
     LogWriterUtils.getLogWriter().info("Created back the cache");
 
@@ -1162,14 +1182,14 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    vm2.invoke(createReceiverRunnable(nyPort));
+    vm3.invoke(createReceiverRunnable(nyPort));
 
     //create cache in local site
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -1196,22 +1216,22 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
     //start the senders on local site
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm4.invoke(startSenderRunnable());
+    vm5.invoke(startSenderRunnable());
+    vm6.invoke(startSenderRunnable());
+    vm7.invoke(startSenderRunnable());
     
     //wait for senders to become running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     //pause the senders
-    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
     
     //start puts in region on local site
     vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
@@ -1219,18 +1239,18 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //--------------------close and rebuild local site -------------------------------------------------
     //kill the senders
-    vm4.invoke(() -> WANTestBase.killSender());
-    vm5.invoke(() -> WANTestBase.killSender());
-    vm6.invoke(() -> WANTestBase.killSender());
-    vm7.invoke(() -> WANTestBase.killSender());
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
     
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
@@ -1245,19 +1265,19 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     
     //start the senders. NOTE that the senders are not associated with partitioned region
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm4.invoke(startSenderRunnable());
+    vm5.invoke(startSenderRunnable());
+    vm6.invoke(startSenderRunnable());
+    vm7.invoke(startSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Started the senders.");
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
     //wait for senders running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("All the senders are now running...");
     
@@ -1282,14 +1302,14 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    vm2.invoke(createReceiverRunnable(nyPort));
+    vm3.invoke(createReceiverRunnable(nyPort));
 
     //create cache in local site
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
 
     //create senders with disk store
     String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
@@ -1306,50 +1326,46 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), null, 1, 100, isOffHeap() ));
     
     //create non persistent PR on local site
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm4.invoke(createPartitionedRegionRunnable());
+    vm5.invoke(createPartitionedRegionRunnable());
+    vm6.invoke(createPartitionedRegionRunnable());
+    vm7.invoke(createPartitionedRegionRunnable());
 
     //start the senders on local site
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm4.invoke(startSenderRunnable());
+    vm5.invoke(startSenderRunnable());
+    vm6.invoke(startSenderRunnable());
+    vm7.invoke(startSenderRunnable());
     
     //wait for senders to become running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     //pause the senders
-    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
     
     //start puts in region on local site
     vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
     LogWriterUtils.getLogWriter().info("Completed puts in the region");
     
     //kill the senders
-    vm4.invoke(() -> WANTestBase.killSender());
-    vm5.invoke(() -> WANTestBase.killSender());
-    vm6.invoke(() -> WANTestBase.killSender());
-    vm7.invoke(() -> WANTestBase.killSender());
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Killed all the senders. The local site has been brought down.");
     
     //restart the vm
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
     
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
@@ -1362,31 +1378,27 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
     
     //create PR on local site
-    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm4.invoke(createPartitionedRegionRunnable());
+    vm5.invoke(createPartitionedRegionRunnable());
+    vm6.invoke(createPartitionedRegionRunnable());
+    vm7.invoke(createPartitionedRegionRunnable());
     
     LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
     
     //start the senders
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm4.invoke(startSenderRunnable());
+    vm5.invoke(startSenderRunnable());
+    vm6.invoke(startSenderRunnable());
+    vm7.invoke(startSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Started the senders.");
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
     //wait for senders running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("All the senders are now running...");
     
@@ -1414,14 +1426,14 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    vm2.invoke(createReceiverRunnable(nyPort));
+    vm3.invoke(createReceiverRunnable(nyPort));
 
     //create cache in local site
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
 
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
       true, 100, 10, false, false, null, true ));
@@ -1449,22 +1461,22 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       getTestMethodName(), "ln", 1, 100, isOffHeap() ));
     
     //start the senders on local site
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm4.invoke(startSenderRunnable());
+    vm5.invoke(startSenderRunnable());
+    vm6.invoke(startSenderRunnable());
+    vm7.invoke(startSenderRunnable());
     
     //wait for senders to become running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     //pause the senders
-    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
     
     //start puts in region on local site
     vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 3000 ));
@@ -1472,18 +1484,18 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     
     //----------------- Close and rebuild local site -------------------------------------
     //kill the senders
-    vm4.invoke(() -> WANTestBase.killSender());
-    vm5.invoke(() -> WANTestBase.killSender());
-    vm6.invoke(() -> WANTestBase.killSender());
-    vm7.invoke(() -> WANTestBase.killSender());
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Killed all the senders.");
     
     //restart the vm
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
     
     LogWriterUtils.getLogWriter().info("Created back the cache");
     
@@ -1505,20 +1517,20 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     vm7.invoke(() -> WANTestBase.setRemoveFromQueueOnException( "ln", true ));
     
     //start the senders
-    vm4.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm5.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm6.invokeAsync(() -> WANTestBase.startSender( "ln" ));
-    vm7.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+    vm4.invokeAsync(startSenderRunnable());
+    vm5.invokeAsync(startSenderRunnable());
+    vm6.invokeAsync(startSenderRunnable());
+    vm7.invokeAsync(startSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("Started the senders.");
     
     LogWriterUtils.getLogWriter().info("Waiting for senders running.");
 
     //wait for senders running
-    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
     
     LogWriterUtils.getLogWriter().info("All the senders are now running...");
     
@@ -1569,14 +1581,14 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
     //create receiver on remote site
-    vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-    vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+    vm2.invoke(createReceiverRunnable(nyPort));
+    vm3.invoke(createReceiverRunnable(nyPort));
 
     //create cache in local site
-    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(createCacheRunnable(lnPort));
+    vm5.invoke(createCacheRunnable(lnPort));
+    vm6.invoke(createCacheRunnable(lnPort));
+    vm7.invoke(createCacheRunnable(lnPort));
     
     vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
       true, 100, 10, false, true, null, true ));
@@ -1601,10 +1613,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
     vm3.invoke(() -> WANTestBase.createPartitionedRegion(
       getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
     
-    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-    vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm4.invoke(startSenderRunnable());
+    vm5.invoke(startSenderRunnable());
+    vm6.invoke(startSenderRunnable());
+    vm7.invoke(startSenderRunnable());
     
     //start puts in region on local site
     vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
@@ -1628,13 +1640,13 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
       Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
 
-      vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
-      vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+      vm2.invoke(createReceiverRunnable(nyPort));
+      vm3.invoke(createReceiverRunnable(nyPort));
 
-      vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-      vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-      vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-      vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+      vm4.invoke(createCacheRunnable(lnPort));
+      vm5.invoke(createCacheRunnable(lnPort));
+      vm6.invoke(createCacheRunnable(lnPort));
+      vm7.invoke(createCacheRunnable(lnPort));
 
       vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
         true, 100, 10, false, true, null, true ));
@@ -1654,10 +1666,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
         getTestMethodName(), "ln", 1, 100, isOffHeap() ));
 
-      vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-      vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-      vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-      vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+      vm4.invoke(startSenderRunnable());
+      vm5.invoke(startSenderRunnable());
+      vm6.invoke(startSenderRunnable());
+      vm7.invoke(startSenderRunnable());
 
       vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
         getTestMethodName(), null, 1, 100, isOffHeap() ));
@@ -1682,10 +1694,10 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
       vm6.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue" , new Object[] { true});
       vm7.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue" , new Object[] { true});
 
-      vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-      vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-      vm6.invoke(() -> WANTestBase.createCache( lnPort ));
-      vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+      vm4.invoke(createCacheRunnable(lnPort));
+      vm5.invoke(createCacheRunnable(lnPort));
+      vm6.invoke(createCacheRunnable(lnPort));
+      vm7.invoke(createCacheRunnable(lnPort));
 
       AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
         getTestMethodName(), null, 1, 100, isOffHeap() ));