You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2016/04/01 22:07:55 UTC

[15/18] incubator-geode git commit: GEODE-1062: Refactor of WANTestBase

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/65d7a6f1/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
index d73084b..6685451 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -132,15 +133,15 @@ public class WANTestBase extends DistributedTestCase{
 
   protected static Cache cache;
   protected static Region region;
-  
+
   protected static PartitionedRegion customerRegion;
   protected static PartitionedRegion orderRegion;
   protected static PartitionedRegion shipmentRegion;
-  
+
   protected static final String customerRegionName = "CUSTOMER";
   protected static final String orderRegionName = "ORDER";
   protected static final String shipmentRegionName = "SHIPMENT";
-  
+
   protected static VM vm0;
   protected static VM vm1;
   protected static VM vm2;
@@ -149,26 +150,26 @@ public class WANTestBase extends DistributedTestCase{
   protected static VM vm5;
   protected static VM vm6;
   protected static VM vm7;
-  
+
   protected static QueueListener listener1;
   protected static QueueListener listener2;
-  
+
   protected static List<QueueListener> gatewayListeners;
-  
+
   protected static AsyncEventListener eventListener1 ;
   protected static AsyncEventListener eventListener2 ;
 
   private static final long MAX_WAIT = 10000;
-  
+
   protected static GatewayEventFilter eventFilter;
-  
+
   protected static boolean destroyFlag = false;
-  
-  protected static List<Integer> dispatcherThreads = 
+
+  protected static List<Integer> dispatcherThreads =
 	  new ArrayList<Integer>(Arrays.asList(1, 3, 5));
   //this will be set for each test method run with one of the values from above list
   protected static int numDispatcherThreadsForTheRun = 1;
-  
+
   public WANTestBase(String name) {
     super(name);
   }
@@ -201,15 +202,15 @@ public class WANTestBase extends DistributedTestCase{
 
   protected void postSetUpWANTestBase() throws Exception {
   }
-  
+
   public static void shuffleNumDispatcherThreads() {
-	  Collections.shuffle(dispatcherThreads);  
+	  Collections.shuffle(dispatcherThreads);
   }
-  
+
   public static void setNumDispatcherThreadsForTheRun(int numThreads) {
 	  numDispatcherThreadsForTheRun = numThreads;
   }
-  
+
   public static void stopOldLocator() {
     if (Locator.hasLocator()) {
       Locator.getLocator().stop();
@@ -225,19 +226,19 @@ public class WANTestBase extends DistributedTestCase{
     localLocatorBuffer.deleteCharAt(0);
     localLocatorBuffer.deleteCharAt(localLocatorBuffer.lastIndexOf("]"));
     String localLocator = localLocatorBuffer.toString();
-    localLocator = localLocator.replace(" ", ""); 
-    
+    localLocator = localLocator.replace(" ", "");
+
     props.setProperty(DistributionConfig.LOCATORS_NAME, localLocator);
     props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
     StringBuffer remoteLocatorBuffer = new StringBuffer(remoteLocatorsList.toString());
     remoteLocatorBuffer.deleteCharAt(0);
     remoteLocatorBuffer.deleteCharAt(remoteLocatorBuffer.lastIndexOf("]"));
     String remoteLocator = remoteLocatorBuffer.toString();
-    remoteLocator = remoteLocator.replace(" ", ""); 
+    remoteLocator = remoteLocator.replace(" ", "");
     props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, remoteLocator);
     test.getSystem(props);
   }
-  
+
   public static Integer createFirstLocatorWithDSId(int dsId) {
     stopOldLocator();
     WANTestBase test = new WANTestBase(getTestMethodName());
@@ -250,7 +251,7 @@ public class WANTestBase extends DistributedTestCase{
     test.getSystem(props);
     return port;
   }
-  
+
   public static Integer createFirstPeerLocator(int dsId) {
     stopOldLocator();
     WANTestBase test = new WANTestBase(getTestMethodName());
@@ -263,7 +264,7 @@ public class WANTestBase extends DistributedTestCase{
     test.getSystem(props);
     return port;
   }
-  
+
   public static Integer createSecondLocator(int dsId, int locatorPort) {
     stopOldLocator();
     WANTestBase test = new WANTestBase(getTestMethodName());
@@ -289,7 +290,7 @@ public class WANTestBase extends DistributedTestCase{
     test.getSystem(props);
     return port;
   }
-  
+
   public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
     stopOldLocator();
     WANTestBase test = new WANTestBase(getTestMethodName());
@@ -303,7 +304,7 @@ public class WANTestBase extends DistributedTestCase{
     test.getSystem(props);
     return port;
   }
-  
+
   public static void bringBackLocatorOnOldPort(int dsId, int remoteLocPort, int oldPort) {
     WANTestBase test = new WANTestBase(getTestMethodName());
     Properties props = test.getDistributedSystemProperties();
@@ -316,8 +317,8 @@ public class WANTestBase extends DistributedTestCase{
     test.getSystem(props);
     return;
   }
-  
-  
+
+
   public static Integer createFirstRemotePeerLocator(int dsId, int remoteLocPort) {
     stopOldLocator();
     WANTestBase test = new WANTestBase(getTestMethodName());
@@ -331,7 +332,7 @@ public class WANTestBase extends DistributedTestCase{
     test.getSystem(props);
     return port;
   }
-  
+
   public static Integer createSecondRemoteLocator(int dsId, int localPort,
       int remoteLocPort) {
     stopOldLocator();
@@ -346,7 +347,7 @@ public class WANTestBase extends DistributedTestCase{
     test.getSystem(props);
     return port;
   }
-  
+
   public static Integer createSecondRemotePeerLocator(int dsId, int localPort,
       int remoteLocPort) {
     stopOldLocator();
@@ -361,7 +362,7 @@ public class WANTestBase extends DistributedTestCase{
     test.getSystem(props);
     return port;
   }
-  
+
   public static void createReplicatedRegion(String regionName, String senderIds, Boolean offHeap){
     IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
         .getName());
@@ -409,7 +410,7 @@ public class WANTestBase extends DistributedTestCase{
     Region r = cache.createRegionFactory(fact.create()).create(regionName);
     assertNotNull(r);
   }
-  
+
 //  public static void createReplicatedRegion_PDX(String regionName, String senderId, DataPolicy policy, InterestPolicy intPolicy){
 //    AttributesFactory fact = new AttributesFactory();
 //    if(senderId!= null){
@@ -427,7 +428,7 @@ public class WANTestBase extends DistributedTestCase{
 //    assertNotNull(r);
 //    assertTrue(r.size() == 0);
 //  }
-  
+
   public static void createPersistentReplicatedRegion(String regionName, String senderIds, Boolean offHeap){
     AttributesFactory fact = new AttributesFactory();
     if(senderIds!= null){
@@ -444,7 +445,7 @@ public class WANTestBase extends DistributedTestCase{
     Region r = cache.createRegionFactory(fact.create()).create(regionName);
     assertNotNull(r);
   }
-  
+
 //  public static void createReplicatedRegionWithParallelSenderId(String regionName, String senderId){
 //    AttributesFactory fact = new AttributesFactory();
 //    if(senderId!= null){
@@ -458,14 +459,14 @@ public class WANTestBase extends DistributedTestCase{
 //    Region r = cache.createRegionFactory(fact.create()).create(regionName);
 //    assertNotNull(r);
 //  }
-  
+
 //  public static void createReplicatedRegion(String regionName){
 //    AttributesFactory fact = new AttributesFactory();
 //    fact.setDataPolicy(DataPolicy.REPLICATE);
 //    Region r = cache.createRegionFactory(fact.create()).create(regionName);
 //    assertNotNull(r);
 //  }
-  
+
   public static void createReplicatedRegionWithAsyncEventQueue(
       String regionName, String asyncQueueIds, Boolean offHeap) {
     IgnoredException exp1 = IgnoredException.addIgnoredException(ForceReattemptException.class
@@ -489,10 +490,10 @@ public class WANTestBase extends DistributedTestCase{
       exp1.remove();
     }
   }
-  
+
   public static void createPersistentReplicatedRegionWithAsyncEventQueue(
       String regionName, String asyncQueueIds) {
-        
+
     AttributesFactory fact = new AttributesFactory();
     if(asyncQueueIds != null){
       StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
@@ -506,9 +507,9 @@ public class WANTestBase extends DistributedTestCase{
     Region r = regionFactory.create(regionName);
     assertNotNull(r);
   }
-  
-  
-  
+
+
+
   public static void createReplicatedRegionWithSenderAndAsyncEventQueue(
       String regionName, String senderIds, String asyncChannelId, Boolean offHeap) {
     IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
@@ -535,7 +536,7 @@ public class WANTestBase extends DistributedTestCase{
       exp.remove();
     }
   }
-  
+
   public static void createReplicatedRegion(String regionName, String senderIds, Scope scope, DataPolicy policy, Boolean offHeap){
     AttributesFactory fact = new AttributesFactory();
     if(senderIds!= null){
@@ -553,12 +554,12 @@ public class WANTestBase extends DistributedTestCase{
     Region r = cache.createRegionFactory(fact.create()).create(regionName);
     assertNotNull(r);
   }
-  
+
   public static void createAsyncEventQueue(
-      String asyncChannelId, boolean isParallel, 
-      Integer maxMemory, Integer batchSize, boolean isConflation, 
+      String asyncChannelId, boolean isParallel,
+      Integer maxMemory, Integer batchSize, boolean isConflation,
       boolean isPersistent, String diskStoreName, boolean isDiskSynchronous) {
-    
+
     if (diskStoreName != null) {
       File directory = new File(asyncChannelId + "_disk_"
           + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
@@ -568,9 +569,9 @@ public class WANTestBase extends DistributedTestCase{
       dsf.setDiskDirs(dirs1);
       DiskStore ds = dsf.create(diskStoreName);
     }
-    
+
     AsyncEventListener asyncEventListener = new MyAsyncEventListener();
-    
+
     AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
     factory.setBatchSize(batchSize);
     factory.setPersistent(isPersistent);
@@ -583,11 +584,11 @@ public class WANTestBase extends DistributedTestCase{
     factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
     AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
   }
-  
+
   public static void createAsyncEventQueueWithListener2(String asyncChannelId,
       boolean isParallel, Integer maxMemory, Integer batchSize,
       boolean isPersistent, String diskStoreName) {
-    
+
     if (diskStoreName != null) {
       File directory = new File(asyncChannelId + "_disk_"
           + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
@@ -597,7 +598,7 @@ public class WANTestBase extends DistributedTestCase{
       dsf.setDiskDirs(dirs1);
       DiskStore ds = dsf.create(diskStoreName);
     }
-    
+
     AsyncEventListener asyncEventListener = new MyAsyncEventListener2();
 
     AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
@@ -611,12 +612,12 @@ public class WANTestBase extends DistributedTestCase{
     AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
         asyncEventListener);
   }
-  
+
   public static void createAsyncEventQueue(
-    String asyncChannelId, boolean isParallel, Integer maxMemory, 
-    Integer batchSize, boolean isConflation, boolean isPersistent, 
+    String asyncChannelId, boolean isParallel, Integer maxMemory,
+    Integer batchSize, boolean isConflation, boolean isPersistent,
     String diskStoreName, boolean isDiskSynchronous, String asyncListenerClass) throws Exception {
-	    
+
 	if (diskStoreName != null) {
 	  File directory = new File(asyncChannelId + "_disk_"
 		+ System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
@@ -626,7 +627,7 @@ public class WANTestBase extends DistributedTestCase{
 	  dsf.setDiskDirs(dirs1);
 	  DiskStore ds = dsf.create(diskStoreName);
 	}
-	
+
 	String packagePrefix = "com.gemstone.gemfire.internal.cache.wan.";
 	String className = packagePrefix + asyncListenerClass;
 	AsyncEventListener asyncEventListener = null;
@@ -640,7 +641,7 @@ public class WANTestBase extends DistributedTestCase{
 	} catch (IllegalAccessException e) {
 	  throw e;
 	}
-	    
+
 	AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
 	factory.setBatchSize(batchSize);
 	factory.setPersistent(isPersistent);
@@ -653,7 +654,7 @@ public class WANTestBase extends DistributedTestCase{
 	factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
 	AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
   }
-  
+
   public static void createAsyncEventQueueWithCustomListener(
       String asyncChannelId, boolean isParallel, Integer maxMemory,
       Integer batchSize, boolean isConflation, boolean isPersistent,
@@ -661,7 +662,7 @@ public class WANTestBase extends DistributedTestCase{
     createAsyncEventQueueWithCustomListener(asyncChannelId, isParallel, maxMemory, batchSize,
         isConflation, isPersistent, diskStoreName, isDiskSynchronous, GatewaySender.DEFAULT_DISPATCHER_THREADS);
   }
-  
+
   public static void createAsyncEventQueueWithCustomListener(
       String asyncChannelId, boolean isParallel, Integer maxMemory,
       Integer batchSize, boolean isConflation, boolean isPersistent,
@@ -698,11 +699,11 @@ public class WANTestBase extends DistributedTestCase{
   }
 
   public static void createConcurrentAsyncEventQueue(
-      String asyncChannelId, boolean isParallel, 
-      Integer maxMemory, Integer batchSize, boolean isConflation, 
+      String asyncChannelId, boolean isParallel,
+      Integer maxMemory, Integer batchSize, boolean isConflation,
       boolean isPersistent, String diskStoreName, boolean isDiskSynchronous,
       int dispatcherThreads, OrderPolicy policy) {
-    
+
     if (diskStoreName != null) {
       File directory = new File(asyncChannelId + "_disk_"
           + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
@@ -712,9 +713,9 @@ public class WANTestBase extends DistributedTestCase{
       dsf.setDiskDirs(dirs1);
       DiskStore ds = dsf.create(diskStoreName);
     }
-    
+
     AsyncEventListener asyncEventListener = new MyAsyncEventListener();
-    
+
     AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
     factory.setBatchSize(batchSize);
     factory.setPersistent(isPersistent);
@@ -727,27 +728,27 @@ public class WANTestBase extends DistributedTestCase{
     factory.setOrderPolicy(policy);
     AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
   }
-  
-  
+
+
   public static String createAsyncEventQueueWithDiskStore(
-      String asyncChannelId, boolean isParallel, 
-      Integer maxMemory, Integer batchSize, 
+      String asyncChannelId, boolean isParallel,
+      Integer maxMemory, Integer batchSize,
       boolean isPersistent, String diskStoreName) {
-    
+
     AsyncEventListener asyncEventListener = new MyAsyncEventListener();
-    
+
     File persistentDirectory = null;
     if (diskStoreName == null) {
       persistentDirectory = new File(asyncChannelId + "_disk_"
           + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
     } else {
-      persistentDirectory = new File(diskStoreName); 
+      persistentDirectory = new File(diskStoreName);
     }
     LogWriterUtils.getLogWriter().info("The ds is : " + persistentDirectory.getName());
     persistentDirectory.mkdir();
     DiskStoreFactory dsf = cache.createDiskStoreFactory();
     File [] dirs1 = new File[] {persistentDirectory};
-    
+
     AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
     factory.setBatchSize(batchSize);
     factory.setParallel(isParallel);
@@ -761,23 +762,23 @@ public class WANTestBase extends DistributedTestCase{
     AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
     return persistentDirectory.getName();
   }
-  
+
   public static void pauseAsyncEventQueue(String asyncChannelId) {
     AsyncEventQueue theChannel = null;
-    
+
     Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
     for (AsyncEventQueue asyncChannel : asyncEventChannels) {
       if (asyncChannelId.equals(asyncChannel.getId())) {
         theChannel = asyncChannel;
       }
     }
-    
+
     ((AsyncEventQueueImpl)theChannel).getSender().pause();
  }
-  
+
   public static void pauseAsyncEventQueueAndWaitForDispatcherToPause(String asyncChannelId) {
     AsyncEventQueue theChannel = null;
-    
+
     Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
     for (AsyncEventQueue asyncChannel : asyncEventChannels) {
       if (asyncChannelId.equals(asyncChannel.getId())) {
@@ -785,39 +786,39 @@ public class WANTestBase extends DistributedTestCase{
         break;
       }
     }
-    
+
     ((AsyncEventQueueImpl)theChannel).getSender().pause();
-    
-    
+
+
     ((AbstractGatewaySender)((AsyncEventQueueImpl)theChannel).getSender()).getEventProcessor().waitForDispatcherToPause();
   }
-  
+
  public static void resumeAsyncEventQueue(String asyncQueueId) {
     AsyncEventQueue theQueue = null;
-    
+
     Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
     for (AsyncEventQueue asyncChannel : asyncEventChannels) {
       if (asyncQueueId.equals(asyncChannel.getId())) {
         theQueue = asyncChannel;
       }
     }
-    
+
     ((AsyncEventQueueImpl)theQueue).getSender().resume();
   }
-  
-  
+
+
   public static void checkAsyncEventQueueSize(String asyncQueueId, int numQueueEntries) {
     AsyncEventQueue theAsyncEventQueue = null;
-    
+
     Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
     for (AsyncEventQueue asyncChannel : asyncEventChannels) {
       if (asyncQueueId.equals(asyncChannel.getId())) {
         theAsyncEventQueue = asyncChannel;
       }
     }
-    
+
     GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue).getSender();
-    
+
     if (sender.isParallel()) {
       Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
       assertEquals(numQueueEntries,
@@ -831,12 +832,12 @@ public class WANTestBase extends DistributedTestCase{
       assertEquals(numQueueEntries, size);
     }
   }
-  
+
   /**
    * This method verifies the queue size of a ParallelGatewaySender. For
    * ParallelGatewaySender conflation happens in a separate thread, hence test
    * code needs to wait for some time for expected result
-   * 
+   *
    * @param asyncQueueId
    *          Async Queue ID
    * @param numQueueEntries
@@ -881,7 +882,7 @@ public class WANTestBase extends DistributedTestCase{
 
     }
   }
-  
+
   public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){
     IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
         .getName());
@@ -912,7 +913,7 @@ public class WANTestBase extends DistributedTestCase{
       exp1.remove();
     }
   }
-  
+
   // TODO:OFFHEAP: add offheap flavor
   public static void createPartitionedRegionWithPersistence(String regionName,
       String senderIds, Integer redundantCopies, Integer totalNumBuckets) {
@@ -976,7 +977,7 @@ public class WANTestBase extends DistributedTestCase{
 		exp1.remove();
 	}
   }
-  
+
   public static void addSenderThroughAttributesMutator(String regionName,
       String senderIds){
     final Region r = cache.getRegion(Region.SEPARATOR + regionName);
@@ -984,7 +985,7 @@ public class WANTestBase extends DistributedTestCase{
     AttributesMutator mutator = r.getAttributesMutator();
     mutator.addGatewaySenderId(senderIds);
   }
-  
+
   public static void addAsyncEventQueueThroughAttributesMutator(
       String regionName, String queueId) {
     final Region r = cache.getRegion(Region.SEPARATOR + regionName);
@@ -992,7 +993,7 @@ public class WANTestBase extends DistributedTestCase{
     AttributesMutator mutator = r.getAttributesMutator();
     mutator.addAsyncEventQueueId(queueId);
   }
-  
+
   public static void createPartitionedRegionWithAsyncEventQueue(
       String regionName, String asyncEventQueueId, Boolean offHeap) {
     IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
@@ -1015,10 +1016,10 @@ public class WANTestBase extends DistributedTestCase{
       exp1.remove();
     }
   }
-  
+
   public static void createColocatedPartitionedRegionWithAsyncEventQueue(
     String regionName, String asyncEventQueueId, Integer totalNumBuckets, String colocatedWith) {
-	
+
 	IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
 	  .getName());
 	IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class
@@ -1039,7 +1040,7 @@ public class WANTestBase extends DistributedTestCase{
 	  exp1.remove();
 	}
   }
-  
+
   public static void createPersistentPartitionedRegionWithAsyncEventQueue(
       String regionName, String asyncEventQueueId) {
     AttributesFactory fact = new AttributesFactory();
@@ -1058,7 +1059,7 @@ public class WANTestBase extends DistributedTestCase{
     Region r = cache.createRegionFactory(fact.create()).create(regionName);
     assertNotNull(r);
   }
-  
+
   /**
    * Create PartitionedRegion with 1 redundant copy
    */
@@ -1083,7 +1084,7 @@ public class WANTestBase extends DistributedTestCase{
       exp.remove();
     }
   }
-  
+
   public static void createPartitionedRegionAccessorWithAsyncEventQueue(
       String regionName, String asyncEventQueueId) {
     AttributesFactory fact = new AttributesFactory();
@@ -1097,7 +1098,7 @@ public class WANTestBase extends DistributedTestCase{
     //fact.create()).create(regionName);
     assertNotNull(r);
   }
-  
+
   public static void createPartitionedRegionAsAccessor(
       String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets){
     AttributesFactory fact = new AttributesFactory();
@@ -1118,7 +1119,7 @@ public class WANTestBase extends DistributedTestCase{
     Region r = cache.createRegionFactory(fact.create()).create(regionName);
     assertNotNull(r);
   }
-  
+
   public static void createPartitionedRegionWithSerialParallelSenderIds(String regionName, String serialSenderIds, String parallelSenderIds, String colocatedWith, Boolean offHeap){
     AttributesFactory fact = new AttributesFactory();
     if (serialSenderIds != null) {
@@ -1146,14 +1147,14 @@ public class WANTestBase extends DistributedTestCase{
     Region r = cache.createRegionFactory(fact.create()).create(regionName);
     assertNotNull(r);
   }
-  
+
   public static void createPersistentPartitionedRegion(
-      String regionName, 
-      String senderIds, 
-      Integer redundantCopies, 
+      String regionName,
+      String senderIds,
+      Integer redundantCopies,
       Integer totalNumBuckets,
       Boolean offHeap){
-    
+
     IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
         .getName());
     IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class
@@ -1184,7 +1185,7 @@ public class WANTestBase extends DistributedTestCase{
       exp1.remove();
     }
   }
-  
+
   public static void createCustomerOrderShipmentPartitionedRegion(
       String regionName, String senderIds, Integer redundantCopies,
       Integer totalNumBuckets, Boolean offHeap) {
@@ -1272,7 +1273,7 @@ public class WANTestBase extends DistributedTestCase{
       exp.remove();
     }
   }
-  
+
   public static void createColocatedPartitionedRegions(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){
     AttributesFactory fact = new AttributesFactory();
     if(senderIds!= null){
@@ -1291,17 +1292,17 @@ public class WANTestBase extends DistributedTestCase{
     fact.setOffHeap(offHeap);
     Region r = cache.createRegionFactory(fact.create()).create(regionName);
     assertNotNull(r);
-    
+
     pfact.setColocatedWith(r.getName());
     fact.setPartitionAttributes(pfact.create());
     fact.setOffHeap(offHeap);
     Region r1 = cache.createRegionFactory(fact.create()).create(regionName+"_child1");
-    assertNotNull(r1);    
-    
+    assertNotNull(r1);
+
     Region r2 = cache.createRegionFactory(fact.create()).create(regionName+"_child2");
     assertNotNull(r2);
   }
-  
+
   public static void createColocatedPartitionedRegions2 (String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){
     AttributesFactory fact = new AttributesFactory();
     if(senderIds!= null){
@@ -1320,36 +1321,57 @@ public class WANTestBase extends DistributedTestCase{
     fact.setOffHeap(offHeap);
     Region r = cache.createRegionFactory(fact.create()).create(regionName);
     assertNotNull(r);
-    
-    
+
+
     fact = new AttributesFactory();
     pfact.setColocatedWith(r.getName());
     fact.setPartitionAttributes(pfact.create());
     fact.setOffHeap(offHeap);
     Region r1 = cache.createRegionFactory(fact.create()).create(regionName+"_child1");
-    assertNotNull(r1);    
-    
+    assertNotNull(r1);
+
     Region r2 = cache.createRegionFactory(fact.create()).create(regionName+"_child2");
     assertNotNull(r2);
   }
-  
+
+  public static void createCacheInVMs(Integer locatorPort, VM... vms) { // calls invoke on each VM, in argument order
+    for (int idx = 0; idx < vms.length; idx++) {
+      vms[idx].invoke(() -> createCache(locatorPort)); // hands createCache(locatorPort) to this VM
+    }
+  }
+
+  public static void createCacheInVMsAsync(Integer locatorPort, VM... vms) { // invokeAsync on every VM first, then join each
+    List<AsyncInvocation> tasks = new LinkedList<>();
+    for (VM vm : vms) {
+      tasks.add(vm.invokeAsync(() -> createCache(locatorPort)));
+    }
+    for (AsyncInvocation invocation : tasks) {
+      try {
+        invocation.join(60000); // NOTE(review): nothing after join inspects the invocation's outcome - confirm that is intended
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the interrupt status so callers can still observe it
+        fail("Failed starting up the cache");
+      }
+    }
+  }
+
   public static void createCache(Integer locPort){
     createCache(false, locPort);
   }
   public static void createManagementCache(Integer locPort){
     createCache(true, locPort);
   }
-  
+
   public static void createCacheConserveSockets(Boolean conserveSockets,Integer locPort){
     WANTestBase test = new WANTestBase(getTestMethodName());
     Properties props = test.getDistributedSystemProperties();
     props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
     props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
-    props.setProperty(DistributionConfig.CONSERVE_SOCKETS_NAME, conserveSockets.toString());   
+    props.setProperty(DistributionConfig.CONSERVE_SOCKETS_NAME, conserveSockets.toString());
     InternalDistributedSystem ds = test.getSystem(props);
     cache = CacheFactory.create(ds);
   }
-  
+
   protected static void createCache(boolean management, Integer locPort) {
     WANTestBase test = new WANTestBase(getTestMethodName());
     Properties props = test.getDistributedSystemProperties();
@@ -1362,9 +1384,9 @@ public class WANTestBase extends DistributedTestCase{
     props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
     props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
     InternalDistributedSystem ds = test.getSystem(props);
-    cache = CacheFactory.create(ds);    
+    cache = CacheFactory.create(ds);
   }
-  
+
   protected static void createCacheWithSSL(Integer locPort) {
     WANTestBase test = new WANTestBase(getTestMethodName());
 
@@ -1372,31 +1394,31 @@ public class WANTestBase extends DistributedTestCase{
     String  gatewaySslprotocols = "any";
     String  gatewaySslciphers = "any";
     boolean gatewaySslRequireAuth = true;
-    
+
     Properties gemFireProps = test.getDistributedSystemProperties();
     gemFireProps.put(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel());
     gemFireProps.put(DistributionConfig.GATEWAY_SSL_ENABLED_NAME, String.valueOf(gatewaySslenabled));
     gemFireProps.put(DistributionConfig.GATEWAY_SSL_PROTOCOLS_NAME, gatewaySslprotocols);
     gemFireProps.put(DistributionConfig.GATEWAY_SSL_CIPHERS_NAME, gatewaySslciphers);
     gemFireProps.put(DistributionConfig.GATEWAY_SSL_REQUIRE_AUTHENTICATION_NAME, String.valueOf(gatewaySslRequireAuth));
-    
+
     gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_TYPE_NAME, "jks");
-    gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_NAME, 
+    gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_NAME,
         TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/client.keystore"));
     gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_PASSWORD_NAME, "password");
-    gemFireProps.put(DistributionConfig.GATEWAY_SSL_TRUSTSTORE_NAME, 
+    gemFireProps.put(DistributionConfig.GATEWAY_SSL_TRUSTSTORE_NAME,
         TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/client.truststore"));
     gemFireProps.put(DistributionConfig.GATEWAY_SSL_TRUSTSTORE_PASSWORD_NAME, "password");
-    
+
     gemFireProps.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
     gemFireProps.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
-    
+
     LogWriterUtils.getLogWriter().info("Starting cache ds with following properties \n" + gemFireProps);
-    
+
     InternalDistributedSystem ds = test.getSystem(gemFireProps);
-    cache = CacheFactory.create(ds);    
+    cache = CacheFactory.create(ds);
   }
-  
+
   public static void createCache_PDX(Integer locPort){
     WANTestBase test = new WANTestBase(getTestMethodName());
     Properties props = test.getDistributedSystemProperties();
@@ -1407,13 +1429,13 @@ public class WANTestBase extends DistributedTestCase{
     cacheConfig.setPdxPersistent(true);
     cacheConfig.setPdxDiskStore("PDX_TEST");
     cache = GemFireCacheImpl.create(ds, false, cacheConfig);
-    
+
     File pdxDir = new File(CacheTestCase.getDiskDir(), "pdx");
     DiskStoreFactory dsf = cache.createDiskStoreFactory();
     File [] dirs1 = new File[] {pdxDir};
     DiskStore store = dsf.setDiskDirs(dirs1).setMaxOplogSize(1).create("PDX_TEST");
   }
-  
+
   public static void createCache(Integer locPort1, Integer locPort2){
     WANTestBase test = new WANTestBase(getTestMethodName());
     Properties props = test.getDistributedSystemProperties();
@@ -1423,7 +1445,7 @@ public class WANTestBase extends DistributedTestCase{
     InternalDistributedSystem ds = test.getSystem(props);
     cache = CacheFactory.create(ds);
   }
-  
+
   public static void createCacheWithoutLocator(Integer mCastPort){
     WANTestBase test = new WANTestBase(getTestMethodName());
     Properties props = test.getDistributedSystemProperties();
@@ -1431,10 +1453,10 @@ public class WANTestBase extends DistributedTestCase{
     InternalDistributedSystem ds = test.getSystem(props);
     cache = CacheFactory.create(ds);
   }
-  
+
   /**
    * Method that creates a bridge server
-   * 
+   *
    * @return    Integer   Port on which the server is started.
    */
   public static Integer createCacheServer() {
@@ -1452,16 +1474,16 @@ public class WANTestBase extends DistributedTestCase{
 
     return new Integer(server1.getPort());
   }
-  
+
   /**
    * Returns a Map that contains the count for number of bridge server and number
    * of Receivers.
-   * 
+   *
    * @return    Map
    */
   public static Map getCacheServers() {
     List cacheServers = cache.getCacheServers();
-    
+
     Map cacheServersMap = new HashMap();
     Iterator itr = cacheServers.iterator();
     int bridgeServerCounter = 0;
@@ -1478,7 +1500,29 @@ public class WANTestBase extends DistributedTestCase{
     cacheServersMap.put("ReceiverServer", receiverServerCounter);
     return cacheServersMap;
   }
-  
+
+  /** Runs {@code startSender(senderId)} in each given VM, one {@code invoke} call after another. */
+  public static void startSenderInVMs(String senderId, VM... vms) {
+    for (VM target : vms) {
+      target.invoke(() -> startSender(senderId));
+    }
+  }
+  /** Launches {@code startSender(senderId)} in every VM via invokeAsync, then joins each (30000 ms). */
+  public static void startSenderInVMsAsync(String senderId, VM... vms) {
+    List<AsyncInvocation> tasks = new LinkedList<>();
+    for (VM vm : vms) {
+      tasks.add(vm.invokeAsync(() -> startSender(senderId)));
+    }
+    for (AsyncInvocation invocation : tasks) {
+      try {
+        invocation.join(30000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the interrupt flag before failing the test
+        fail("Starting senders was interrupted");
+      }
+    }
+  }
+
   public static void startSender(String senderId) {
     final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
 
@@ -1503,7 +1547,7 @@ public class WANTestBase extends DistributedTestCase{
     }
 
   }
-  
+
   public static void enableConflation(String senderId) {
     Set<GatewaySender> senders = cache.getGatewaySenders();
     AbstractGatewaySender sender = null;
@@ -1515,7 +1559,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     sender.test_setBatchConflationEnabled(true);
   }
-  
+
   public static void startAsyncEventQueue(String senderId) {
     Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
     AsyncEventQueue q = null;
@@ -1528,8 +1572,8 @@ public class WANTestBase extends DistributedTestCase{
     //merge42180: There is no start method on AsyncEventQueue. Cheetah has this method. Yet the code for AsyncEvnt Queue is not properly merged from cheetah to cedar
     //q.start();
   }
-  
-  public static Map getSenderToReceiverConnectionInfo(String senderId){	
+
+  public static Map getSenderToReceiverConnectionInfo(String senderId){
 	  Set<GatewaySender> senders = cache.getGatewaySenders();
 	  GatewaySender sender = null;
 	  for(GatewaySender s : senders){
@@ -1541,14 +1585,14 @@ public class WANTestBase extends DistributedTestCase{
 	  Map connectionInfo = null;
 	  if (!sender.isParallel() && ((AbstractGatewaySender) sender).isPrimary()) {
 		  connectionInfo = new HashMap();
-		  GatewaySenderEventDispatcher dispatcher = 
+		  GatewaySenderEventDispatcher dispatcher =
 			  ((AbstractGatewaySender)sender).getEventProcessor().getDispatcher();
 		  if (dispatcher instanceof GatewaySenderEventRemoteDispatcher) {
-			  ServerLocation serverLocation = 
+			  ServerLocation serverLocation =
 				  ((GatewaySenderEventRemoteDispatcher) dispatcher).getConnection(false).getServer();
 			  connectionInfo.put("serverHost", serverLocation.getHostName());
 			  connectionInfo.put("serverPort", serverLocation.getPort());
-			  
+
 		  }
 	  }
 	  return connectionInfo;
@@ -1594,7 +1638,7 @@ public class WANTestBase extends DistributedTestCase{
     stats.add(statistics.getEventsConflatedFromBatches());
     return stats;
   }
-      
+
   public static void checkQueueStats(String senderId, final int queueSize,
       final int eventsReceived, final int eventsQueued,
       final int eventsDistributed) {
@@ -1606,7 +1650,7 @@ public class WANTestBase extends DistributedTestCase{
         break;
       }
     }
-    
+
     final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics();
     assertEquals(queueSize, statistics.getEventQueueSize());
     assertEquals(eventsReceived, statistics.getEventsReceived());
@@ -1626,12 +1670,12 @@ public class WANTestBase extends DistributedTestCase{
       }
     }
     final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics();
-    assertEquals(queueSize, statistics.getEventQueueSize()); 
+    assertEquals(queueSize, statistics.getEventQueueSize());
     assertEquals(eventsReceived, statistics.getEventsReceived());
     assertEquals(eventsQueued, statistics.getEventsQueued());
     assert(statistics.getEventsDistributed() >= eventsDistributed);
   }
-  
+
   public static void checkGatewayReceiverStats(int processBatches,
       int eventsReceived, int creates) {
     Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
@@ -1658,7 +1702,7 @@ public class WANTestBase extends DistributedTestCase{
     assertTrue(gatewayReceiverStats.getProcessBatchRequests() >= processBatches);
     assertTrue(gatewayReceiverStats.getEventsReceived()>= eventsReceived);
   }
-  
+
   public static void checkExcepitonStats(int exceptionsOccured) {
     Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
     GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
@@ -1689,7 +1733,7 @@ public class WANTestBase extends DistributedTestCase{
     assertTrue(gatewayReceiverStats.getEventsReceived() >= eventsReceived);
     assertTrue(gatewayReceiverStats.getCreateRequest() >= creates);
   }
-  
+
   public static void checkEventFilteredStats(String senderId, final int eventsFiltered) {
     Set<GatewaySender> senders = cache.getGatewaySenders();
     GatewaySender sender = null;
@@ -1702,7 +1746,7 @@ public class WANTestBase extends DistributedTestCase{
     final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics();
     assertEquals(eventsFiltered, statistics.getEventsFiltered());
   }
-  
+
   public static void checkConflatedStats(String senderId, final int eventsConflated) {
     Set<GatewaySender> senders = cache.getGatewaySenders();
     GatewaySender sender = null;
@@ -1715,7 +1759,7 @@ public class WANTestBase extends DistributedTestCase{
     final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics();
     assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
   }
-  
+
   public static void checkAsyncEventQueueConflatedStats(
       String asyncEventQueueId, final int eventsConflated) {
     Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
@@ -1730,7 +1774,7 @@ public class WANTestBase extends DistributedTestCase{
         .getStatistics();
     assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
   }
-  
+
   public static void checkStats_Failover(String senderId,
       final int eventsReceived) {
     Set<GatewaySender> senders = cache.getGatewaySenders();
@@ -1749,7 +1793,7 @@ public class WANTestBase extends DistributedTestCase{
         + statistics.getUnprocessedTokensAddedByPrimary() + statistics
         .getUnprocessedEventsRemovedByPrimary()));
   }
-  
+
   public static void checkAsyncEventQueueStats_Failover(String asyncEventQueueId,
       final int eventsReceived) {
     Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
@@ -1783,7 +1827,7 @@ public class WANTestBase extends DistributedTestCase{
     assert (statistics.getBatchesDistributed() >= batches);
     assertEquals(0, statistics.getBatchesRedistributed());
   }
-  
+
   public static void checkAsyncEventQueueBatchStats(String asyncQueueId,
       final int batches) {
     Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
@@ -1835,7 +1879,7 @@ public class WANTestBase extends DistributedTestCase{
         (statistics.getUnprocessedEventsRemovedByPrimary() + statistics
             .getUnprocessedTokensAddedByPrimary()));
   }
-  
+
   public static void checkAsyncEventQueueUnprocessedStats(String asyncQueueId, int events) {
     Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
     AsyncEventQueue queue = null;
@@ -1853,34 +1897,7 @@ public class WANTestBase extends DistributedTestCase{
         (statistics.getUnprocessedEventsRemovedByPrimary() + statistics
             .getUnprocessedTokensAddedByPrimary()));
   }
-  
-  
-  public static void setRemoveFromQueueOnException(String senderId, boolean removeFromQueue){
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for(GatewaySender s : senders){
-      if(s.getId().equals(senderId)){
-        sender = s;
-        break;
-      }
-    }
-    assertNotNull(sender);
-    ((AbstractGatewaySender)sender).setRemoveFromQueueOnException(removeFromQueue);
-  }
-  
-  public static void unsetRemoveFromQueueOnException(String senderId){
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for(GatewaySender s : senders){
-      if(s.getId().equals(senderId)){
-        sender = s;
-        break;
-      }
-    }
-    assertNotNull(sender);
-    ((AbstractGatewaySender)sender).setRemoveFromQueueOnException(false);
-  }
-  
+
   public static void waitForSenderRunningState(String senderId){
     final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
     try {
@@ -1904,7 +1921,7 @@ public class WANTestBase extends DistributedTestCase{
       exln.remove();
     }
   }
-  
+
   public static void waitForSenderToBecomePrimary(String senderId){
     Set<GatewaySender> senders = ((GemFireCacheImpl)cache).getAllGatewaySenders();
     final GatewaySender sender = getGatewaySenderById(senders, senderId);
@@ -1920,9 +1937,9 @@ public class WANTestBase extends DistributedTestCase{
         return "Expected sender primary state to be true but is false";
       }
     };
-    Wait.waitForCriterion(wc, 10000, 1000, true); 
+    Wait.waitForCriterion(wc, 10000, 1000, true);
   }
-  
+
   private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
     for(GatewaySender s : senders){
       if(s.getId().equals(senderId)){
@@ -1932,7 +1949,7 @@ public class WANTestBase extends DistributedTestCase{
     //if none of the senders matches with the supplied senderid, return null
     return null;
   }
-  
+
   public static HashMap checkQueue(){
     HashMap listenerAttrs = new HashMap();
     listenerAttrs.put("Create", listener1.createList);
@@ -1940,13 +1957,13 @@ public class WANTestBase extends DistributedTestCase{
     listenerAttrs.put("Destroy", listener1.destroyList);
     return listenerAttrs;
   }
-  
+
   public static void checkQueueOnSecondary (final Map primaryUpdatesMap){
     final HashMap secondaryUpdatesMap = new HashMap();
     secondaryUpdatesMap.put("Create", listener1.createList);
     secondaryUpdatesMap.put("Update", listener1.updateList);
     secondaryUpdatesMap.put("Destroy", listener1.destroyList);
-    
+
     WaitCriterion wc = new WaitCriterion() {
       public boolean done() {
         secondaryUpdatesMap.put("Create", listener1.createList);
@@ -1962,9 +1979,9 @@ public class WANTestBase extends DistributedTestCase{
         return "Expected seconadry map to be " + primaryUpdatesMap + " but it is " + secondaryUpdatesMap;
       }
     };
-    Wait.waitForCriterion(wc, 300000, 500, true); 
+    Wait.waitForCriterion(wc, 300000, 500, true);
   }
-  
+
   public static HashMap checkQueue2(){
     HashMap listenerAttrs = new HashMap();
     listenerAttrs.put("Create", listener2.createList);
@@ -1972,18 +1989,18 @@ public class WANTestBase extends DistributedTestCase{
     listenerAttrs.put("Destroy", listener2.destroyList);
     return listenerAttrs;
   }
-  
+
   public static HashMap checkPR(String regionName){
     PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
     QueueListener listener = (QueueListener)region.getCacheListener();
-    
+
     HashMap listenerAttrs = new HashMap();
     listenerAttrs.put("Create", listener.createList);
     listenerAttrs.put("Update", listener.updateList);
     listenerAttrs.put("Destroy", listener.destroyList);
     return listenerAttrs;
   }
-  
+
   public static HashMap checkBR(String regionName, int numBuckets){
     PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
     HashMap listenerAttrs = new HashMap();
@@ -1996,7 +2013,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     return listenerAttrs;
   }
-  
+
   public static HashMap checkQueue_PR(String senderId){
     Set<GatewaySender> senders = cache.getGatewaySenders();
     GatewaySender sender = null;
@@ -2006,20 +2023,20 @@ public class WANTestBase extends DistributedTestCase{
         break;
       }
     }
-    
+
     RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
     .getQueues().toArray(new RegionQueue[1])[0];
-    
+
     PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
     QueueListener listener = (QueueListener)region.getCacheListener();
-    
+
     HashMap listenerAttrs = new HashMap();
     listenerAttrs.put("Create", listener.createList);
     listenerAttrs.put("Update", listener.updateList);
     listenerAttrs.put("Destroy", listener.destroyList);
     return listenerAttrs;
   }
-  
+
   public static HashMap checkQueue_BR(String senderId, int numBuckets){
     Set<GatewaySender> senders = cache.getGatewaySenders();
     GatewaySender sender = null;
@@ -2031,7 +2048,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
     .getQueues().toArray(new RegionQueue[1])[0];
-    
+
     PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
     HashMap listenerAttrs = new HashMap();
     for (int i = 0; i < numBuckets; i++) {
@@ -2052,7 +2069,7 @@ public class WANTestBase extends DistributedTestCase{
     WANTestBase test = new WANTestBase(getTestMethodName());
     test.addCacheListenerOnBucketRegion(regionName, numBuckets);
   }
-  
+
   private void addCacheListenerOnBucketRegion(String regionName, int numBuckets){
     PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
     for (int i = 0; i < numBuckets; i++) {
@@ -2062,12 +2079,12 @@ public class WANTestBase extends DistributedTestCase{
       mutator.addCacheListener(listener1);
     }
   }
-  
+
   public static void addListenerOnQueueBucketRegion(String senderId, int numBuckets) {
     WANTestBase test = new WANTestBase(getTestMethodName());
     test.addCacheListenerOnQueueBucketRegion(senderId, numBuckets);
   }
-  
+
   private void addCacheListenerOnQueueBucketRegion(String senderId, int numBuckets){
     Set<GatewaySender> senders = cache.getGatewaySenders();
     GatewaySender sender = null;
@@ -2079,7 +2096,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
     .getQueues().toArray(new RegionQueue[1])[0];
-    
+
     PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
     for (int i = 0; i < numBuckets; i++) {
       BucketRegion br = region.getBucketRegion(i);
@@ -2089,19 +2106,19 @@ public class WANTestBase extends DistributedTestCase{
         mutator.addCacheListener(listener);
       }
     }
-    
+
   }
-  
+
   public static void addQueueListener(String senderId, boolean isParallel){
     WANTestBase test = new WANTestBase(getTestMethodName());
     test.addCacheQueueListener(senderId, isParallel);
   }
-  
+
   public static void addSecondQueueListener(String senderId, boolean isParallel){
     WANTestBase test = new WANTestBase(getTestMethodName());
     test.addSecondCacheQueueListener(senderId, isParallel);
   }
-  
+
   public static void addListenerOnRegion(String regionName){
     WANTestBase test = new WANTestBase(getTestMethodName());
     test.addCacheListenerOnRegion(regionName);
@@ -2112,7 +2129,7 @@ public class WANTestBase extends DistributedTestCase{
     listener1 = new QueueListener();
     mutator.addCacheListener(listener1);
   }
-  
+
   private void addCacheQueueListener(String senderId, boolean isParallel) {
     Set<GatewaySender> senders = cache.getGatewaySenders();
     GatewaySender sender = null;
@@ -2158,7 +2175,7 @@ public class WANTestBase extends DistributedTestCase{
       parallelQueue.addCacheListener(listener2);
     }
   }
-  
+
   public static void pauseSender(String senderId) {
     final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
     IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
@@ -2174,14 +2191,14 @@ public class WANTestBase extends DistributedTestCase{
       }
       sender.pause();
       ((AbstractGatewaySender) sender).getEventProcessor().waitForDispatcherToPause();
-      
+
     }
     finally {
       exp.remove();
       exln.remove();
     }
   }
-      
+
   public static void pauseSenderAndWaitForDispatcherToPause(String senderId) {
     final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
     IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
@@ -2200,9 +2217,9 @@ public class WANTestBase extends DistributedTestCase{
     } finally {
       exp.remove();
       exln.remove();
-    }    
+    }
   }
-  
+
   public static void resumeSender(String senderId) {
     final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
     IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
@@ -2223,7 +2240,7 @@ public class WANTestBase extends DistributedTestCase{
       exln.remove();
     }
   }
-  
+
   public static void stopSender(String senderId) {
     final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
     IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
@@ -2242,7 +2259,7 @@ public class WANTestBase extends DistributedTestCase{
         eventProcessor = ((AbstractGatewaySender) sender).getEventProcessor();
       }
       sender.stop();
-      
+
       Set<RegionQueue> queues = null;
       if (eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor) {
         queues = ((ConcurrentSerialGatewaySenderEventProcessor)eventProcessor).getQueues();
@@ -2258,14 +2275,14 @@ public class WANTestBase extends DistributedTestCase{
       exln.remove();
     }
   }
-  
+
   public static void stopReceivers() {
     Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
     for (GatewayReceiver receiver : receivers) {
       receiver.stop();
     }
   }
-  
+
   public static void startReceivers() {
     Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
     for (GatewayReceiver receiver : receivers) {
@@ -2276,12 +2293,12 @@ public class WANTestBase extends DistributedTestCase{
       }
     }
   }
-  
+
   public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName, int remoteDsId,
       boolean isParallel, Integer maxMemory,
       Integer batchSize, boolean isConflation, boolean isPersistent,
       GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy policy) {
-    
+
     InternalGatewaySenderFactory gateway = (InternalGatewaySenderFactory)cache.createGatewaySenderFactory();
     gateway.setParallel(isParallel);
     gateway.setMaximumQueueMemory(maxMemory);
@@ -2305,7 +2322,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     return gateway;
   }
-  
+
   public static void createSender(String dsName, int remoteDsId,
       boolean isParallel, Integer maxMemory,
       Integer batchSize, boolean isConflation, boolean isPersistent,
@@ -2324,7 +2341,7 @@ public class WANTestBase extends DistributedTestCase{
       exln.remove();
     }
   }
-  
+
   public static void createSenderWithMultipleDispatchers(String dsName, int remoteDsId,
 	boolean isParallel, Integer maxMemory,
 	Integer batchSize, boolean isConflation, boolean isPersistent,
@@ -2343,12 +2360,12 @@ public class WANTestBase extends DistributedTestCase{
       exln.remove();
     }
   }
-  
+
   public static void createSenderWithoutDiskStore(String dsName, int remoteDsId,
       boolean isParallel, Integer maxMemory,
       Integer batchSize, boolean isConflation, boolean isPersistent,
       GatewayEventFilter filter, boolean isManulaStart) {
-    
+
       GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
       gateway.setParallel(true);
       gateway.setMaximumQueueMemory(maxMemory);
@@ -2359,7 +2376,7 @@ public class WANTestBase extends DistributedTestCase{
       gateway.setBatchConflationEnabled(isConflation);
       gateway.create(dsName, remoteDsId);
   }
-  
+
   public static void createConcurrentSender(String dsName, int remoteDsId,
       boolean isParallel, Integer maxMemory, Integer batchSize,
       boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
@@ -2373,16 +2390,16 @@ public class WANTestBase extends DistributedTestCase{
     GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy);
     gateway.create(dsName, remoteDsId);
   }
-  
+
 //  public static void createSender_PDX(String dsName, int remoteDsId,
 //      boolean isParallel, Integer maxMemory,
 //      Integer batchSize, boolean isConflation, boolean isPersistent,
 //      GatewayEventFilter filter, boolean isManulaStart) {
 //    File persistentDirectory = new File(dsName +"_disk_"+System.currentTimeMillis()+"_" + VM.getCurrentVMNum());
 //    persistentDirectory.mkdir();
-//    
+//
 //    File [] dirs1 = new File[] {persistentDirectory};
-//    
+//
 //    if(isParallel) {
 //      ParallelGatewaySenderFactory gateway = cache.createParallelGatewaySenderFactory();
 //      gateway.setMaximumQueueMemory(maxMemory);
@@ -2398,7 +2415,7 @@ public class WANTestBase extends DistributedTestCase{
 //      }
 //      gateway.setBatchConflationEnabled(isConflation);
 //      gateway.create(dsName, remoteDsId);
-//      
+//
 //    }else {
 //      SerialGatewaySenderFactory gateway = cache.createSerialGatewaySenderFactory();
 //      gateway.setMaximumQueueMemory(maxMemory);
@@ -2502,7 +2519,7 @@ public class WANTestBase extends DistributedTestCase{
       exp1.remove();
     }
   }
-  
+
   public static String createSenderWithDiskStore(String dsName, int remoteDsId,
       boolean isParallel, Integer maxMemory,
       Integer batchSize, boolean isConflation, boolean isPersistent,
@@ -2513,14 +2530,14 @@ public class WANTestBase extends DistributedTestCase{
           + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
     }
     else {
-      persistentDirectory = new File(dsStore);  
+      persistentDirectory = new File(dsStore);
     }
     LogWriterUtils.getLogWriter().info("The ds is : " + persistentDirectory.getName());
-    
+
     persistentDirectory.mkdir();
     DiskStoreFactory dsf = cache.createDiskStoreFactory();
     File [] dirs1 = new File[] {persistentDirectory};
-    
+
     if(isParallel) {
       GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
       gateway.setParallel(true);
@@ -2546,7 +2563,7 @@ public class WANTestBase extends DistributedTestCase{
       }
       gateway.setBatchConflationEnabled(isConflation);
       gateway.create(dsName, remoteDsId);
-      
+
     }else {
       GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
       gateway.setMaximumQueueMemory(maxMemory);
@@ -2565,15 +2582,15 @@ public class WANTestBase extends DistributedTestCase{
       }
       else {
         DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
-        
+
         gateway.setDiskStoreName(store.getName());
       }
       gateway.create(dsName, remoteDsId);
     }
     return persistentDirectory.getName();
   }
-  
-  
+
+
   public static void createSenderWithListener(String dsName, int remoteDsName,
       boolean isParallel, Integer maxMemory,
       Integer batchSize, boolean isConflation, boolean isPersistent,
@@ -2641,7 +2658,7 @@ public class WANTestBase extends DistributedTestCase{
       ((InternalGatewaySenderFactory)gateway).create(dsName);
     }
   }
-  
+
   public static void pauseWaitCriteria(final long millisec) {
     WaitCriterion wc = new WaitCriterion() {
       public boolean done() {
@@ -2652,18 +2669,31 @@ public class WANTestBase extends DistributedTestCase{
         return "Expected to wait for " + millisec + " millisec.";
       }
     };
-    Wait.waitForCriterion(wc, millisec, 500, false); 
+    Wait.waitForCriterion(wc, millisec, 500, false);
   }
-  
-  public static int createReceiver(int locPort) {
-    WANTestBase test = new WANTestBase(getTestMethodName());
-    Properties props = test.getDistributedSystemProperties();
-    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
-    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort
-        + "]");
 
-    InternalDistributedSystem ds = test.getSystem(props);
-    cache = CacheFactory.create(ds);    
+  /** Runs {@code createReceiver(locatorPort)} in each given VM, one {@code invoke} call after another. */
+  public static void createReceiverInVMs(int locatorPort, VM... vms) {
+    for (VM target : vms) {
+      target.invoke(() -> createReceiver(locatorPort));
+    }
+  }
+  /** Launches {@code createReceiver(locatorPort)} in every VM via invokeAsync, then joins each (30000 ms). */
+  public static void createReceiverInVMsAsync(int locatorPort, VM... vms) {
+    List<AsyncInvocation> tasks = new LinkedList<>();
+    for (VM vm : vms) {
+      tasks.add(vm.invokeAsync(() -> createReceiver(locatorPort)));
+    }
+    for (AsyncInvocation invocation : tasks) {
+      try {
+        invocation.join(30000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the interrupt flag before failing the test
+        fail("Failed starting up the receiver");
+      }
+    }
+  }
+  public static int createReceiver(int locPort) {
     GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
     int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
     fact.setStartPort(port);
@@ -2675,12 +2705,12 @@ public class WANTestBase extends DistributedTestCase{
     }
     catch (IOException e) {
       e.printStackTrace();
-      fail("Test " + test.getName()
+      fail("Test " + getTestMethodName()
           + " failed to start GatewayRecevier on port " + port);
     }
     return port;
   }
-  
+
   public static void createReceiverWithBindAddress(int locPort) {
     WANTestBase test = new WANTestBase(getTestMethodName());
     Properties props = test.getDistributedSystemProperties();
@@ -2690,7 +2720,7 @@ public class WANTestBase extends DistributedTestCase{
         + "]");
 
     InternalDistributedSystem ds = test.getSystem(props);
-    cache = CacheFactory.create(ds);    
+    cache = CacheFactory.create(ds);
     GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
     int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
     fact.setStartPort(port);
@@ -2718,7 +2748,7 @@ public class WANTestBase extends DistributedTestCase{
     String  gatewaySslprotocols = "any";
     String  gatewaySslciphers = "any";
     boolean gatewaySslRequireAuth = true;
-    
+
     Properties gemFireProps = test.getDistributedSystemProperties();
 
     gemFireProps.put(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel());
@@ -2726,22 +2756,22 @@ public class WANTestBase extends DistributedTestCase{
     gemFireProps.put(DistributionConfig.GATEWAY_SSL_PROTOCOLS_NAME, gatewaySslprotocols);
     gemFireProps.put(DistributionConfig.GATEWAY_SSL_CIPHERS_NAME, gatewaySslciphers);
     gemFireProps.put(DistributionConfig.GATEWAY_SSL_REQUIRE_AUTHENTICATION_NAME, String.valueOf(gatewaySslRequireAuth));
-    
+
     gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_TYPE_NAME, "jks");
-    gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_NAME, 
+    gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_NAME,
         TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/cacheserver.keystore"));
     gemFireProps.put(DistributionConfig.GATEWAY_SSL_KEYSTORE_PASSWORD_NAME, "password");
-    gemFireProps.put(DistributionConfig.GATEWAY_SSL_TRUSTSTORE_NAME, 
+    gemFireProps.put(DistributionConfig.GATEWAY_SSL_TRUSTSTORE_NAME,
         TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/cacheserver.truststore"));
     gemFireProps.put(DistributionConfig.GATEWAY_SSL_TRUSTSTORE_PASSWORD_NAME, "password");
-    
+
     gemFireProps.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
     gemFireProps.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
 
     LogWriterUtils.getLogWriter().info("Starting cache ds with following properties \n" + gemFireProps);
-    
+
     InternalDistributedSystem ds = test.getSystem(gemFireProps);
-    cache = CacheFactory.create(ds);    
+    cache = CacheFactory.create(ds);
     GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
     int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
     fact.setStartPort(port);
@@ -2758,16 +2788,16 @@ public class WANTestBase extends DistributedTestCase{
     }
     return port;
   }
-  
+
   public static String makePath(String[] strings) {
     StringBuilder sb = new StringBuilder();
     for(int i=0;i<strings.length;i++){
-      sb.append(strings[i]);      
+      sb.append(strings[i]);
       sb.append(File.separator);
     }
     return sb.toString();
   }
-  
+
   public static int createReceiverAfterCache(int locPort) {
     WANTestBase test = new WANTestBase(getTestMethodName());
     GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
@@ -2786,7 +2816,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     return port;
   }
-  
+
   public static void createReceiverAndServer(int locPort) {
     WANTestBase test = new WANTestBase(getTestMethodName());
     Properties props = test.getDistributedSystemProperties();
@@ -2795,7 +2825,7 @@ public class WANTestBase extends DistributedTestCase{
         + "]");
 
     InternalDistributedSystem ds = test.getSystem(props);
-    cache = CacheFactory.create(ds);    
+    cache = CacheFactory.create(ds);
     GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
     int receiverPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
     fact.setStartPort(receiverPort);
@@ -2821,7 +2851,7 @@ public class WANTestBase extends DistributedTestCase{
       com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start server ", e);
     }
   }
-  
+
   public static int createReceiverInSecuredCache(int locPort) {
 	GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
 	int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
@@ -2838,7 +2868,7 @@ public class WANTestBase extends DistributedTestCase{
         }
 	return port;
   }
-  
+
   public static int createServer(int locPort) {
     WANTestBase test = new WANTestBase(getTestMethodName());
     Properties props = test.getDistributedSystemProperties();
@@ -2846,7 +2876,7 @@ public class WANTestBase extends DistributedTestCase{
     props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort
         + "]");
     InternalDistributedSystem ds = test.getSystem(props);
-    cache = CacheFactory.create(ds);    
+    cache = CacheFactory.create(ds);
 
     CacheServer server = cache.addCacheServer();
     int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
@@ -2860,8 +2890,8 @@ public class WANTestBase extends DistributedTestCase{
     }
     return port;
   }
-  
-  public static void createClientWithLocator(int port0,String host, 
+
+  public static void createClientWithLocator(int port0,String host,
       String regionName) {
     WANTestBase test = new WANTestBase(getTestMethodName());
     Properties props = test.getDistributedSystemProperties();
@@ -2870,7 +2900,7 @@ public class WANTestBase extends DistributedTestCase{
 
     InternalDistributedSystem ds = test.getSystem(props);
     cache = CacheFactory.create(ds);
-    
+
     assertNotNull(cache);
     CacheServerTestUtil.disableShufflingOfEndpoints();
     Pool p;
@@ -2895,7 +2925,7 @@ public class WANTestBase extends DistributedTestCase{
         "Distributed Region " + regionName + " created Successfully :"
             + region.toString());
   }
-  
+
   public static int createReceiver_PDX(int locPort) {
     WANTestBase test = new WANTestBase(getTestMethodName());
     Properties props = test.getDistributedSystemProperties();
@@ -2927,7 +2957,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     return port;
   }
-  
+
   public static void createReceiver2(int locPort) {
     WANTestBase test = new WANTestBase(getTestMethodName());
     GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
@@ -2949,7 +2979,7 @@ public class WANTestBase extends DistributedTestCase{
   public static void doDistTXPuts(String regionName, int numPuts) {
     CacheTransactionManager txMgr = cache.getCacheTransactionManager();
     txMgr.setDistributed(true);
-    
+
     IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class
         .getName());
     IgnoredException exp2 = IgnoredException.addIgnoredException(GatewaySenderException.class
@@ -3008,7 +3038,7 @@ public class WANTestBase extends DistributedTestCase{
 //      r.destroy(i);
 //    }
   }
-  
+
   /**
    * To be used for CacheLoader related tests
    */
@@ -3019,7 +3049,7 @@ public class WANTestBase extends DistributedTestCase{
 	  r.get(i);
 	}
   }
-  
+
   public static void doPutsAfter300(String regionName, int numPuts) {
     Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
@@ -3027,7 +3057,7 @@ public class WANTestBase extends DistributedTestCase{
       r.put(i, i);
     }
   }
-  
+
   public static void doPutsFrom(String regionName, int from, int numPuts) {
     Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
@@ -3043,7 +3073,7 @@ public class WANTestBase extends DistributedTestCase{
       r.destroy(i);
     }
   }
-  
+
   public static void doPutAll(String regionName, int numPuts, int size) {
     Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
@@ -3056,8 +3086,8 @@ public class WANTestBase extends DistributedTestCase{
       putAllMap.clear();
     }
   }
-  
-  
+
+
   public static void doPutsWithKeyAsString(String regionName, int numPuts) {
     Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
@@ -3065,7 +3095,7 @@ public class WANTestBase extends DistributedTestCase{
       r.put("Object_" + i, i);
     }
   }
-  
+
   public static void putGivenKeyValue(String regionName, Map keyValues) {
     Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
@@ -3073,11 +3103,11 @@ public class WANTestBase extends DistributedTestCase{
       r.put(key, keyValues.get(key));
     }
   }
-  
+
   public static void destroyRegion(String regionName) {
     destroyRegion(regionName, -1);
   }
-  
+
   public static void destroyRegion(String regionName, final int min) {
     final Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
@@ -3093,7 +3123,7 @@ public class WANTestBase extends DistributedTestCase{
         return "Looking for min size of region to be " + min;
       }
     };
-    Wait.waitForCriterion(wc, 30000, 5, false); 
+    Wait.waitForCriterion(wc, 30000, 5, false);
     r.destroyRegion();
   }
 
@@ -3112,11 +3142,11 @@ public class WANTestBase extends DistributedTestCase{
         return "Looking for min size of region to be " + min;
       }
     };
-    Wait.waitForCriterion(wc, 30000, 5, false); 
+    Wait.waitForCriterion(wc, 30000, 5, false);
     r.destroyRegion();
     destroyFlag = false;
   }
-  
+
   public static void localDestroyRegion(String regionName) {
     IgnoredException exp = IgnoredException.addIgnoredException(PRLocallyDestroyedException.class
         .getName());
@@ -3128,13 +3158,13 @@ public class WANTestBase extends DistributedTestCase{
       exp.remove();
     }
   }
-  
-  
+
+
   public static Map putCustomerPartitionedRegion(int numPuts) {
     String valueSuffix = "";
     return putCustomerPartitionedRegion(numPuts, valueSuffix);
   }
-  
+
   public static Map updateCustomerPartitionedRegion(int numPuts) {
     String valueSuffix = "_update";
     return putCustomerPartitionedRegion(numPuts, valueSuffix);
@@ -3189,7 +3219,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     return orderKeyValues;
   }
-  
+
   public static Map putOrderPartitionedRegionUsingCustId(int numPuts) {
     assertNotNull(cache);
     assertNotNull(orderRegion);
@@ -3240,7 +3270,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     return orderKeyValues;
   }
-  
+
   public static Map updateOrderPartitionedRegionUsingCustId(int numPuts) {
     assertNotNull(cache);
     assertNotNull(orderRegion);
@@ -3262,7 +3292,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     return orderKeyValues;
   }
-  
+
   public static Map putShipmentPartitionedRegion(int numPuts) {
     assertNotNull(cache);
     assertNotNull(shipmentRegion);
@@ -3294,7 +3324,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     return shipmentKeyValue;
   }
-  
+
   public static void putcolocatedPartitionedRegion(int numPuts) {
     assertNotNull(cache);
     assertNotNull(customerRegion);
@@ -3318,7 +3348,7 @@ public class WANTestBase extends DistributedTestCase{
       }
     }
   }
-  
+
   public static Map putShipmentPartitionedRegionUsingCustId(int numPuts) {
     assertNotNull(cache);
     assertNotNull(shipmentRegion);
@@ -3340,7 +3370,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     return shipmentKeyValue;
   }
-  
+
   public static Map updateShipmentPartitionedRegion(int numPuts) {
     assertNotNull(cache);
     assertNotNull(shipmentRegion);
@@ -3372,7 +3402,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     return shipmentKeyValue;
   }
-  
+
   public static Map updateShipmentPartitionedRegionUsingCustId(int numPuts) {
     assertNotNull(cache);
     assertNotNull(shipmentRegion);
@@ -3402,7 +3432,7 @@ public class WANTestBase extends DistributedTestCase{
       r.put("Key_" + i, new SimpleClass(i, (byte)i));
     }
   }
-  
+
   public static void doPutsPDXSerializable2(String regionName, int numPuts) {
     Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
@@ -3410,8 +3440,8 @@ public class WANTestBase extends DistributedTestCase{
       r.put("Key_" + i, new SimpleClass1(false, (short) i, "" + i, i,"" +i ,""+ i,i, i));
     }
   }
-  
-  
+
+
   public static void doTxPuts(String regionName, int numPuts) {
     Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
@@ -3423,7 +3453,7 @@ public class WANTestBase extends DistributedTestCase{
     r.put(200, 200);
     mgr.commit();
   }
-    
+
   public static void doNextPuts(String regionName, int start, int numPuts) {
     //waitForSitesToUpdate();
     IgnoredException exp = IgnoredException.addIgnoredException(CacheClosedException.class
@@ -3438,11 +3468,11 @@ public class WANTestBase extends DistributedTestCase{
       exp.remove();
     }
   }
-  
+
   public static void checkQueueSize(String senderId, int numQueueEntries) {
     Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> testQueueSize(senderId, numQueueEntries));
   }
-  
+
   public static void testQueueSize(String senderId, int numQueueEntries) {
     GatewaySender sender = null;
     for (GatewaySender s : cache.getGatewaySenders()) {
@@ -3468,7 +3498,7 @@ public class WANTestBase extends DistributedTestCase{
       assertEquals(numQueueEntries, size);
     }
   }
-  
+
   /**
    * To be used only for ParallelGatewaySender.
    * @param senderId    Id of the ParallelGatewaySender
@@ -3487,7 +3517,7 @@ public class WANTestBase extends DistributedTestCase{
     if (sender.isParallel()) {
       int totalSize = 0;
       final Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
-      
+
       WaitCriterion wc = new WaitCriterion() {
         int size = 0;
         public boolean done() {
@@ -3505,13 +3535,13 @@ public class WANTestBase extends DistributedTestCase{
           return " Expected local queue entries: " + numQueueEntries
             + " but actual entries: " + size;
         }
-        
+
       };
-      
+
       Wait.waitForCriterion(wc, 120000, 500, true);
     }
   }
-  
+
   /**
    * To be used only for ParallelGatewaySender.
    * @param senderId    Id of the ParallelGatewaySender
@@ -3535,12 +3565,12 @@ public class WANTestBase extends DistributedTestCase{
     }
     return -1;
   }
-  
+
   public static void doUpdates(String regionName, int numUpdates) {
     Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
     for (int i = 0; i < numUpdates; i++) {
-      String s = "K"+i; 
+      String s = "K"+i;
       r.put(i, s);
     }
   }
@@ -3554,7 +3584,7 @@ public class WANTestBase extends DistributedTestCase{
       r.put(key, s);
     }
   }
-  
+
   public static void doRandomUpdates(String regionName, int numUpdates) {
     Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
@@ -3563,11 +3593,11 @@ public class WANTestBase extends DistributedTestCase{
       generatedKeys.add((new Random()).nextInt(r.size()));
     }
     for (Integer i: generatedKeys) {
-      String s = "K"+i; 
+      String s = "K"+i;
       r.put(i, s);
     }
   }
-  
+
   public static void doMultiThreadedPuts(String regionName, int numPuts) {
     final AtomicInteger ai = new AtomicInteger(-1);
     final ExecutorService execService = Executors.newFixedThreadPool(5,
@@ -3624,13 +3654,13 @@ public class WANTestBase extends DistributedTestCase{
               + " present region keyset " + r.keySet();
         }
       };
-      Wait.waitForCriterion(wc, 240000, 500, true);
+      Wait.waitForCriterion(wc, 30000, 500, true);
     } finally {
       exp.remove();
       exp1.remove();
     }
   }
-  
+
   /**
    * Validate whether all the attributes set on AsyncEventQueueFactory are set
    * on the sender underneath the AsyncEventQueue.
@@ -3662,7 +3692,7 @@ public class WANTestBase extends DistributedTestCase{
     assertEquals("batchConflation", batchConflationEnabled, theSender
         .isBatchConflationEnabled());
   }
-  
+
   public static void validateAsyncEventListener(String asyncQueueId, final int expectedSize) {
     AsyncEventListener theListener = null;
 
@@ -3690,7 +3720,7 @@ public class WANTestBase extends DistributedTestCase{
     };
     Wait.waitForCriterion(wc, 60000, 500, true); //TODO:Yogs 
   }
-  
+
    public static void validateCustomAsyncEventListener(String asyncQueueId,
       final int expectedSize) {
     AsyncEventListener theListener = null;
@@ -3718,14 +3748,14 @@ public class WANTestBase extends DistributedTestCase{
       }
     };
     Wait.waitForCriterion(wc, 60000, 500, true); // TODO:Yogs
-    
+
    Iterator<AsyncEvent> itr = eventsMap.values().iterator();
    while (itr.hasNext()) {
      AsyncEvent event = itr.next();
      assertTrue("possibleDuplicate should be true for event: " + event, event.getPossibleDuplicate());
    }
   }
-  
+
   public static void waitForAsyncQueueToGetEmpty(String asyncQueueId) {
     AsyncEventQueue theAsyncEventQueue = null;
 
@@ -3795,7 +3825,7 @@ public class WANTestBase extends DistributedTestCase{
       Wait.waitForCriterion(wc, 60000, 500, true);
     }
   }
-  
+
   public static void verifyAsyncEventListenerForPossibleDuplicates(
       String asyncEventQueueId, Set<Integer> bucketIds, int batchSize) {
     AsyncEventListener theListener = null;
@@ -3840,10 +3870,10 @@ public class WANTestBase extends DistributedTestCase{
     LogWriterUtils.getLogWriter().info("The events map size is " + eventsMap.size());
     return eventsMap.size();
   }
-  
+
   public static int getAsyncEventQueueSize(String asyncEventQueueId) {
     AsyncEventQueue theQueue = null;
-    
+
     Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
     for (AsyncEventQueue asyncQueue : asyncEventQueues) {
       if (asyncEventQueueId.equals(asyncQueue.getId())) {
@@ -3854,7 +3884,7 @@ public class WANTestBase extends DistributedTestCase{
     return theQueue.size();
   }
 
-  
+
   public static void validateRegionSize_PDX(String regionName, final int regionSize) {
     final Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
@@ -3867,11 +3897,11 @@ public class WANTestBase extends DistributedTestCase{
       }
 
       public String description() {
-        
+
         return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size() + " present region keyset " + r.keySet()  ;
       }
     };
-    Wait.waitForCriterion(wc, 200000, 500, true); 
+    Wait.waitForCriterion(wc, 200000, 500, true);
     for(int i = 0 ; i < regionSize; i++){
       LogWriterUtils.getLogWriter().info("For Key : Key_"+i + " : Values : " + r.get("Key_" + i));
       assertEquals(new SimpleClass(i, (byte)i), r.get("Key_" + i));
@@ -3889,27 +3919,27 @@ public class WANTestBase extends DistributedTestCase{
       }
 
       public String description() {
-        
+
         return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size() + " present region keyset " + r.keySet()  ;
       }
     };
-    Wait.waitForCriterion(wc, 200000, 500, true); 
+    Wait.waitForCriterion(wc, 200000, 500, true);
     for(int i = 0 ; i < regionSize; i++){
       LogWriterUtils.getLogWriter().info("For Key : Key_"+i + " : Values : " + r.get("Key_" + i));
       assertEquals(new SimpleClass1(false, (short) i, "" + i, i,"" +i ,""+ i,i, i), r.get("Key_" + i));
     }
   }
-  
+
   public static void validateQueueSizeStat(String id, final int queueSize) {
     final AbstractGatewaySender sender = (AbstractGatewaySender)  cache.getGatewaySender(id);
-    
+
     Wait.waitForCriterion(new WaitCriterion() {
-      
+
       @Override
       public boolean done() {
         return sender.getEventQueueSize() == queueSize;
       }
-      
+
       @Override
       public String description() {
         // TODO Auto-generated method stub
@@ -3921,9 +3951,9 @@ public class WANTestBase extends DistributedTestCase{
   /**
    * This method is specifically written for pause and stop operations.
    * This method validates that the region size remains same for at least minimum number of verification
-   * attempts and also it remains below a specified limit value. This validation will suffice for 
-   * testing of pause/stop operations.  
-   *  
+   * attempts and also it remains below a specified limit value. This validation will suffice for
+   * testing of pause/stop operations.
+   *
    * @param regionName
    * @param regionSizeLimit
    */
@@ -3944,8 +3974,8 @@ public class WANTestBase extends DistributedTestCase{
           } else {
             return false;
           }
-          
-        } else { //current regionSize is not same as recorded previous regionSize 
+
+        } else { //current regionSize is not same as recorded previous regionSize
           previousSize = r.keySet().size(); //update the previousSize variable with current region size
           sameRegionSizeCounter = 0;//reset the sameRegionSizeCounter
           return false;
@@ -3956,21 +3986,21 @@ public class WANTestBase extends DistributedTestCase{
         return "Expected region size to remain same below a specified limit but actual region size does not remain same or exceeded the specified limit " + sameRegionSizeCounter + " :regionSize " + previousSize;
       }
     };
-    Wait.waitForCriterion(wc, 200000, 500, true); 
+    Wait.waitForCriterion(wc, 200000, 500, true);
   }
-  
+
   public static String getRegionFullPath(String regionName) {
     final Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
     return r.getFullPath();
   }
-  
+
   public static Integer getRegionSize(String regionName) {
     final Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
     return r.keySet().size();
   }
-  
+
   public static void validateRegionContents(String regionName, final Map keyValues) {
     final Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
@@ -3991,9 +4021,9 @@ public class WANTestBase extends DistributedTestCase{
         return "Expected region entries doesn't match";
       }
     };
-    Wait.waitForCriterion(wc, 120000, 500, true); 
+    Wait.waitForCriterion(wc, 120000, 500, true);
   }
-  
+
   public static void CheckContent(String regionName, final int regionSize) {
     final Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
@@ -4001,7 +4031,7 @@ public class WANTestBase extends DistributedTestCase{
       assertEquals(i, r.get(i));
     }
   }
-  
+
   public static void validateRegionContentsForPR(String regionName,
       final int regionSize) {
     final Region r = cache.getRegion(Region.SEPARATOR + regionName);
@@ -4018,14 +4048,14 @@ public class WANTestBase extends DistributedTestCase{
         return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size();
       }
     };
-    Wait.waitForCriterion(wc, 120000, 500, true); 
+    Wait.waitForCriterion(wc, 120000, 500, true);
   }
-  
+
   public static void verifyPrimaryStatus(final Boolean isPrimary) {
     final Set<GatewaySender> senders = cache.getGatewaySenders();
     assertEquals(senders.size(), 1);
     final AbstractGatewaySender sender = (AbstractGatewaySender)senders.iterator().next();
-    
+
     WaitCriterion wc = new WaitCriterion() {
       public boolean done() {
         if (sender.isPrimary() == isPrimary.booleanValue()) {
@@ -4040,7 +4070,7 @@ public class WANTestBase extends DistributedTestCase{
     };
     Wait.waitForCriterion(wc, 120000, 500, true);
   }
-  
+
   public static Boolean getPrimaryStatus(){
     Set<GatewaySender> senders = cache.getGatewaySenders();
     assertEquals(senders.size(), 1);
@@ -4060,12 +4090,12 @@ public class WANTestBase extends DistributedTestCase{
     Wait.waitForCriterion(wc, 10000, 500, false);
     return sender.isPrimary();
   }
-  
+
   public static Set<Integer> getAllPrimaryBucketsOnTheNode(String regionName) {
     PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
     return region.getDataStore().getAllLocalPrimaryBucketIds();
   }
-  
+
   public static void doHeavyPuts(String regionName, int numPuts) {
     Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
@@ -4075,7 +4105,7 @@ public class WANTestBase extends DistributedTestCase{
       r.put(i, new byte[1024*1024]);
     }
   }
-  
+
   public static void addListenerAndKillPrimary(){
     Set<GatewaySender> senders = ((GemFireCacheImpl)cache).getAllGatewaySenders();
     assertEquals(senders.size(), 1);
@@ -4084,7 +4114,7 @@ public class WANTestBase extends DistributedTestCase{
     assertNotNull(queue);
     CacheListenerAdapter cl = new CacheListenerAdapter() {
       public void afterCreate(EntryEvent event) {
-        if((Long)event.getKey() > 900){ 
+        if((Long)event.getKey() > 900){
           cache.getLogger().fine(" Gateway sender is killed by a test");
           cache.close();
           cache.getDistributedSystem().disconnect();
@@ -4093,28 +4123,28 @@ public class WANTestBase extends DistributedTestCase{
     };
     queue.getAttributesMutator().addCacheListener(cl);
   }
-  
+
   public static void addCacheListenerAndDestroyRegion(String regionName){
     final Region region = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(region);
     CacheListenerAdapter cl = new CacheListenerAdapter() {
       @Override
       public void afterCreate(EntryEvent event) {
-        if((Long)event.getKey() == 99){ 
+        if((Long)event.getKey() == 99){
           region.destroyRegion();
         }
       }
     };
     region.getAttributesMutator().addCacheListener(cl);
   }
-  
+
   public static void addCacheListenerAndCloseCache(String regionName){
     final Region region = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(region);
     CacheListenerAdapter cl = new CacheListenerAdapter() {
       @Override
       public void afterCreate(EntryEvent event) {
-        if((Long)event.getKey() == 900){ 
+        if((Long)event.getKey() == 900){
           cache.getLogger().fine(" Gateway sender is killed by a test");
           cache.close();
           cache.getDistributedSystem().disconnect();
@@ -4123,7 +4153,7 @@ public class WANTestBase extends DistributedTestCase{
     };
     region.getAttributesMutator().addCacheListener(cl);
   }
-  
+
   public static Boolean killSender(String senderId){
     final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
     IgnoredException exp = IgnoredException.addIgnoredException(CacheClosedException.class
@@ -4149,9 +4179,9 @@ public class WANTestBase extends DistributedTestCase{
       exp.remove();
       exp1.remove();
       exln.remove();
-    }    
+    }
   }
-  
+
   public static Boolean killAsyncEventQueue(String asyncQueueId){
     Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
     AsyncEventQueueImpl queue = null;
@@ -4168,14 +4198,14 @@ public class WANTestBase extends DistributedTestCase{
     }
     return Boolean.FALSE;
   }
-  
+
   public static void killSender(){
-    LogWriterUtils.getLogWriter().info("Gateway sender is going to be killed by a test"); 
+    LogWriterUtils.getLogWriter().info("Gateway sender is going to be killed by a test");
     cache.close();
     cache.getDistributedSystem().disconnect();
     LogWriterUtils.getLogWriter().info("Gateway sender is killed by a test");
   }
-  
+
   static void waitForSitesToUpdate() {
     WaitCriterion wc = new WaitCriterion() {
       public boolean done() {
@@ -4187,21 +4217,21 @@ public class WANTestBase extends DistributedTestCase{
     };
     Wait.waitForCriterion(wc, 10000, 500, false);
   }
-  
-  public static void checkAllSiteMetaData(     
+
+  public static void checkAllSiteMetaData(
       Map<Integer, ArrayList<Integer>> dsVsPorts) {
     waitForSitesToUpdate();
     assertNotNull(getSystemStatic());
 //    Map<Integer,Set<DistributionLocatorId>> allSiteMetaData = ((DistributionConfigImpl)system
 //        .getConfig()).getAllServerLocatorsInfo();
-    
+
     List<Locator> locatorsConfigured = Locator.getLocators();
     Locator locator = locatorsConfigured.get(0);
     Map<Integer,Set<DistributionLocatorId>> allSiteMetaData = ((InternalLocator)locator).getlocatorMembershipListener().getAllLocatorsInfo();
     System.out.println("allSiteMetaData : " + allSiteMetaData);
     System.out.println("dsVsPorts : " + dsVsPorts);
     System.out.println("Server allSiteMetaData : " + ((InternalLocator)locator).getlocatorMembershipListener().getAllServerLocatorsInfo());
-    
+
     //assertEquals(dsVsPorts.size(), allSiteMetaData.size());
     for (Map.Entry<Integer, ArrayList<Integer>> entry : dsVsPorts.entrySet()) {
       Set<DistributionLocatorId> locators = allSiteMetaData.get(entry.getKey());
@@ -4219,9 +4249,9 @@ public class WANTestBase extends DistributedTestCase{
       }
     }
   }
-  
+
   public static Long checkAllSiteMetaDataFor3Sites(final Map<Integer, Set<String>> dsVsPort) {
-    
+
     WaitCriterion wc = new WaitCriterion() {
       public boolean done() {
         if (getSystemStatic() != null) {
@@ -4236,12 +4266,12 @@ public class WANTestBase extends DistributedTestCase{
         return "Making sure system is initialized";
       }
     };
-    Wait.waitForCriterion(wc, 50000, 1000, true); 
+    Wait.waitForCriterion(wc, 50000, 1000, true);
     assertNotNull(getSystemStatic());
-    
+
 //    final Map<Integer,Set<DistributionLocatorId>> allSiteMetaData = ((DistributionConfigImpl)system
 //        .getConfig()).getAllServerLocatorsInfo();
-    
+
     List<Locator> locatorsConfigured = Locator.getLocators();
     Locator locator = locatorsConfigured.get(0);
     LocatorMembershipListener listener = ((InternalLocator)locator).getlocatorMembershipListener();
@@ -4250,7 +4280,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     final Map<Integer,Set<DistributionLocatorId>> allSiteMetaData = listener.getAllLocatorsInfo();
     System.out.println("allSiteMetaData : " + allSiteMetaData);
-    
+
     wc = new WaitCriterion() {
       public boolean done() {
         if (dsVsPort.size() == allSiteMetaData.size()) {
@@ -4275,10 +4305,10 @@ public class WANTestBase extends DistributedTestCase{
             + " but actual meta data: " + allSiteMetaData;
       }
     };
-    Wait.waitForCriterion(wc, 300000, 500, true); 
+    Wait.waitForCriterion(wc, 300000, 500, true);
     return System.currentTimeMillis();
   }
-  
+
   public static void checkLocatorsinSender(String senderId, InetSocketAddress locatorToWaitFor)
       throws InterruptedException {
 
@@ -4290,17 +4320,17 @@ public class WANTestBase extends DistributedTestCase{
         break;
       }
     }
-    
+
     MyLocatorCallback callback = (MyLocatorCallbac

<TRUNCATED>