You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/04/19 20:19:34 UTC

[08/13] incubator-geode git commit: GEODE-1032: Additional wait time to check for empty queue, refactored WANTestBase.java to remove unused functions, replaced wait criteria with Awaitility.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
index e93c9c2..aca2cb9 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
@@ -30,7 +30,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.concurrent.Callable;
@@ -60,12 +59,10 @@ import com.gemstone.gemfire.cache.RegionAttributes;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
 import com.gemstone.gemfire.cache.RegionFactory;
 import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
 import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
 import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
 import com.gemstone.gemfire.cache.client.Pool;
 import com.gemstone.gemfire.cache.client.PoolManager;
 import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter;
@@ -154,8 +151,6 @@ public class WANTestBase extends DistributedTestCase{
   protected static QueueListener listener1;
   protected static QueueListener listener2;
 
-  protected static List<QueueListener> gatewayListeners;
-
   protected static AsyncEventListener eventListener1 ;
   protected static AsyncEventListener eventListener2 ;
 
@@ -163,8 +158,6 @@ public class WANTestBase extends DistributedTestCase{
 
   protected static GatewayEventFilter eventFilter;
 
-  protected static boolean destroyFlag = false;
-
   protected static List<Integer> dispatcherThreads =
 	  new ArrayList<Integer>(Arrays.asList(1, 3, 5));
   //this will be set for each test method run with one of the values from above list
@@ -315,7 +308,6 @@ public class WANTestBase extends DistributedTestCase{
     props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + oldPort + "],server=true,peer=true,hostname-for-clients=localhost");
     props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, "localhost[" + remoteLocPort + "]");
     test.getSystem(props);
-    return;
   }
 
 
@@ -376,8 +368,6 @@ public class WANTestBase extends DistributedTestCase{
         StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
         while (tokenizer.hasMoreTokens()) {
           String senderId = tokenizer.nextToken();
-          // GatewaySender sender = cache.getGatewaySender(senderId);
-          // assertNotNull(sender);
           fact.addGatewaySenderId(senderId);
         }
       }
@@ -400,8 +390,6 @@ public class WANTestBase extends DistributedTestCase{
       StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
       while (tokenizer.hasMoreTokens()){
         String senderId = tokenizer.nextToken();
-//        GatewaySender sender = cache.getGatewaySender(senderId);
-//        assertNotNull(sender);
         fact.addGatewaySenderId(senderId);
       }
     }
@@ -411,32 +399,12 @@ public class WANTestBase extends DistributedTestCase{
     assertNotNull(r);
   }
 
-//  public static void createReplicatedRegion_PDX(String regionName, String senderId, DataPolicy policy, InterestPolicy intPolicy){
-//    AttributesFactory fact = new AttributesFactory();
-//    if(senderId!= null){
-//      StringTokenizer tokenizer = new StringTokenizer(senderId, ",");
-//      while (tokenizer.hasMoreTokens()){
-//        String sender = tokenizer.nextToken();
-//        //fact.addSerialGatewaySenderId(sender);
-//      }
-//    }
-//    fact.setDataPolicy(policy);
-//    SubscriptionAttributes subAttr = new SubscriptionAttributes(intPolicy);
-//    fact.setSubscriptionAttributes(subAttr);
-//    fact.setScope(Scope.DISTRIBUTED_ACK);
-//    Region r = cache.createRegionFactory(fact.create()).create(regionName);
-//    assertNotNull(r);
-//    assertTrue(r.size() == 0);
-//  }
-
   public static void createPersistentReplicatedRegion(String regionName, String senderIds, Boolean offHeap){
     AttributesFactory fact = new AttributesFactory();
     if(senderIds!= null){
       StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
       while (tokenizer.hasMoreTokens()){
         String senderId = tokenizer.nextToken();
-//        GatewaySender sender = cache.getGatewaySender(senderId);
-//        assertNotNull(sender);
         fact.addGatewaySenderId(senderId);
       }
     }
@@ -446,27 +414,6 @@ public class WANTestBase extends DistributedTestCase{
     assertNotNull(r);
   }
 
-//  public static void createReplicatedRegionWithParallelSenderId(String regionName, String senderId){
-//    AttributesFactory fact = new AttributesFactory();
-//    if(senderId!= null){
-//      StringTokenizer tokenizer = new StringTokenizer(senderId, ",");
-//      while (tokenizer.hasMoreTokens()){
-//        String sender = tokenizer.nextToken();
-//        //fact.addParallelGatewaySenderId(sender);
-//      }
-//    }
-//    fact.setDataPolicy(DataPolicy.REPLICATE);
-//    Region r = cache.createRegionFactory(fact.create()).create(regionName);
-//    assertNotNull(r);
-//  }
-
-//  public static void createReplicatedRegion(String regionName){
-//    AttributesFactory fact = new AttributesFactory();
-//    fact.setDataPolicy(DataPolicy.REPLICATE);
-//    Region r = cache.createRegionFactory(fact.create()).create(regionName);
-//    assertNotNull(r);
-//  }
-
   public static void createReplicatedRegionWithAsyncEventQueue(
       String regionName, String asyncQueueIds, Boolean offHeap) {
     IgnoredException exp1 = IgnoredException.addIgnoredException(ForceReattemptException.class
@@ -491,31 +438,11 @@ public class WANTestBase extends DistributedTestCase{
     }
   }
 
-  public static void createPersistentReplicatedRegionWithAsyncEventQueue(
-      String regionName, String asyncQueueIds) {
-
-    AttributesFactory fact = new AttributesFactory();
-    if(asyncQueueIds != null){
-      StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
-      while (tokenizer.hasMoreTokens()){
-        String asyncQueueId = tokenizer.nextToken();
-        fact.addAsyncEventQueueId(asyncQueueId);
-      }
-    }
-    fact.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
-    RegionFactory regionFactory = cache.createRegionFactory(fact.create());
-    Region r = regionFactory.create(regionName);
-    assertNotNull(r);
-  }
-
-
-
   public static void createReplicatedRegionWithSenderAndAsyncEventQueue(
       String regionName, String senderIds, String asyncChannelId, Boolean offHeap) {
     IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
         .getName());
     try {
-
       AttributesFactory fact = new AttributesFactory();
       if (senderIds != null) {
         StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
@@ -543,8 +470,6 @@ public class WANTestBase extends DistributedTestCase{
       StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
       while (tokenizer.hasMoreTokens()){
         String senderId = tokenizer.nextToken();
-//        GatewaySender sender = cache.getGatewaySender(senderId);
-//        assertNotNull(sender);
         fact.addGatewaySenderId(senderId);
       }
     }
@@ -567,7 +492,7 @@ public class WANTestBase extends DistributedTestCase{
       File[] dirs1 = new File[] { directory };
       DiskStoreFactory dsf = cache.createDiskStoreFactory();
       dsf.setDiskDirs(dirs1);
-      DiskStore ds = dsf.create(diskStoreName);
+      dsf.create(diskStoreName);
     }
 
     AsyncEventListener asyncEventListener = new MyAsyncEventListener();
@@ -582,305 +507,7 @@ public class WANTestBase extends DistributedTestCase{
     factory.setParallel(isParallel);
     //set dispatcher threads
     factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
-    AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
-  }
-
-  public static void createAsyncEventQueueWithListener2(String asyncChannelId,
-      boolean isParallel, Integer maxMemory, Integer batchSize,
-      boolean isPersistent, String diskStoreName) {
-
-    if (diskStoreName != null) {
-      File directory = new File(asyncChannelId + "_disk_"
-          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
-      directory.mkdir();
-      File[] dirs1 = new File[] { directory };
-      DiskStoreFactory dsf = cache.createDiskStoreFactory();
-      dsf.setDiskDirs(dirs1);
-      DiskStore ds = dsf.create(diskStoreName);
-    }
-
-    AsyncEventListener asyncEventListener = new MyAsyncEventListener2();
-
-    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
-    factory.setBatchSize(batchSize);
-    factory.setPersistent(isPersistent);
-    factory.setDiskStoreName(diskStoreName);
-    factory.setMaximumQueueMemory(maxMemory);
-    factory.setParallel(isParallel);
-    //set dispatcher threads
-    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
-    AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
-        asyncEventListener);
-  }
-
-  public static void createAsyncEventQueue(
-    String asyncChannelId, boolean isParallel, Integer maxMemory,
-    Integer batchSize, boolean isConflation, boolean isPersistent,
-    String diskStoreName, boolean isDiskSynchronous, String asyncListenerClass) throws Exception {
-
-	if (diskStoreName != null) {
-	  File directory = new File(asyncChannelId + "_disk_"
-		+ System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
-	  directory.mkdir();
-	  File[] dirs1 = new File[] { directory };
-	  DiskStoreFactory dsf = cache.createDiskStoreFactory();
-	  dsf.setDiskDirs(dirs1);
-	  DiskStore ds = dsf.create(diskStoreName);
-	}
-
-	String packagePrefix = "com.gemstone.gemfire.internal.cache.wan.";
-	String className = packagePrefix + asyncListenerClass;
-	AsyncEventListener asyncEventListener = null;
-	try {
-		Class clazz = Class.forName(className);
-		asyncEventListener = (AsyncEventListener) clazz.newInstance();
-	} catch (ClassNotFoundException e) {
-	  throw e;
-	} catch (InstantiationException e) {
-	  throw e;
-	} catch (IllegalAccessException e) {
-	  throw e;
-	}
-
-	AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
-	factory.setBatchSize(batchSize);
-	factory.setPersistent(isPersistent);
-	factory.setDiskStoreName(diskStoreName);
-	factory.setDiskSynchronous(isDiskSynchronous);
-	factory.setBatchConflationEnabled(isConflation);
-	factory.setMaximumQueueMemory(maxMemory);
-	factory.setParallel(isParallel);
-	//set dispatcher threads
-	factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
-	AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
-  }
-
-  public static void createAsyncEventQueueWithCustomListener(
-      String asyncChannelId, boolean isParallel, Integer maxMemory,
-      Integer batchSize, boolean isConflation, boolean isPersistent,
-      String diskStoreName, boolean isDiskSynchronous) {
-    createAsyncEventQueueWithCustomListener(asyncChannelId, isParallel, maxMemory, batchSize,
-        isConflation, isPersistent, diskStoreName, isDiskSynchronous, GatewaySender.DEFAULT_DISPATCHER_THREADS);
-  }
-
-  public static void createAsyncEventQueueWithCustomListener(
-      String asyncChannelId, boolean isParallel, Integer maxMemory,
-      Integer batchSize, boolean isConflation, boolean isPersistent,
-      String diskStoreName, boolean isDiskSynchronous, int nDispatchers) {
-
-    IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
-        .getName());
-
-    try {
-      if (diskStoreName != null) {
-        File directory = new File(asyncChannelId + "_disk_"
-            + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
-        directory.mkdir();
-        File[] dirs1 = new File[] { directory };
-        DiskStoreFactory dsf = cache.createDiskStoreFactory();
-        dsf.setDiskDirs(dirs1);
-        DiskStore ds = dsf.create(diskStoreName);
-      }
-
-      AsyncEventListener asyncEventListener = new CustomAsyncEventListener();
-
-      AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
-      factory.setBatchSize(batchSize);
-      factory.setPersistent(isPersistent);
-      factory.setDiskStoreName(diskStoreName);
-      factory.setMaximumQueueMemory(maxMemory);
-      factory.setParallel(isParallel);
-      factory.setDispatcherThreads(nDispatchers);
-      AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
-          asyncEventListener);
-    } finally {
-      exp.remove();
-    }
-  }
-
-  public static void createConcurrentAsyncEventQueue(
-      String asyncChannelId, boolean isParallel,
-      Integer maxMemory, Integer batchSize, boolean isConflation,
-      boolean isPersistent, String diskStoreName, boolean isDiskSynchronous,
-      int dispatcherThreads, OrderPolicy policy) {
-
-    if (diskStoreName != null) {
-      File directory = new File(asyncChannelId + "_disk_"
-          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
-      directory.mkdir();
-      File[] dirs1 = new File[] { directory };
-      DiskStoreFactory dsf = cache.createDiskStoreFactory();
-      dsf.setDiskDirs(dirs1);
-      DiskStore ds = dsf.create(diskStoreName);
-    }
-
-    AsyncEventListener asyncEventListener = new MyAsyncEventListener();
-
-    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
-    factory.setBatchSize(batchSize);
-    factory.setPersistent(isPersistent);
-    factory.setDiskStoreName(diskStoreName);
-    factory.setDiskSynchronous(isDiskSynchronous);
-    factory.setBatchConflationEnabled(isConflation);
-    factory.setMaximumQueueMemory(maxMemory);
-    factory.setParallel(isParallel);
-    factory.setDispatcherThreads(dispatcherThreads);
-    factory.setOrderPolicy(policy);
-    AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
-  }
-
-
-  public static String createAsyncEventQueueWithDiskStore(
-      String asyncChannelId, boolean isParallel,
-      Integer maxMemory, Integer batchSize,
-      boolean isPersistent, String diskStoreName) {
-
-    AsyncEventListener asyncEventListener = new MyAsyncEventListener();
-
-    File persistentDirectory = null;
-    if (diskStoreName == null) {
-      persistentDirectory = new File(asyncChannelId + "_disk_"
-          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
-    } else {
-      persistentDirectory = new File(diskStoreName);
-    }
-    LogWriterUtils.getLogWriter().info("The ds is : " + persistentDirectory.getName());
-    persistentDirectory.mkdir();
-    DiskStoreFactory dsf = cache.createDiskStoreFactory();
-    File [] dirs1 = new File[] {persistentDirectory};
-
-    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
-    factory.setBatchSize(batchSize);
-    factory.setParallel(isParallel);
-    if (isPersistent) {
-      factory.setPersistent(isPersistent);
-      factory.setDiskStoreName(dsf.setDiskDirs(dirs1).create(asyncChannelId).getName());
-    }
-    factory.setMaximumQueueMemory(maxMemory);
-    //set dispatcher threads
-    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
-    AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
-    return persistentDirectory.getName();
-  }
-
-  public static void pauseAsyncEventQueue(String asyncChannelId) {
-    AsyncEventQueue theChannel = null;
-
-    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
-    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
-      if (asyncChannelId.equals(asyncChannel.getId())) {
-        theChannel = asyncChannel;
-      }
-    }
-
-    ((AsyncEventQueueImpl)theChannel).getSender().pause();
- }
-
-  public static void pauseAsyncEventQueueAndWaitForDispatcherToPause(String asyncChannelId) {
-    AsyncEventQueue theChannel = null;
-
-    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
-    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
-      if (asyncChannelId.equals(asyncChannel.getId())) {
-        theChannel = asyncChannel;
-        break;
-      }
-    }
-
-    ((AsyncEventQueueImpl)theChannel).getSender().pause();
-
-
-    ((AbstractGatewaySender)((AsyncEventQueueImpl)theChannel).getSender()).getEventProcessor().waitForDispatcherToPause();
-  }
-
- public static void resumeAsyncEventQueue(String asyncQueueId) {
-    AsyncEventQueue theQueue = null;
-
-    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
-    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
-      if (asyncQueueId.equals(asyncChannel.getId())) {
-        theQueue = asyncChannel;
-      }
-    }
-
-    ((AsyncEventQueueImpl)theQueue).getSender().resume();
-  }
-
-
-  public static void checkAsyncEventQueueSize(String asyncQueueId, int numQueueEntries) {
-    AsyncEventQueue theAsyncEventQueue = null;
-
-    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
-    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
-      if (asyncQueueId.equals(asyncChannel.getId())) {
-        theAsyncEventQueue = asyncChannel;
-      }
-    }
-
-    GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue).getSender();
-
-    if (sender.isParallel()) {
-      Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
-      assertEquals(numQueueEntries,
-          queues.toArray(new RegionQueue[queues.size()])[0].getRegion().size());
-    } else {
-      Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
-      int size = 0;
-      for (RegionQueue q : queues) {
-        size += q.size();
-      }
-      assertEquals(numQueueEntries, size);
-    }
-  }
-
-  /**
-   * This method verifies the queue size of a ParallelGatewaySender. For
-   * ParallelGatewaySender conflation happens in a separate thread, hence test
-   * code needs to wait for some time for expected result
-   *
-   * @param asyncQueueId
-   *          Async Queue ID
-   * @param numQueueEntries
-   *          expected number of Queue entries
-   * @throws Exception
-   */
-  public static void waitForAsyncEventQueueSize(String asyncQueueId,
-      final int numQueueEntries) throws Exception {
-    AsyncEventQueue theAsyncEventQueue = null;
-
-    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
-    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
-      if (asyncQueueId.equals(asyncChannel.getId())) {
-        theAsyncEventQueue = asyncChannel;
-      }
-    }
-
-    GatewaySender sender = ((AsyncEventQueueImpl) theAsyncEventQueue)
-        .getSender();
-
-    if (sender.isParallel()) {
-      final Set<RegionQueue> queues = ((AbstractGatewaySender) sender)
-          .getQueues();
-
-      Wait.waitForCriterion(new WaitCriterion() {
-
-        public String description() {
-          return "Waiting for EventQueue size to be " + numQueueEntries;
-        }
-
-        public boolean done() {
-          boolean done = numQueueEntries == queues
-              .toArray(new RegionQueue[queues.size()])[0].getRegion().size();
-          return done;
-        }
-
-      }, MAX_WAIT, 500, true);
-
-    } else {
-      throw new Exception(
-          "This method should be used for only ParallelGatewaySender,SerialGatewaySender should use checkAsyncEventQueueSize() method instead");
-
-    }
+    factory.create(asyncChannelId, asyncEventListener);
   }
 
   public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){
@@ -894,8 +521,6 @@ public class WANTestBase extends DistributedTestCase{
         StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
         while (tokenizer.hasMoreTokens()) {
           String senderId = tokenizer.nextToken();
-          // GatewaySender sender = cache.getGatewaySender(senderId);
-          // assertNotNull(sender);
           fact.addGatewaySenderId(senderId);
         }
       }
@@ -927,8 +552,6 @@ public class WANTestBase extends DistributedTestCase{
         StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
         while (tokenizer.hasMoreTokens()) {
           String senderId = tokenizer.nextToken();
-          // GatewaySender sender = cache.getGatewaySender(senderId);
-          // assertNotNull(sender);
           fact.addGatewaySenderId(senderId);
         }
       }
@@ -958,8 +581,6 @@ public class WANTestBase extends DistributedTestCase{
 		  StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
 		  while (tokenizer.hasMoreTokens()) {
 			  String senderId = tokenizer.nextToken();
-	          // GatewaySender sender = cache.getGatewaySender(senderId);
-	          // assertNotNull(sender);
 			  fact.addGatewaySenderId(senderId);
 		  }
 	  }
@@ -994,111 +615,6 @@ public class WANTestBase extends DistributedTestCase{
     mutator.addAsyncEventQueueId(queueId);
   }
 
-  public static void createPartitionedRegionWithAsyncEventQueue(
-      String regionName, String asyncEventQueueId, Boolean offHeap) {
-    IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
-        .getName());
-    IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class
-        .getName());
-    try {
-      AttributesFactory fact = new AttributesFactory();
-
-      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
-      pfact.setTotalNumBuckets(16);
-      fact.setPartitionAttributes(pfact.create());
-      fact.setOffHeap(offHeap);
-      Region r = cache.createRegionFactory(fact.create())
-          .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
-      assertNotNull(r);
-    }
-    finally {
-      exp.remove();
-      exp1.remove();
-    }
-  }
-
-  public static void createColocatedPartitionedRegionWithAsyncEventQueue(
-    String regionName, String asyncEventQueueId, Integer totalNumBuckets, String colocatedWith) {
-
-	IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
-	  .getName());
-	IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class
-	  .getName());
-	try {
-	  AttributesFactory fact = new AttributesFactory();
-
-	  PartitionAttributesFactory pfact = new PartitionAttributesFactory();
-	  pfact.setTotalNumBuckets(totalNumBuckets);
-	  pfact.setColocatedWith(colocatedWith);
-	  fact.setPartitionAttributes(pfact.create());
-	  Region r = cache.createRegionFactory(fact.create())
-	    .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
-	  assertNotNull(r);
-	}
-	finally {
-	  exp.remove();
-	  exp1.remove();
-	}
-  }
-
-  public static void createPersistentPartitionedRegionWithAsyncEventQueue(
-      String regionName, String asyncEventQueueId) {
-    AttributesFactory fact = new AttributesFactory();
-
-    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
-    fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
-    pfact.setTotalNumBuckets(16);
-    fact.setPartitionAttributes(pfact.create());
-    if (asyncEventQueueId != null) {
-      StringTokenizer tokenizer = new StringTokenizer(asyncEventQueueId, ",");
-      while (tokenizer.hasMoreTokens()) {
-        String asyncId = tokenizer.nextToken();
-        fact.addAsyncEventQueueId(asyncId);
-      }
-    }
-    Region r = cache.createRegionFactory(fact.create()).create(regionName);
-    assertNotNull(r);
-  }
-
-  /**
-   * Create PartitionedRegion with 1 redundant copy
-   */
-  public static void createPRWithRedundantCopyWithAsyncEventQueue(
-      String regionName, String asyncEventQueueId, Boolean offHeap) {
-    IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
-        .getName());
-
-    try {
-      AttributesFactory fact = new AttributesFactory();
-
-      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
-      pfact.setTotalNumBuckets(16);
-      pfact.setRedundantCopies(1);
-      fact.setPartitionAttributes(pfact.create());
-      fact.setOffHeap(offHeap);
-      Region r = cache.createRegionFactory(fact.create())
-          .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
-      assertNotNull(r);
-    }
-    finally {
-      exp.remove();
-    }
-  }
-
-  public static void createPartitionedRegionAccessorWithAsyncEventQueue(
-      String regionName, String asyncEventQueueId) {
-    AttributesFactory fact = new AttributesFactory();
-    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
-    pfact.setTotalNumBuckets(16);
-    pfact.setLocalMaxMemory(0);
-    fact.setPartitionAttributes(pfact.create());
-    Region r = cache.createRegionFactory(
-    fact.create()).addAsyncEventQueueId(
-    asyncEventQueueId).create(regionName);
-    //fact.create()).create(regionName);
-    assertNotNull(r);
-  }
-
   public static void createPartitionedRegionAsAccessor(
       String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets){
     AttributesFactory fact = new AttributesFactory();
@@ -1106,8 +622,6 @@ public class WANTestBase extends DistributedTestCase{
       StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
       while (tokenizer.hasMoreTokens()){
         String senderId = tokenizer.nextToken();
-//        GatewaySender sender = cache.getGatewaySender(senderId);
-//        assertNotNull(sender);
         fact.addGatewaySenderId(senderId);
       }
     }
@@ -1126,8 +640,6 @@ public class WANTestBase extends DistributedTestCase{
       StringTokenizer tokenizer = new StringTokenizer(serialSenderIds, ",");
       while (tokenizer.hasMoreTokens()) {
         String senderId = tokenizer.nextToken();
-        // GatewaySender sender = cache.getGatewaySender(senderId);
-        // assertNotNull(sender);
         fact.addGatewaySenderId(senderId);
       }
     }
@@ -1135,8 +647,6 @@ public class WANTestBase extends DistributedTestCase{
       StringTokenizer tokenizer = new StringTokenizer(parallelSenderIds, ",");
       while (tokenizer.hasMoreTokens()) {
         String senderId = tokenizer.nextToken();
-//        GatewaySender sender = cache.getGatewaySender(senderId);
-//        assertNotNull(sender);
         fact.addGatewaySenderId(senderId);
       }
     }
@@ -1166,8 +676,6 @@ public class WANTestBase extends DistributedTestCase{
         StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
         while (tokenizer.hasMoreTokens()) {
           String senderId = tokenizer.nextToken();
-          // GatewaySender sender = cache.getGatewaySender(senderId);
-          // assertNotNull(sender);
           fact.addGatewaySenderId(senderId);
         }
       }
@@ -1187,7 +695,7 @@ public class WANTestBase extends DistributedTestCase{
   }
 
   public static void createCustomerOrderShipmentPartitionedRegion(
-      String regionName, String senderIds, Integer redundantCopies,
+      String senderIds, Integer redundantCopies,
       Integer totalNumBuckets, Boolean offHeap) {
     IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
         .getName());
@@ -1197,15 +705,11 @@ public class WANTestBase extends DistributedTestCase{
         StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
         while (tokenizer.hasMoreTokens()) {
           String senderId = tokenizer.nextToken();
-          // GatewaySender sender = cache.getGatewaySender(senderId);
-          // assertNotNull(sender);
           fact.addGatewaySenderId(senderId);
         }
       }
 
       PartitionAttributesFactory paf = new PartitionAttributesFactory();
-      // creating colocated Regions
-      paf = new PartitionAttributesFactory();
       paf.setRedundantCopies(redundantCopies)
           .setTotalNumBuckets(totalNumBuckets)
           .setPartitionResolver(
@@ -1230,8 +734,6 @@ public class WANTestBase extends DistributedTestCase{
         StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
         while (tokenizer.hasMoreTokens()) {
           String senderId = tokenizer.nextToken();
-          // GatewaySender sender = cache.getGatewaySender(senderId);
-          // assertNotNull(sender);
           fact.addGatewaySenderId(senderId);
         }
       }
@@ -1255,8 +757,6 @@ public class WANTestBase extends DistributedTestCase{
         StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
         while (tokenizer.hasMoreTokens()) {
           String senderId = tokenizer.nextToken();
-          // GatewaySender sender = cache.getGatewaySender(senderId);
-          // assertNotNull(sender);
           fact.addGatewaySenderId(senderId);
         }
       }
@@ -1280,8 +780,6 @@ public class WANTestBase extends DistributedTestCase{
       StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
       while (tokenizer.hasMoreTokens()){
         String senderId = tokenizer.nextToken();
-//        GatewaySender sender = cache.getGatewaySender(senderId);
-//        assertNotNull(sender);
         fact.addGatewaySenderId(senderId);
       }
     }
@@ -1309,8 +807,6 @@ public class WANTestBase extends DistributedTestCase{
       StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
       while (tokenizer.hasMoreTokens()){
         String senderId = tokenizer.nextToken();
-//        GatewaySender sender = cache.getGatewaySender(senderId);
-//        assertNotNull(sender);
         fact.addGatewaySenderId(senderId);
       }
     }
@@ -1340,21 +836,6 @@ public class WANTestBase extends DistributedTestCase{
     }
   }
 
-  public static void createCacheInVMsAsync(Integer locatorPort, VM... vms) {
-    List<AsyncInvocation> tasks = new LinkedList<>();
-    for (VM vm : vms) {
-      tasks.add(vm.invokeAsync(() -> createCache(locatorPort)));
-    }
-    for (AsyncInvocation invocation : tasks) {
-      try {
-        invocation.join(60000);
-      }
-      catch (InterruptedException e) {
-        fail("Failed starting up the cache");
-      }
-    }
-  }
-
   public static void addListenerToSleepAfterCreateEvent(int milliSeconds) {
     cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator()
       .addCacheListener(new CacheListenerAdapter<Object, Object>() {
@@ -1370,6 +851,31 @@ public class WANTestBase extends DistributedTestCase{
       });
   }
 
+  private static CacheListener myListener;
+  public static void longPauseAfterNumEvents(int numEvents, int milliSeconds) {
+    myListener = new CacheListenerAdapter<Object, Object>() {
+      @Override
+      public void afterCreate(final EntryEvent<Object, Object> event) {
+        try {
+          if (event.getRegion().size() >= numEvents){
+            Thread.sleep(milliSeconds);
+          }
+        }
+        catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    };
+    cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator()
+      .addCacheListener(myListener);
+  }
+
+  public static void removeCacheListener() {
+    cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator()
+      .removeCacheListener(myListener);
+
+  }
+
 
   public static void createCache(Integer locPort){
     createCache(false, locPort);
@@ -1449,7 +955,7 @@ public class WANTestBase extends DistributedTestCase{
     File pdxDir = new File(CacheTestCase.getDiskDir(), "pdx");
     DiskStoreFactory dsf = cache.createDiskStoreFactory();
     File [] dirs1 = new File[] {pdxDir};
-    DiskStore store = dsf.setDiskDirs(dirs1).setMaxOplogSize(1).create("PDX_TEST");
+    dsf.setDiskDirs(dirs1).setMaxOplogSize(1).create("PDX_TEST");
   }
 
   public static void createCache(Integer locPort1, Integer locPort2){
@@ -1576,19 +1082,6 @@ public class WANTestBase extends DistributedTestCase{
     sender.test_setBatchConflationEnabled(true);
   }
 
-  public static void startAsyncEventQueue(String senderId) {
-    Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
-    AsyncEventQueue q = null;
-    for (AsyncEventQueue s : queues) {
-      if (s.getId().equals(senderId)) {
-        q = s;
-        break;
-      }
-    }
-    //merge42180: There is no start method on AsyncEventQueue. Cheetah has this method. Yet the code for AsyncEvnt Queue is not properly merged from cheetah to cedar
-    //q.start();
-  }
-
   public static Map getSenderToReceiverConnectionInfo(String senderId){
 	  Set<GatewaySender> senders = cache.getGatewaySenders();
 	  GatewaySender sender = null;
@@ -1622,25 +1115,13 @@ public class WANTestBase extends DistributedTestCase{
         break;
       }
     }
-    final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics();
+    final GatewaySenderStats statistics = sender.getStatistics();
     if (expectedQueueSize != -1) {
       final RegionQueue regionQueue;
       regionQueue = sender.getQueues().toArray(
           new RegionQueue[1])[0];
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          if (regionQueue.size() == expectedQueueSize) {
-            return true;
-          }
-          return false;
-        }
-
-        public String description() {
-          return "Expected queue entries: " + expectedQueueSize
-              + " but actual entries: " + regionQueue.size();
-        }
-      };
-      Wait.waitForCriterion(wc, 120000, 500, true);
+      Awaitility.await().atMost(120,TimeUnit.SECONDS).until(() -> assertEquals("Expected queue entries: " +
+        expectedQueueSize + " but actual entries: " + regionQueue.size(), expectedQueueSize,regionQueue.size()));
     }
     ArrayList<Integer> stats = new ArrayList<Integer>();
     stats.add(statistics.getEventQueueSize());
@@ -1674,28 +1155,10 @@ public class WANTestBase extends DistributedTestCase{
     assert(statistics.getEventsDistributed() >= eventsDistributed);
   }
 
-  public static void checkAsyncEventQueueStats(String queueId, final int queueSize,
-      final int eventsReceived, final int eventsQueued,
-      final int eventsDistributed) {
-    Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
-    AsyncEventQueue queue = null;
-    for (AsyncEventQueue q : asyncQueues) {
-      if (q.getId().equals(queueId)) {
-        queue = q;
-        break;
-      }
-    }
-    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics();
-    assertEquals(queueSize, statistics.getEventQueueSize());
-    assertEquals(eventsReceived, statistics.getEventsReceived());
-    assertEquals(eventsQueued, statistics.getEventsQueued());
-    assert(statistics.getEventsDistributed() >= eventsDistributed);
-  }
-
   public static void checkGatewayReceiverStats(int processBatches,
       int eventsReceived, int creates) {
     Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
-    GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+    GatewayReceiver receiver = gatewayReceivers.iterator().next();
     CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
         .getAcceptor().getStats();
 
@@ -1709,7 +1172,7 @@ public class WANTestBase extends DistributedTestCase{
   public static void checkMinimumGatewayReceiverStats(int processBatches,
       int eventsReceived) {
     Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
-    GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+    GatewayReceiver receiver = gatewayReceivers.iterator().next();
     CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
         .getAcceptor().getStats();
 
@@ -1721,7 +1184,7 @@ public class WANTestBase extends DistributedTestCase{
 
   public static void checkExcepitonStats(int exceptionsOccured) {
     Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
-    GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+    GatewayReceiver receiver = gatewayReceivers.iterator().next();
     CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
         .getAcceptor().getStats();
 
@@ -1739,7 +1202,7 @@ public class WANTestBase extends DistributedTestCase{
   public static void checkGatewayReceiverStatsHA(int processBatches,
       int eventsReceived, int creates) {
     Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
-    GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+    GatewayReceiver receiver = gatewayReceivers.iterator().next();
     CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
         .getAcceptor().getStats();
 
@@ -1776,21 +1239,6 @@ public class WANTestBase extends DistributedTestCase{
     assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
   }
 
-  public static void checkAsyncEventQueueConflatedStats(
-      String asyncEventQueueId, final int eventsConflated) {
-    Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
-    AsyncEventQueue queue = null;
-    for (AsyncEventQueue q : queues) {
-      if (q.getId().equals(asyncEventQueueId)) {
-        queue = q;
-        break;
-      }
-    }
-    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
-        .getStatistics();
-    assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
-  }
-
   public static void checkStats_Failover(String senderId,
       final int eventsReceived) {
     Set<GatewaySender> senders = cache.getGatewaySenders();
@@ -1810,25 +1258,6 @@ public class WANTestBase extends DistributedTestCase{
         .getUnprocessedEventsRemovedByPrimary()));
   }
 
-  public static void checkAsyncEventQueueStats_Failover(String asyncEventQueueId,
-      final int eventsReceived) {
-    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
-    AsyncEventQueue queue = null;
-    for (AsyncEventQueue q : asyncEventQueues) {
-      if (q.getId().equals(asyncEventQueueId)) {
-        queue = q;
-        break;
-      }
-    }
-    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) queue)
-        .getStatistics();
-
-    assertEquals(eventsReceived, statistics.getEventsReceived());
-    assertEquals(eventsReceived, (statistics.getEventsQueued()
-        + statistics.getUnprocessedTokensAddedByPrimary() + statistics
-        .getUnprocessedEventsRemovedByPrimary()));
-  }
-
   public static void checkBatchStats(String senderId, final int batches) {
     Set<GatewaySender> senders = cache.getGatewaySenders();
     GatewaySender sender = null;
@@ -1844,22 +1273,6 @@ public class WANTestBase extends DistributedTestCase{
     assertEquals(0, statistics.getBatchesRedistributed());
   }
 
-  public static void checkAsyncEventQueueBatchStats(String asyncQueueId,
-      final int batches) {
-    Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
-    AsyncEventQueue queue = null;
-    for (AsyncEventQueue q : queues) {
-      if (q.getId().equals(asyncQueueId)) {
-        queue = q;
-        break;
-      }
-    }
-    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
-        .getStatistics();
-    assert (statistics.getBatchesDistributed() >= batches);
-    assertEquals(0, statistics.getBatchesRedistributed());
-  }
-
   public static void checkBatchStats(String senderId,
       final boolean batchesDistributed, final boolean bathcesRedistributed) {
     Set<GatewaySender> senders = cache.getGatewaySenders();
@@ -1896,43 +1309,13 @@ public class WANTestBase extends DistributedTestCase{
             .getUnprocessedTokensAddedByPrimary()));
   }
 
-  public static void checkAsyncEventQueueUnprocessedStats(String asyncQueueId, int events) {
-    Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
-    AsyncEventQueue queue = null;
-    for (AsyncEventQueue q : asyncQueues) {
-      if (q.getId().equals(asyncQueueId)) {
-        queue = q;
-        break;
-      }
-    }
-    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics();
-    assertEquals(events,
-        (statistics.getUnprocessedEventsAddedBySecondary() + statistics
-            .getUnprocessedTokensRemovedBySecondary()));
-    assertEquals(events,
-        (statistics.getUnprocessedEventsRemovedByPrimary() + statistics
-            .getUnprocessedTokensAddedByPrimary()));
-  }
-
   public static void waitForSenderRunningState(String senderId){
     final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
     try {
       Set<GatewaySender> senders = cache.getGatewaySenders();
       final GatewaySender sender = getGatewaySenderById(senders, senderId);
-
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          if (sender != null && sender.isRunning()) {
-            return true;
-          }
-          return false;
-        }
-
-        public String description() {
-          return "Expected sender isRunning state to be true but is false";
-        }
-      };
-      Wait.waitForCriterion(wc, 300000, 500, true);
+      Awaitility.await().atMost(300,TimeUnit.SECONDS).until(() -> assertEquals("Expected sender isRunning state to "
+        + "be true but is false", true, (sender != null && sender.isRunning())));
     } finally {
       exln.remove();
     }
@@ -1941,19 +1324,8 @@ public class WANTestBase extends DistributedTestCase{
   public static void waitForSenderToBecomePrimary(String senderId){
     Set<GatewaySender> senders = ((GemFireCacheImpl)cache).getAllGatewaySenders();
     final GatewaySender sender = getGatewaySenderById(senders, senderId);
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        if (sender != null && ((AbstractGatewaySender) sender).isPrimary()) {
-          return true;
-        }
-        return false;
-      }
-
-      public String description() {
-        return "Expected sender primary state to be true but is false";
-      }
-    };
-    Wait.waitForCriterion(wc, 10000, 1000, true);
+    Awaitility.await().atMost(10,TimeUnit.SECONDS).until(() -> assertEquals("Expected sender primary state to "
+      + "be true but is false", true, (sender != null && ((AbstractGatewaySender)sender).isPrimary())));
   }
 
   private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
@@ -1980,22 +1352,13 @@ public class WANTestBase extends DistributedTestCase{
     secondaryUpdatesMap.put("Update", listener1.updateList);
     secondaryUpdatesMap.put("Destroy", listener1.destroyList);
 
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        secondaryUpdatesMap.put("Create", listener1.createList);
-        secondaryUpdatesMap.put("Update", listener1.updateList);
-        secondaryUpdatesMap.put("Destroy", listener1.destroyList);
-        if (secondaryUpdatesMap.equals(primaryUpdatesMap)) {
-          return true;
-        }
-        return false;
-      }
-
-      public String description() {
-        return "Expected seconadry map to be " + primaryUpdatesMap + " but it is " + secondaryUpdatesMap;
-      }
-    };
-    Wait.waitForCriterion(wc, 300000, 500, true);
+    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+      secondaryUpdatesMap.put("Create", listener1.createList);
+      secondaryUpdatesMap.put("Update", listener1.updateList);
+      secondaryUpdatesMap.put("Destroy", listener1.destroyList);
+      assertEquals("Expected seconadry map to be " + primaryUpdatesMap + " but it is " + secondaryUpdatesMap,
+        true,secondaryUpdatesMap.equals(primaryUpdatesMap));
+    });
   }
 
   public static HashMap checkQueue2(){
@@ -2021,35 +1384,12 @@ public class WANTestBase extends DistributedTestCase{
     PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
     HashMap listenerAttrs = new HashMap();
     for (int i = 0; i < numBuckets; i++) {
-      BucketRegion br = region.getBucketRegion(i);
-      QueueListener listener = (QueueListener)br.getCacheListener();
-      listenerAttrs.put("Create"+i, listener.createList);
-      listenerAttrs.put("Update"+i, listener.updateList);
-      listenerAttrs.put("Destroy"+i, listener.destroyList);
-    }
-    return listenerAttrs;
-  }
-
-  public static HashMap checkQueue_PR(String senderId){
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    GatewaySender sender = null;
-    for(GatewaySender s : senders){
-      if(s.getId().equals(senderId)){
-        sender = s;
-        break;
-      }
-    }
-
-    RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
-    .getQueues().toArray(new RegionQueue[1])[0];
-
-    PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
-    QueueListener listener = (QueueListener)region.getCacheListener();
-
-    HashMap listenerAttrs = new HashMap();
-    listenerAttrs.put("Create", listener.createList);
-    listenerAttrs.put("Update", listener.updateList);
-    listenerAttrs.put("Destroy", listener.destroyList);
+      BucketRegion br = region.getBucketRegion(i);
+      QueueListener listener = (QueueListener)br.getCacheListener();
+      listenerAttrs.put("Create"+i, listener.createList);
+      listenerAttrs.put("Update"+i, listener.updateList);
+      listenerAttrs.put("Destroy"+i, listener.destroyList);
+    }
     return listenerAttrs;
   }
 
@@ -2062,7 +1402,7 @@ public class WANTestBase extends DistributedTestCase{
         break;
       }
     }
-    RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+    RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
     .getQueues().toArray(new RegionQueue[1])[0];
 
     PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
@@ -2110,7 +1450,7 @@ public class WANTestBase extends DistributedTestCase{
         break;
       }
     }
-    RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+    RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
     .getQueues().toArray(new RegionQueue[1])[0];
 
     PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
@@ -2163,7 +1503,7 @@ public class WANTestBase extends DistributedTestCase{
       }
     }
     else {
-      RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+      RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
       .getQueues().toArray(new RegionQueue[1])[0];
       parallelQueue.addCacheListener(listener1);
     }
@@ -2186,7 +1526,7 @@ public class WANTestBase extends DistributedTestCase{
       }
     }
     else {
-    	RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+    	RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
       .getQueues().toArray(new RegionQueue[1])[0];
       parallelQueue.addCacheListener(listener2);
     }
@@ -2215,27 +1555,6 @@ public class WANTestBase extends DistributedTestCase{
     }
   }
 
-  public static void pauseSenderAndWaitForDispatcherToPause(String senderId) {
-    final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
-    IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
-        .getName());
-    try {
-      Set<GatewaySender> senders = cache.getGatewaySenders();
-      GatewaySender sender = null;
-      for (GatewaySender s : senders) {
-        if (s.getId().equals(senderId)) {
-          sender = s;
-          break;
-        }
-      }
-      sender.pause();
-      ((AbstractGatewaySender)sender).getEventProcessor().waitForDispatcherToPause();
-    } finally {
-      exp.remove();
-      exln.remove();
-    }
-  }
-
   public static void resumeSender(String senderId) {
     final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
     IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
@@ -2310,7 +1629,7 @@ public class WANTestBase extends DistributedTestCase{
     }
   }
 
-  public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName, int remoteDsId,
+  public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName,
       boolean isParallel, Integer maxMemory,
       Integer batchSize, boolean isConflation, boolean isPersistent,
       GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy policy) {
@@ -2323,7 +1642,7 @@ public class WANTestBase extends DistributedTestCase{
     gateway.setManualStart(isManualStart);
     gateway.setDispatcherThreads(numDispatchers);
     gateway.setOrderPolicy(policy);
-    ((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
+    gateway.setLocatorDiscoveryCallback(new MyLocatorCallback());
     if (filter != null) {
       eventFilter = filter;
       gateway.addGatewayEventFilter(filter);
@@ -2350,7 +1669,7 @@ public class WANTestBase extends DistributedTestCase{
       persistentDirectory.mkdir();
       DiskStoreFactory dsf = cache.createDiskStoreFactory();
       File[] dirs1 = new File[] { persistentDirectory };
-      GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, numDispatcherThreadsForTheRun, GatewaySender.DEFAULT_ORDER_POLICY);
+      GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, numDispatcherThreadsForTheRun, GatewaySender.DEFAULT_ORDER_POLICY);
       gateway.create(dsName, remoteDsId);
 
     } finally {
@@ -2368,7 +1687,7 @@ public class WANTestBase extends DistributedTestCase{
       persistentDirectory.mkdir();
       DiskStoreFactory dsf = cache.createDiskStoreFactory();
       File[] dirs1 = new File[] { persistentDirectory };
-      GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter,
+      GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName,isParallel, maxMemory, batchSize, isConflation, isPersistent, filter,
           isManualStart, numDispatchers, orderPolicy);
       gateway.create(dsName, remoteDsId);
 
@@ -2377,10 +1696,8 @@ public class WANTestBase extends DistributedTestCase{
     }
   }
 
-  public static void createSenderWithoutDiskStore(String dsName, int remoteDsId,
-      boolean isParallel, Integer maxMemory,
-      Integer batchSize, boolean isConflation, boolean isPersistent,
-      GatewayEventFilter filter, boolean isManulaStart) {
+  public static void createSenderWithoutDiskStore(String dsName, int remoteDsId, Integer maxMemory,
+      Integer batchSize, boolean isConflation, boolean isManulaStart) {
 
       GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
       gateway.setParallel(true);
@@ -2403,53 +1720,10 @@ public class WANTestBase extends DistributedTestCase{
     persistentDirectory.mkdir();
     DiskStoreFactory dsf = cache.createDiskStoreFactory();
     File[] dirs1 = new File[] { persistentDirectory };
-    GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy);
+    GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy);
     gateway.create(dsName, remoteDsId);
   }
 
-//  public static void createSender_PDX(String dsName, int remoteDsId,
-//      boolean isParallel, Integer maxMemory,
-//      Integer batchSize, boolean isConflation, boolean isPersistent,
-//      GatewayEventFilter filter, boolean isManulaStart) {
-//    File persistentDirectory = new File(dsName +"_disk_"+System.currentTimeMillis()+"_" + VM.getCurrentVMNum());
-//    persistentDirectory.mkdir();
-//
-//    File [] dirs1 = new File[] {persistentDirectory};
-//
-//    if(isParallel) {
-//      ParallelGatewaySenderFactory gateway = cache.createParallelGatewaySenderFactory();
-//      gateway.setMaximumQueueMemory(maxMemory);
-//      gateway.setBatchSize(batchSize);
-//      ((ParallelGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
-//      if (filter != null) {
-//        gateway.addGatewayEventFilter(filter);
-//      }
-//      if(isPersistent) {
-//        gateway.setPersistenceEnabled(true);
-//        DiskStoreFactory dsf = cache.createDiskStoreFactory();
-//        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
-//      }
-//      gateway.setBatchConflationEnabled(isConflation);
-//      gateway.create(dsName, remoteDsId);
-//
-//    }else {
-//      SerialGatewaySenderFactory gateway = cache.createSerialGatewaySenderFactory();
-//      gateway.setMaximumQueueMemory(maxMemory);
-//      gateway.setBatchSize(batchSize);
-//      gateway.setManualStart(isManulaStart);
-//      ((SerialGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
-//      if (filter != null) {
-//        gateway.addGatewayEventFilter(filter);
-//      }
-//      gateway.setBatchConflationEnabled(isConflation);
-//      if(isPersistent) {
-//        gateway.setPersistenceEnabled(true);
-//        DiskStoreFactory dsf = cache.createDiskStoreFactory();
-//        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
-//      }
-//      gateway.create(dsName, remoteDsId);
-//    }
-//  }
   public static void createSenderForValidations(String dsName, int remoteDsId,
       boolean isParallel, Integer alertThreshold,
       boolean isConflation, boolean isPersistent,
@@ -2527,8 +1801,7 @@ public class WANTestBase extends DistributedTestCase{
           gateway.setDiskStoreName(store.getName());
         }
         gateway.setDiskSynchronous(isDiskSync);
-        GatewaySender sender = gateway
-            .create(dsName, remoteDsId);
+        gateway.create(dsName, remoteDsId);
       }
     }
     finally {
@@ -2675,41 +1948,13 @@ public class WANTestBase extends DistributedTestCase{
     }
   }
 
-  public static void pauseWaitCriteria(final long millisec) {
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        return false;
-      }
-
-      public String description() {
-        return "Expected to wait for " + millisec + " millisec.";
-      }
-    };
-    Wait.waitForCriterion(wc, millisec, 500, false);
-  }
-
-  public static void createReceiverInVMs(int locatorPort, VM... vms) {
-    for (VM vm : vms) {
-      vm.invoke(() -> createReceiver(locatorPort));
-    }
-  }
-
-  public static void createReceiverInVMsAsync(int locatorPort, VM... vms) {
-    List<AsyncInvocation> tasks = new LinkedList<>();
+  public static void createReceiverInVMs(VM... vms) {
     for (VM vm : vms) {
-      tasks.add(vm.invokeAsync(() -> createReceiver(locatorPort)));
-    }
-    for (AsyncInvocation invocation : tasks) {
-      try {
-        invocation.join(30000);
-      }
-      catch (InterruptedException e) {
-        fail("Failed starting up the receiver");
-      }
+      vm.invoke(() -> createReceiver());
     }
   }
 
-  public static int createReceiver(int locPort) {
+  public static int createReceiver() {
     GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
     int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
     fact.setStartPort(port);
@@ -2805,15 +2050,6 @@ public class WANTestBase extends DistributedTestCase{
     return port;
   }
 
-  public static String makePath(String[] strings) {
-    StringBuilder sb = new StringBuilder();
-    for(int i=0;i<strings.length;i++){
-      sb.append(strings[i]);
-      sb.append(File.separator);
-    }
-    return sb.toString();
-  }
-
   public static void createReceiverAndServer(int locPort) {
     WANTestBase test = new WANTestBase(getTestMethodName());
     Properties props = test.getDistributedSystemProperties();
@@ -2841,7 +2077,6 @@ public class WANTestBase extends DistributedTestCase{
     int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
     server.setPort(serverPort);
     server.setHostnameForClients("localhost");
-    //server.setGroups(new String[]{"serv"});
     try {
       server.start();
     } catch (IOException e) {
@@ -2849,23 +2084,6 @@ public class WANTestBase extends DistributedTestCase{
     }
   }
 
-  public static int createReceiverInSecuredCache(int locPort) {
-	GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
-	int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
-	fact.setStartPort(port);
-	fact.setEndPort(port);
-	fact.setManualStart(true);
-	GatewayReceiver receiver = fact.create();
-	try {
-          receiver.start();
-        }
-        catch (IOException e) {
-          e.printStackTrace();
-          com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start GatewayRecevier on port " + port, e);
-        }
-	return port;
-  }
-
   public static int createServer(int locPort) {
     WANTestBase test = new WANTestBase(getTestMethodName());
     Properties props = test.getDistributedSystemProperties();
@@ -2879,7 +2097,6 @@ public class WANTestBase extends DistributedTestCase{
     int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
     server.setPort(port);
     server.setHostnameForClients("localhost");
-    //server.setGroups(new String[]{"serv"});
     try {
       server.start();
     } catch (IOException e) {
@@ -2902,7 +2119,7 @@ public class WANTestBase extends DistributedTestCase{
     CacheServerTestUtil.disableShufflingOfEndpoints();
     Pool p;
     try {
-      p = PoolManager.createFactory().addLocator(host, port0) //.setServerGroup("serv")
+      p = PoolManager.createFactory().addLocator(host, port0)
           .setPingInterval(250).setSubscriptionEnabled(true)
           .setSubscriptionRedundancy(-1).setReadTimeout(2000)
           .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10)
@@ -2975,9 +2192,6 @@ public class WANTestBase extends DistributedTestCase{
       exp1.remove();
       exp2.remove();
     }
-//    for (long i = 0; i < numPuts; i++) {
-//      r.destroy(i);
-//    }
   }
 
 
@@ -3013,20 +2227,6 @@ public class WANTestBase extends DistributedTestCase{
       exp1.remove();
       exp2.remove();
     }
-//    for (long i = 0; i < numPuts; i++) {
-//      r.destroy(i);
-//    }
-  }
-
-  /**
-   * To be used for CacheLoader related tests
-   */
-  public static void doGets(String regionName, int numGets) {
-	Region r = cache.getRegion(Region.SEPARATOR + regionName);
-	assertNotNull(r);
-	for (long i = 0; i < numGets; i++) {
-	  r.get(i);
-	}
   }
 
   public static void doPutsAfter300(String regionName, int numPuts) {
@@ -3090,40 +2290,8 @@ public class WANTestBase extends DistributedTestCase{
   public static void destroyRegion(String regionName, final int min) {
     final Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        if (r.size() > min) {
-          return true;
-        }
-        return false;
-      }
-
-      public String description() {
-        return "Looking for min size of region to be " + min;
-      }
-    };
-    Wait.waitForCriterion(wc, 30000, 5, false);
-    r.destroyRegion();
-  }
-
-  public static void destroyRegionAfterMinRegionSize(String regionName, final int min) {
-    final Region r = cache.getRegion(Region.SEPARATOR + regionName);
-    assertNotNull(r);
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        if (destroyFlag) {
-          return true;
-        }
-        return false;
-      }
-
-      public String description() {
-        return "Looking for min size of region to be " + min;
-      }
-    };
-    Wait.waitForCriterion(wc, 30000, 5, false);
+    Awaitility.await().atMost(30,TimeUnit.SECONDS).until(() -> r.size() > min);
     r.destroyRegion();
-    destroyFlag = false;
   }
 
   public static void localDestroyRegion(String regionName) {
@@ -3228,24 +2396,22 @@ public class WANTestBase extends DistributedTestCase{
     Map orderKeyValues = new HashMap();
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
-      for (int j = 1; j <= 1; j++) {
-        int oid = (i * 1) + j;
-        OrderId orderId = new OrderId(oid, custid);
-        Order order = new Order("OREDR" + oid + "_update");
-        try {
-          orderRegion.put(orderId, order);
-          orderKeyValues.put(orderId, order);
-          assertTrue(orderRegion.containsKey(orderId));
-          assertEquals(order,orderRegion.get(orderId));
+      int oid = i + 1;
+      OrderId orderId = new OrderId(oid, custid);
+      Order order = new Order("OREDR" + oid + "_update");
+      try {
+        orderRegion.put(orderId, order);
+        orderKeyValues.put(orderId, order);
+        assertTrue(orderRegion.containsKey(orderId));
+        assertEquals(order,orderRegion.get(orderId));
 
-        }
-        catch (Exception e) {
-          com.gemstone.gemfire.test.dunit.Assert.fail(
-              "updateOrderPartitionedRegion : failed while doing put operation in OrderPartitionedRegion ",
-              e);
-        }
-        LogWriterUtils.getLogWriter().info("Order :- { " + orderId + " : " + order + " }");
       }
+      catch (Exception e) {
+        com.gemstone.gemfire.test.dunit.Assert.fail(
+          "updateOrderPartitionedRegion : failed while doing put operation in OrderPartitionedRegion ",
+          e);
+      }
+      LogWriterUtils.getLogWriter().info("Order :- { " + orderId + " : " + order + " }");
     }
     return orderKeyValues;
   }
@@ -3278,28 +2444,24 @@ public class WANTestBase extends DistributedTestCase{
     Map shipmentKeyValue = new HashMap();
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
-      for (int j = 1; j <= 1; j++) {
-        int oid = (i * 1) + j;
-        OrderId orderId = new OrderId(oid, custid);
-        for (int k = 1; k <= 1; k++) {
-          int sid = (oid * 1) + k;
-          ShipmentId shipmentId = new ShipmentId(sid, orderId);
-          Shipment shipment = new Shipment("Shipment" + sid);
-          try {
-            shipmentRegion.put(shipmentId, shipment);
-            assertTrue(shipmentRegion.containsKey(shipmentId));
-            assertEquals(shipment,shipmentRegion.get(shipmentId));
-            shipmentKeyValue.put(shipmentId, shipment);
-          }
-          catch (Exception e) {
-            com.gemstone.gemfire.test.dunit.Assert.fail(
-                "putShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ",
-                e);
-          }
-          LogWriterUtils.getLogWriter().info(
-              "Shipment :- { " + shipmentId + " : " + shipment + " }");
-        }
+      int oid = i + 1;
+      OrderId orderId = new OrderId(oid, custid);
+      int sid = oid  + 1;
+      ShipmentId shipmentId = new ShipmentId(sid, orderId);
+      Shipment shipment = new Shipment("Shipment" + sid);
+      try {
+        shipmentRegion.put(shipmentId, shipment);
+        assertTrue(shipmentRegion.containsKey(shipmentId));
+        assertEquals(shipment,shipmentRegion.get(shipmentId));
+        shipmentKeyValue.put(shipmentId, shipment);
       }
+      catch (Exception e) {
+        com.gemstone.gemfire.test.dunit.Assert.fail(
+          "putShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ",
+          e);
+      }
+      LogWriterUtils.getLogWriter().info(
+        "Shipment :- { " + shipmentId + " : " + shipment + " }");
     }
     return shipmentKeyValue;
   }
@@ -3313,18 +2475,14 @@ public class WANTestBase extends DistributedTestCase{
       CustId custid = new CustId(i);
       Customer customer = new Customer("Customer" + custid, "Address" + custid);
       customerRegion.put(custid, customer);
-      for (int j = 1; j <= 1; j++) {
-        int oid = (i * 1) + j;
-        OrderId orderId = new OrderId(oid, custid);
-        Order order = new Order("Order"+orderId);
-        orderRegion.put(orderId, order);
-        for (int k = 1; k <= 1; k++) {
-          int sid = (oid * 1) + k;
-          ShipmentId shipmentId = new ShipmentId(sid, orderId);
-          Shipment shipment = new Shipment("Shipment" + sid);
-          shipmentRegion.put(shipmentId, shipment);
-        }
-      }
+      int oid = i + 1;
+      OrderId orderId = new OrderId(oid, custid);
+      Order order = new Order("Order"+orderId);
+      orderRegion.put(orderId, order);
+      int sid = oid + 1;
+      ShipmentId shipmentId = new ShipmentId(sid, orderId);
+      Shipment shipment = new Shipment("Shipment" + sid);
+      shipmentRegion.put(shipmentId, shipment);
     }
   }
 
@@ -3356,28 +2514,24 @@ public class WANTestBase extends DistributedTestCase{
     Map shipmentKeyValue = new HashMap();
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
-      for (int j = 1; j <= 1; j++) {
-        int oid = (i * 1) + j;
-        OrderId orderId = new OrderId(oid, custid);
-        for (int k = 1; k <= 1; k++) {
-          int sid = (oid * 1) + k;
-          ShipmentId shipmentId = new ShipmentId(sid, orderId);
-          Shipment shipment = new Shipment("Shipment" + sid + "_update");
-          try {
-            shipmentRegion.put(shipmentId, shipment);
-            assertTrue(shipmentRegion.containsKey(shipmentId));
-            assertEquals(shipment,shipmentRegion.get(shipmentId));
-            shipmentKeyValue.put(shipmentId, shipment);
-          }
-          catch (Exception e) {
-            com.gemstone.gemfire.test.dunit.Assert.fail(
-                "updateShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ",
-                e);
-          }
-          LogWriterUtils.getLogWriter().info(
-              "Shipment :- { " + shipmentId + " : " + shipment + " }");
-        }
+      int oid = i + 1;
+      OrderId orderId = new OrderId(oid, custid);
+      int sid = oid + 1;
+      ShipmentId shipmentId = new ShipmentId(sid, orderId);
+      Shipment shipment = new Shipment("Shipment" + sid + "_update");
+      try {
+        shipmentRegion.put(shipmentId, shipment);
+        assertTrue(shipmentRegion.containsKey(shipmentId));
+        assertEquals(shipment,shipmentRegion.get(shipmentId));
+        shipmentKeyValue.put(shipmentId, shipment);
+      }
+      catch (Exception e) {
+        com.gemstone.gemfire.test.dunit.Assert.fail(
+          "updateShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ",
+          e);
       }
+      LogWriterUtils.getLogWriter().info(
+        "Shipment :- { " + shipmentId + " : " + shipment + " }");
     }
     return shipmentKeyValue;
   }
@@ -3421,7 +2575,7 @@ public class WANTestBase extends DistributedTestCase{
   }
 
 
-  public static void doTxPuts(String regionName, int numPuts) {
+  public static void doTxPuts(String regionName) {
     Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
     CacheTransactionManager mgr = cache.getCacheTransactionManager();
@@ -3434,7 +2588,6 @@ public class WANTestBase extends DistributedTestCase{
   }
 
   public static void doNextPuts(String regionName, int start, int numPuts) {
-    //waitForSitesToUpdate();
     IgnoredException exp = IgnoredException.addIgnoredException(CacheClosedException.class
         .getName());
     try {
@@ -3494,30 +2647,16 @@ public class WANTestBase extends DistributedTestCase{
     }
 
     if (sender.isParallel()) {
-      int totalSize = 0;
       final Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
-
-      WaitCriterion wc = new WaitCriterion() {
+      Awaitility.await().atMost(120,TimeUnit.SECONDS).until(() -> {
         int size = 0;
-        public boolean done() {
-          for (RegionQueue q : queues) {
-            ConcurrentParallelGatewaySenderQueue prQ = (ConcurrentParallelGatewaySenderQueue)q;
-            size += prQ.localSize();
-          }
-          if (size == numQueueEntries) {
-            return true;
-          }
-          return false;
+        for (RegionQueue q : queues) {
+          ConcurrentParallelGatewaySenderQueue prQ = (ConcurrentParallelGatewaySenderQueue)q;
+          size += prQ.localSize();
         }
-
-        public String description() {
-          return " Expected local queue entries: " + numQueueEntries
-            + " but actual entries: " + size;
-        }
-
-      };
-
-      Wait.waitForCriterion(wc, 120000, 500, true);
+        assertEquals(" Expected local queue entries: " + numQueueEntries
+          + " but actual entries: " + size, numQueueEntries, size);
+      });
     }
   }
 
@@ -3545,38 +2684,6 @@ public class WANTestBase extends DistributedTestCase{
     return -1;
   }
 
-  public static void doUpdates(String regionName, int numUpdates) {
-    Region r = cache.getRegion(Region.SEPARATOR + regionName);
-    assertNotNull(r);
-    for (int i = 0; i < numUpdates; i++) {
-      String s = "K"+i;
-      r.put(i, s);
-    }
-  }
-
-  public static void doUpdateOnSameKey(String regionName, int key,
-      int numUpdates) {
-    Region r = cache.getRegion(Region.SEPARATOR + regionName);
-    assertNotNull(r);
-    for (int i = 0; i < numUpdates; i++) {
-      String s = "V_" + i;
-      r.put(key, s);
-    }
-  }
-
-  public static void doRandomUpdates(String regionName, int numUpdates) {
-    Region r = cache.getRegion(Region.SEPARATOR + regionName);
-    assertNotNull(r);
-    Set<Integer> generatedKeys = new HashSet<Integer>();
-    while(generatedKeys.size() != numUpdates) {
-      generatedKeys.add((new Random()).nextInt(r.size()));
-    }
-    for (Integer i: generatedKeys) {
-      String s = "K"+i;
-      r.put(i, s);
-    }
-  }
-
   public static void doMultiThreadedPuts(String regionName, int numPuts) {
     final AtomicInteger ai = new AtomicInteger(-1);
     final ExecutorService execService = Executors.newFixedThreadPool(5,
@@ -3614,68 +2721,26 @@ public class WANTestBase extends DistributedTestCase{
     validateRegionSize(regionName, regionSize, 30000);
   }
 
-  public static void validateRegionSize(String regionName, final int regionSize, long waitTime) {
+  public static void validateRegionSize(String regionName, final int regionSize, long waitTimeInMilliSec) {
     IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
         .getName());
     IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class
         .getName());
     try {
-
       final Region r = cache.getRegion(Region.SEPARATOR + regionName);
       assertNotNull(r);
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          if (r.keySet().size() == regionSize) {
-            return true;
-          }
-          return false;
-        }
-
-        public String description() {
-          return "Expected region entries: " + regionSize
-              + " but actual entries: " + r.keySet().size()
-              + " present region keyset " + r.keySet();
-        }
-      };
-      Wait.waitForCriterion(wc, waitTime, 500, true);
+      if ( regionSize != r.keySet().size()) {
+        Awaitility.await().atMost(waitTimeInMilliSec, TimeUnit.MILLISECONDS).pollInterval(500, TimeUnit.MILLISECONDS)
+          .until(() ->
+            assertEquals("Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size()
+              + " present region keyset " + r.keySet(), regionSize, r.keySet().size()));
+      }
     } finally {
       exp.remove();
       exp1.remove();
     }
   }
 
-  /**
-   * Validate whether all the attributes set on AsyncEventQueueFactory are set
-   * on the sender underneath the AsyncEventQueue.
-   */
-  public static void validateAsyncEventQueueAttributes(String asyncChannelId,
-      int maxQueueMemory, int batchSize, int batchTimeInterval,
-      boolean isPersistent, String diskStoreName, boolean isDiskSynchronous,
-      boolean batchConflationEnabled) {
-
-    AsyncEventQueue theChannel = null;
-
-    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
-    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
-      if (asyncChannelId.equals(asyncChannel.getId())) {
-        theChannel = asyncChannel;
-      }
-    }
-
-    GatewaySender theSender = ((AsyncEventQueueImpl)theChannel).getSender();
-    assertEquals("maxQueueMemory", maxQueueMemory, theSender
-        .getMaximumQueueMemory());
-    assertEquals("batchSize", batchSize, theSender.getBatchSize());
-    assertEquals("batchTimeInterval", batchTimeInterval, theSender
-        .getBatchTimeInterval());
-    assertEquals("isPersistent", isPersistent, theSender.isPersistenceEnabled());
-    assertEquals("diskStoreName", diskStoreName, theSender.getDiskStoreName());
-    assertEquals("isDiskSynchronous", isDiskSynchronous, theSender
-        .isDiskSynchronous());
-    assertEquals("batchConflation", batchConflationEnabled, theSender
-        .isBatchConflationEnabled());
-  }
-
   public static void validateAsyncEventListener(String asyncQueueId, final int expectedSize) {
     AsyncEventListener theListener = null;
 
@@ -3688,153 +2753,43 @@ public class WANTestBase extends DistributedTestCase{
 
     final Map eventsMap = ((MyAsyncEventListener) theListener).getEventsMap();
     assertNotNull(eventsMap);
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        if (eventsMap.size() == expectedSize) {
-          return true;
-        }
-        return false;
-      }
-
-      public String description() {
-        return "Expected map entries: " + expectedSize
-            + " but actual entries: " + eventsMap.size();
-      }
-    };
-    Wait.waitForCriterion(wc, 60000, 500, true); //TODO:Yogs 
-  }
-
-   public static void validateCustomAsyncEventListener(String asyncQueueId,
-      final int expectedSize) {
-    AsyncEventListener theListener = null;
-
-    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
-    for (AsyncEventQueue asyncQueue : asyncEventQueues) {
-      if (asyncQueueId.equals(asyncQueue.getId())) {
-        theListener = asyncQueue.getAsyncEventListener();
-      }
-    }
-
-    final Map eventsMap = ((CustomAsyncEventListener) theListener).getEventsMap();
-    assertNotNull(eventsMap);
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        if (eventsMap.size() == expectedSize) {
-          return true;
-        }
-        return false;
-      }
-
-      public String description() {
-        return "Expected map entries: " + expectedSize
-            + " but actual entries: " + eventsMap.size();
-      }
-    };
-    Wait.waitForCriterion(wc, 60000, 500, true); // TODO:Yogs
-
-   Iterator<AsyncEvent> itr = eventsMap.values().iterator();
-   while (itr.hasNext()) {
-     AsyncEvent event = itr.next();
-     assertTrue("possibleDuplicate should be true for event: " + event, event.getPossibleDuplicate());
-   }
+    Awaitility.await().atMost(60,TimeUnit.SECONDS).until(() -> assertEquals("Expected map entries: " + expectedSize
+      + " but actual entries: " + eventsMap.size(), expectedSize, eventsMap.size()));
   }
 
   public static void waitForAsyncQueueToGetEmpty(String asyncQueueId) {
     AsyncEventQueue theAsyncEventQueue = null;
 
-    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
-    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
-      if (asyncQueueId.equals(asyncChannel.getId())) {
-        theAsyncEventQueue = asyncChannel;
-      }
-    }
-
-    final GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue)
-        .getSender();
-
-    if (sender.isParallel()) {
-      final Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
-          .getQueues();
-
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          int size = 0;
-          for (RegionQueue q : queues) {
-            size += q.size();
-          }
-          if (size == 0) {
-            return true;
-          }
-          return false;
-        }
-
-        public String description() {
-          int size = 0;
-          for (RegionQueue q : queues) {
-            size += q.size();
-          }
-          return "Expected queue size to be : " + 0 + " but actual entries: "
-              + size;
-        }
-      };
-      Wait.waitForCriterion(wc, 60000, 500, true);
-
-    } else {
-      WaitCriterion wc = new WaitCriterion() {
-        public boolean done() {
-          Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
-              .getQueues();
-          int size = 0;
-          for (RegionQueue q : queues) {
-            size += q.size();
-          }
-          if (size == 0) {
-            return true;
-          }
-          return false;
-        }
-
-        public String description() {
-          Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
-              .getQueues();
-          int size = 0;
-          for (RegionQueue q : queues) {
-            size += q.size();
-          }
-          return "Expected queue size to be : " + 0 + " but actual entries: "
-              + size;
-        }
-      };
-      Wait.waitForCriterion(wc, 60000, 500, true);
-    }
-  }
-
-  public static void verifyAsyncEventListenerForPossibleDuplicates(
-      String asyncEventQueueId, Set<Integer> bucketIds, int batchSize) {
-    AsyncEventListener theListener = null;
-
-    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
-    for (AsyncEventQueue asyncQueue : asyncEventQueues) {
-      if (asyncEventQueueId.equals(asyncQueue.getId())) {
-        theListener = asyncQueue.getAsyncEventListener();
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncQueueId.equals(asyncChannel.getId())) {
+        theAsyncEventQueue = asyncChannel;
       }
     }
 
-    final Map<Integer, List<GatewaySenderEventImpl>> bucketToEventsMap = ((MyAsyncEventListener2)theListener)
-        .getBucketToEventsMap();
-    assertNotNull(bucketToEventsMap);
-    assertTrue(bucketIds.size() > 1);
+    final GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue)
+        .getSender();
 
-    for (int bucketId : bucketIds) {
-      List<GatewaySenderEventImpl> eventsForBucket = bucketToEventsMap
-          .get(bucketId);
-      LogWriterUtils.getLogWriter().info(
-          "Events for bucket: " + bucketId + " is " + eventsForBucket);
-      assertNotNull(eventsForBucket);
-      for (int i = 0; i < batchSize; i++) {
-        GatewaySenderEventImpl senderEvent = eventsForBucket.get(i);
-        assertTrue(senderEvent.getPossibleDuplicate());
-      }
+    if (sender.isParallel()) {
+      final Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
+          .getQueues();
+      Awaitility.await().atMost(60,TimeUnit.SECONDS).until(() -> {
+        int size = 0;
+        for (RegionQueue q : queues) {
+          size += q.size();
+        }
+        assertEquals("Expected queue size to be : " + 0 + " but actual entries: " + size, 0, size);
+      });
+    } else {
+      Awaitility.await().atMost(60,TimeUnit.SECONDS).until(() -> {
+        Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
+          .getQueues();
+        int size = 0;
+        for (RegionQueue q : queues) {
+          size += q.size();
+        }
+        assertEquals("Expected queue size to be : " + 0 + " but actual entries: " + size, 0, size);
+      });
     }
   }
 
@@ -3854,81 +2809,21 @@ public class WANTestBase extends DistributedTestCase{
     return eventsMap.size();
   }
 
-  public static int getAsyncEventQueueSize(String asyncEventQueueId) {
-    AsyncEventQueue theQueue = null;
-
-    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
-    for (AsyncEventQueue asyncQueue : asyncEventQueues) {
-      if (asyncEventQueueId.equals(asyncQueue.getId())) {
-        theQueue = asyncQueue;
-      }
-    }
-    assertNotNull(theQueue);
-    return theQueue.size();
-  }
-
-
   public static void validateRegionSize_PDX(String regionName, final int regionSize) {
     final Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        if (r.keySet().size() >= regionSize) {
-          return true;
-        }
-        return false;
-      }
-
-      public String description() {
-
-        return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size() + " present region keyset " + r.keySet()  ;
-      }
-    };
-    Wait.waitForCriterion(wc, 200000, 500, true);
+    Awaitility.await().atMost(200,TimeUnit.SECONDS).until(() -> assertEquals("Expected region entries: " + regionSize +
+      " but actual entries: " + r.keySet().size() + " present region keyset " + r.keySet(),
+      true,(regionSize <= r.keySet().size())));
     for(int i = 0 ; i < regionSize; i++){
       LogWriterUtils.getLogWriter().info("For Key : Key_"+i + " : Values : " + r.get("Key_" + i));
       assertEquals(new SimpleClass(i, (byte)i), r.get("Key_" + i));
     }
   }
-  public static void validateRegionSize_PDX2(String regionName, final int regionSize) {
-    final Region r = cache.getRegion(Region.SEPARATOR + regionName);
-    assertNotNull(r);
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        if (r.keySet().size() == regionSize) {
-          return true;
-        }
-        return false;
-      }
-
-      public String description() {
-
-        return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size() + " present region keyset " + r.keySet()  ;
-      }
-    };
-    Wait.waitForCriterion(wc, 200000, 500, true);
-    for(int i = 0 ; i < regionSize; i++){
-      LogWriterUtils.getLogWriter().info("For Key : Key_"+i + " : Values : " + r.get("Key_" + i));
-      assertEquals(new SimpleClass1(false, (short) i, "" + i, i,"" +i ,""+ i,i, i), r.get("Key_" + i));
-    }
-  }
 
   public static void validateQueueSizeStat(String id, final int queueSize) {
     final AbstractGatewaySender sender = (AbstractGatewaySender)  cache.getGatewaySender(id);
-
-    Wait.waitForCriterion(new WaitCriterion() {
-
-      @Override
-      public boolean done() {
-        return sender.getEventQueueSize() == queueSize;
-      }
-
-      @Override
-      public String description() {
-        // TODO Auto-generated method stub
-        return null;
-      }
-    }, 30000, 50, false);
+    Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> assertEquals(queueSize, sender.getEventQueueSize()));
     assertEquals(queueSize, sender.getEventQueueSize());
   }
   /**
@@ -3987,97 +2882,21 @@ public class WANTestBase extends DistributedTestCase{
   public static void validateRegionContents(String regionName, final Map keyValues) {
     final Region r = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(r);
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        for(Object key: keyValues.keySet()) {
-          if (!r.get(key).equals(keyValues.get(key))) {
-            LogWriterUtils.getLogWriter().info(
-                "The values are for key " + "  " + key + " " + r.get(key)
-                    + " in the map " + keyValues.get(key));
-            return false;
-          }
-        }
-        return true;
-      }
-
-      public String description() {
-        return "Expected region entries doesn't match";
-      }
-    };
-    Wait.waitForCriterion(wc, 120000, 500, true);
-  }
-
-  public static void CheckContent(String regionName, final int regionSize) {
-    final Region r = cache.getRegion(Region.SEPARATOR + regionName);
-    assertNotNull(r);
-    for (long i = 0; i < regionSize; i++) {
-      assertEquals(i, r.get(i));
-    }
-  }
-
-  public static void validateRegionContentsForPR(String regionName,
-      final int regionSize) {
-    final Region r = cache.getRegion(Region.SEPARATOR + regionName);
-    assertNotNull(r);
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        if (r.keySet().size() == regionSize) {
-          return true;
-        }
-        return false;
-      }
-
-      public String description() {
-        return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size();
-      }
-    };
-    Wait.waitForCriterion(wc, 120000, 500, true);
-  }
-
-  public static void verifyPrimaryStatus(final Boolean isPrimary) {
-    final Set<GatewaySender> senders = cache.getGatewaySenders();
-    assertEquals(senders.size(), 1);
-    final AbstractGatewaySender sender = (AbstractGatewaySender)senders.iterator().next();
-
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        if (sender.isPrimary() == isPrimary.booleanValue()) {
-          return true;
+    Awaitility.await().atMost(120,TimeUnit.SECONDS).until(() -> {
+      boolean matchFlag = true;
+      for(Object key: keyValues.keySet()) {
+        if (!r.get(key).equals(keyValues.get(key))) {
+          LogWriterUtils.getLogWriter().info(
+            "The values are for key " + "  " + key + " " + r.get(key)
+              + " in the map " + keyValues.get(key));
+          matchFlag = false;
         }
-        return false;
       }
-
-      public String description() {
-        return "Expected sender to be : " + isPrimary.booleanValue() + " but actually it is : " + sender.isPrimary();
-      }
-    };
-    Wait.waitForCriterion(wc, 120000, 500, true);
+      assertEquals("Expected region entries doesn't match", true, matchFlag);
+    });
   }
 
-  public static Boolean getPrimaryStatus(){
-    Set<GatewaySender> senders = cache.getGatewaySenders();
-    assertEquals(senders.size(), 1);
-    final AbstractGatewaySender sender = (AbstractGatewaySender)senders.iterator().next();
-    WaitCriterion wc = new WaitCriterion() {
-      public boolean done() {
-        if (sender.isPrimary()) {
-          return true;
-        }
-        return false;
-      }
-
-      public String description() {
-        return "Checking Primary Status";
-      }
-    };
-    Wait.waitForCriterion(wc, 10000, 500, false);
-    return sender.isPrimary();
-  }
 
-  public static Set<Integer> getAllPrimaryBucketsOnTheNode(String regionName) {
-    PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
-    return region.getDataStore().getAllLocalPrimaryBucketIds();
-  }
 
   public static void doHeavyPuts(String regionName, int numPuts) {
     Region r = cache.getRegion(Region.SEPARATOR + regionName);
@@ -4089,24 +2908,6 @@ public class WANTestBase extends DistributedTestCase{
     }
   }
 
-  public static void addListenerAndKillPrimary(){
-    Set<GatewaySender> senders = ((GemFireCacheImpl)cache).getAllGatewaySenders();
-    assertEquals(senders.size(), 1);
-    AbstractGatewaySender sender = (AbstractGatewaySender)senders.iterator().next();
-    Region queue = cache.getRegion(Region.SEPARATOR+sender.getId()+"_SERIAL_GATEWAY_SENDER_QUEUE");
-    assertNotNull(queue);
-    CacheListenerAdapter cl = new CacheListenerAdapter() {
-      public void afterCreate(EntryEvent event) {
-        if((Long)event.getKey() > 900){
-          cache.getLogger().fine(" Gateway sender is killed by a test");
-          cache.close();
-          cache.getDistributedSystem().disconnect();
-        }
-      }
-    };
-    queue.getAttributesMutator().addCacheListener(cl);
-  }
-
   public static void addCacheListenerAndDestroyRegion(String regionName){
     final Region region = cache.getRegion(Region.SEPARATOR + regionName);
     assertNotNull(region);
@@ -4121,22 +2922,6 @@ public class WANTestBase extends DistributedTestCase{
     region.getAttributesMutator().addCacheListener(cl);
   }
 
-  public static void addCacheListenerAndCloseCache(String regionName){
-    final Region region = cache.getRegion(Region.SEPARATOR + regionName);
-    assertNotNull(region);
-    CacheListenerAdapter cl = new CacheListenerAdapter() {
-      @Override
-      public void afterCreate(EntryEvent event) {
-        if((Long)event.getKey() == 900){
-          cache.getLogger().fine(" Gateway sender is killed by a test");
-          cache.close();
-          cache.getDistributedSystem().disconnect();
-        

<TRUNCATED>