You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2015/12/11 22:23:17 UTC

[43/50] [abbrv] incubator-geode git commit: GEODE-637: Additional tests for AsyncEventQueues

GEODE-637: Additional tests for AsyncEventQueues


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/476c6cd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/476c6cd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/476c6cd3

Branch: refs/heads/feature/GEODE-217
Commit: 476c6cd3be1da503b2345d23fb2857da27d77127
Parents: 386d1ac
Author: Dan Smith <up...@apache.org>
Authored: Wed Dec 2 09:51:49 2015 -0800
Committer: Dan Smith <up...@apache.org>
Committed: Tue Dec 8 15:40:05 2015 -0800

----------------------------------------------------------------------
 .../cache/wan/AsyncEventQueueTestBase.java      | 1658 +++++++++++++++
 .../asyncqueue/AsyncEventListenerDUnitTest.java | 1911 ++++++++++++++++++
 .../AsyncEventListenerOffHeapDUnitTest.java     |   17 +
 .../AsyncEventQueueStatsDUnitTest.java          |  311 +++
 .../ConcurrentAsyncEventQueueDUnitTest.java     |  330 +++
 ...ncurrentAsyncEventQueueOffHeapDUnitTest.java |   16 +
 .../CommonParallelAsyncEventQueueDUnitTest.java |   53 +
 ...ParallelAsyncEventQueueOffHeapDUnitTest.java |   16 +
 8 files changed, 4312 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/476c6cd3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
new file mode 100644
index 0000000..a800118
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -0,0 +1,1658 @@
+/*
+ * =========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. This
+ * product is protected by U.S. and international copyright and intellectual
+ * property laws. Pivotal products are covered by one or more patents listed at
+ * http://www.pivotal.io/patents.
+ * =========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.CacheLoader;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Declarable;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.LoaderHelper;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
+import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter;
+import com.gemstone.gemfire.cache.control.RebalanceFactory;
+import com.gemstone.gemfire.cache.control.RebalanceOperation;
+import com.gemstone.gemfire.cache.control.RebalanceResults;
+import com.gemstone.gemfire.cache.control.ResourceManager;
+import com.gemstone.gemfire.cache.persistence.PartitionOfflineException;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.distributed.Locator;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.VM;
+
+public class AsyncEventQueueTestBase extends DistributedTestCase {
+
+  // Cache for the current VM; created by createCache()/createCacheWithoutLocator()
+  // and read by all the static helper methods below.
+  protected static Cache cache;
+
+  // The eight dunit VMs available to subclasses; assigned in setUp().
+  protected static VM vm0;
+
+  protected static VM vm1;
+
+  protected static VM vm2;
+
+  protected static VM vm3;
+
+  protected static VM vm4;
+
+  protected static VM vm5;
+
+  protected static VM vm6;
+
+  protected static VM vm7;
+
+  protected static AsyncEventListener eventListener1;
+
+  // Maximum time (ms) waitForAsyncEventQueueSize() will wait for the queue
+  // to reach the expected size.
+  private static final long MAX_WAIT = 10000;
+
+  protected static GatewayEventFilter eventFilter;
+
+  protected static boolean destroyFlag = false;
+
+  // Candidate dispatcher-thread counts; shuffled in setUp() so each test
+  // method runs with a different (pseudo-random) number of dispatchers.
+  protected static List<Integer> dispatcherThreads = new ArrayList<Integer>(
+      Arrays.asList(1, 3, 5));
+
+  // this will be set for each test method run with one of the values from above
+  // list
+  protected static int numDispatcherThreadsForTheRun = 1;
+
+  public AsyncEventQueueTestBase(String name) {
+    super(name);
+  }
+
+  // Grabs the eight dunit VMs, shuffles the dispatcher-thread candidates and
+  // pushes the chosen count (first list element) into every VM so all members
+  // of the distributed system use the same number of dispatcher threads.
+  public void setUp() throws Exception {
+    super.setUp();
+    final Host host = Host.getHost(0);
+    vm0 = host.getVM(0);
+    vm1 = host.getVM(1);
+    vm2 = host.getVM(2);
+    vm3 = host.getVM(3);
+    vm4 = host.getVM(4);
+    vm5 = host.getVM(5);
+    vm6 = host.getVM(6);
+    vm7 = host.getVM(7);
+    // this is done to vary the number of dispatchers for sender
+    // during every test method run
+    shuffleNumDispatcherThreads();
+    invokeInEveryVM(AsyncEventQueueTestBase.class,
+        "setNumDispatcherThreadsForTheRun",
+        new Object[] { dispatcherThreads.get(0) });
+  }
+
+  // Randomizes the order of the candidate dispatcher-thread counts.
+  public static void shuffleNumDispatcherThreads() {
+    Collections.shuffle(dispatcherThreads);
+  }
+
+  // Invoked in every VM from setUp() (via invokeInEveryVM) to record the
+  // dispatcher-thread count for the current test run.
+  public static void setNumDispatcherThreadsForTheRun(int numThreads) {
+    numDispatcherThreadsForTheRun = numThreads;
+  }
+
+  /**
+   * Starts a locator (with embedded server/peer support) on a random free
+   * port, stopping any locator already running in this VM first.
+   * NOTE(review): the distributed-system-id property is commented out below,
+   * so the dsId parameter is currently unused here — confirm intentional.
+   *
+   * @return the locator port
+   */
+  public static Integer createFirstLocatorWithDSId(int dsId) {
+    if (Locator.hasLocator()) {
+      Locator.getLocator().stop();
+    }
+    AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    //props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, "" + dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + port
+        + "]");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost["
+        + port + "],server=true,peer=true,hostname-for-clients=localhost");
+    test.getSystem(props);
+    return port;
+  }
+
+  /**
+   * Starts a locator for distributed system {@code dsId} that points at a
+   * remote locator on {@code remoteLocPort} (WAN setup).
+   *
+   * @return the local locator port
+   */
+  public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
+    AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, "" + dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + port
+        + "]");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost["
+        + port + "],server=true,peer=true,hostname-for-clients=localhost");
+    props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, "localhost["
+        + remoteLocPort + "]");
+    test.getSystem(props);
+    return port;
+  }
+
+  /**
+   * Creates a REPLICATE region attached to the given comma-separated list of
+   * async event queue ids. ForceReattemptException is expected (and ignored
+   * in logs) while the region initializes.
+   *
+   * @param asyncQueueIds comma-separated AEQ ids, or null for none
+   */
+  public static void createReplicatedRegionWithAsyncEventQueue(
+      String regionName, String asyncQueueIds, Boolean offHeap) {
+    ExpectedException exp1 = addExpectedException(ForceReattemptException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+      if (asyncQueueIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String asyncQueueId = tokenizer.nextToken();
+          fact.addAsyncEventQueueId(asyncQueueId);
+        }
+      }
+      fact.setDataPolicy(DataPolicy.REPLICATE);
+      fact.setOffHeap(offHeap);
+      RegionFactory regionFactory = cache.createRegionFactory(fact.create());
+      Region r = regionFactory.create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp1.remove();
+    }
+  }
+
+  /**
+   * Same as above but installs a MyCacheLoader on the region instead of
+   * setting off-heap; used by tests that exercise loader-driven events.
+   */
+  public static void createReplicatedRegionWithCacheLoaderAndAsyncEventQueue(
+      String regionName, String asyncQueueIds) {
+
+    AttributesFactory fact = new AttributesFactory();
+    if (asyncQueueIds != null) {
+      StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
+      while (tokenizer.hasMoreTokens()) {
+        String asyncQueueId = tokenizer.nextToken();
+        fact.addAsyncEventQueueId(asyncQueueId);
+      }
+    }
+    fact.setDataPolicy(DataPolicy.REPLICATE);
+    // set the CacheLoader
+    fact.setCacheLoader(new MyCacheLoader());
+    RegionFactory regionFactory = cache.createRegionFactory(fact.create());
+    Region r = regionFactory.create(regionName);
+    assertNotNull(r);
+  }
+
+  /**
+   * Creates a REPLICATE / DISTRIBUTED_ACK region attached both to the given
+   * gateway senders (comma-separated ids) and to one async event queue.
+   *
+   * @param senderIds comma-separated gateway sender ids, or null for none
+   * @param asyncChannelId the async event queue id to attach
+   */
+  public static void createReplicatedRegionWithSenderAndAsyncEventQueue(
+      String regionName, String senderIds, String asyncChannelId, Boolean offHeap) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    try {
+
+      AttributesFactory fact = new AttributesFactory();
+      if (senderIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String senderId = tokenizer.nextToken();
+          fact.addGatewaySenderId(senderId);
+        }
+      }
+      fact.setDataPolicy(DataPolicy.REPLICATE);
+      fact.setOffHeap(offHeap);
+      fact.setScope(Scope.DISTRIBUTED_ACK);
+      RegionFactory regionFactory = cache.createRegionFactory(fact.create());
+      regionFactory.addAsyncEventQueueId(asyncChannelId);
+      Region r = regionFactory.create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+    }
+  }
+
+  /**
+   * Creates an async event queue with a MyAsyncEventListener. When
+   * diskStoreName is non-null a disk store backed by a fresh per-VM directory
+   * is created first. Dispatcher threads come from the per-run shuffled value.
+   */
+  public static void createAsyncEventQueue(String asyncChannelId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isConflation, boolean isPersistent, String diskStoreName,
+      boolean isDiskSynchronous) {
+
+    if (diskStoreName != null) {
+      File directory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      directory.mkdir();
+      File[] dirs1 = new File[] { directory };
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      dsf.setDiskDirs(dirs1);
+      // created for its side effect; the queue references it by name below
+      DiskStore ds = dsf.create(diskStoreName);
+    }
+
+    AsyncEventListener asyncEventListener = new MyAsyncEventListener();
+
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setPersistent(isPersistent);
+    factory.setDiskStoreName(diskStoreName);
+    factory.setDiskSynchronous(isDiskSynchronous);
+    factory.setBatchConflationEnabled(isConflation);
+    factory.setMaximumQueueMemory(maxMemory);
+    factory.setParallel(isParallel);
+    // set dispatcher threads
+    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
+        asyncEventListener);
+  }
+
+  /**
+   * Variant of createAsyncEventQueue using MyAsyncEventListener2; does not
+   * expose conflation or disk-synchronous settings.
+   */
+  public static void createAsyncEventQueueWithListener2(String asyncChannelId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isPersistent, String diskStoreName) {
+
+    if (diskStoreName != null) {
+      File directory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      directory.mkdir();
+      File[] dirs1 = new File[] { directory };
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      dsf.setDiskDirs(dirs1);
+      // created for its side effect; the queue references it by name below
+      DiskStore ds = dsf.create(diskStoreName);
+    }
+
+    AsyncEventListener asyncEventListener = new MyAsyncEventListener2();
+
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setPersistent(isPersistent);
+    factory.setDiskStoreName(diskStoreName);
+    factory.setMaximumQueueMemory(maxMemory);
+    factory.setParallel(isParallel);
+    // set dispatcher threads
+    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
+        asyncEventListener);
+  }
+
+  /**
+   * Creates an async event queue whose listener is instantiated reflectively
+   * from {@code asyncListenerClass}, resolved relative to the
+   * com.gemstone.gemfire.internal.cache.wan package. Reflective failures
+   * (ClassNotFoundException, InstantiationException, IllegalAccessException)
+   * propagate to the caller — the previous catch blocks only rethrew them
+   * unchanged, so they have been removed.
+   *
+   * @param asyncListenerClass simple class name of the AsyncEventListener
+   * @throws Exception if the listener class cannot be loaded or instantiated
+   */
+  public static void createAsyncEventQueue(String asyncChannelId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isConflation, boolean isPersistent, String diskStoreName,
+      boolean isDiskSynchronous, String asyncListenerClass) throws Exception {
+
+    if (diskStoreName != null) {
+      File directory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      directory.mkdir();
+      File[] dirs1 = new File[] { directory };
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      dsf.setDiskDirs(dirs1);
+      // created for its side effect; the queue references it by name below
+      dsf.create(diskStoreName);
+    }
+
+    String packagePrefix = "com.gemstone.gemfire.internal.cache.wan.";
+    String className = packagePrefix + asyncListenerClass;
+    Class clazz = Class.forName(className);
+    AsyncEventListener asyncEventListener = (AsyncEventListener)clazz
+        .newInstance();
+
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setPersistent(isPersistent);
+    factory.setDiskStoreName(diskStoreName);
+    factory.setDiskSynchronous(isDiskSynchronous);
+    factory.setBatchConflationEnabled(isConflation);
+    factory.setMaximumQueueMemory(maxMemory);
+    factory.setParallel(isParallel);
+    // set dispatcher threads
+    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    factory.create(asyncChannelId, asyncEventListener);
+  }
+
+  /**
+   * Convenience overload: creates a queue with a CustomAsyncEventListener
+   * using the product-default number of dispatcher threads.
+   */
+  public static void createAsyncEventQueueWithCustomListener(
+      String asyncChannelId, boolean isParallel, Integer maxMemory,
+      Integer batchSize, boolean isConflation, boolean isPersistent,
+      String diskStoreName, boolean isDiskSynchronous) {
+    createAsyncEventQueueWithCustomListener(asyncChannelId, isParallel,
+        maxMemory, batchSize, isConflation, isPersistent, diskStoreName,
+        isDiskSynchronous, GatewaySender.DEFAULT_DISPATCHER_THREADS);
+  }
+
+  /**
+   * Creates a queue with a CustomAsyncEventListener and an explicit
+   * dispatcher-thread count. ForceReattemptException is expected while the
+   * queue initializes. NOTE(review): isConflation and isDiskSynchronous are
+   * accepted but never applied to the factory — confirm intentional.
+   */
+  public static void createAsyncEventQueueWithCustomListener(
+      String asyncChannelId, boolean isParallel, Integer maxMemory,
+      Integer batchSize, boolean isConflation, boolean isPersistent,
+      String diskStoreName, boolean isDiskSynchronous, int nDispatchers) {
+
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+
+    try {
+      if (diskStoreName != null) {
+        File directory = new File(asyncChannelId + "_disk_"
+            + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+        directory.mkdir();
+        File[] dirs1 = new File[] { directory };
+        DiskStoreFactory dsf = cache.createDiskStoreFactory();
+        dsf.setDiskDirs(dirs1);
+        // created for its side effect; the queue references it by name below
+        DiskStore ds = dsf.create(diskStoreName);
+      }
+
+      AsyncEventListener asyncEventListener = new CustomAsyncEventListener();
+
+      AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+      factory.setBatchSize(batchSize);
+      factory.setPersistent(isPersistent);
+      factory.setDiskStoreName(diskStoreName);
+      factory.setMaximumQueueMemory(maxMemory);
+      factory.setParallel(isParallel);
+      factory.setDispatcherThreads(nDispatchers);
+      AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
+          asyncEventListener);
+    }
+    finally {
+      exp.remove();
+    }
+  }
+
+  /**
+   * Creates an async event queue with an explicit dispatcher-thread count and
+   * OrderPolicy — used by the concurrent-AEQ tests rather than the shuffled
+   * per-run dispatcher value.
+   */
+  public static void createConcurrentAsyncEventQueue(String asyncChannelId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isConflation, boolean isPersistent, String diskStoreName,
+      boolean isDiskSynchronous, int dispatcherThreads, OrderPolicy policy) {
+
+    if (diskStoreName != null) {
+      File directory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      directory.mkdir();
+      File[] dirs1 = new File[] { directory };
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      dsf.setDiskDirs(dirs1);
+      // created for its side effect; the queue references it by name below
+      DiskStore ds = dsf.create(diskStoreName);
+    }
+
+    AsyncEventListener asyncEventListener = new MyAsyncEventListener();
+
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setPersistent(isPersistent);
+    factory.setDiskStoreName(diskStoreName);
+    factory.setDiskSynchronous(isDiskSynchronous);
+    factory.setBatchConflationEnabled(isConflation);
+    factory.setMaximumQueueMemory(maxMemory);
+    factory.setParallel(isParallel);
+    factory.setDispatcherThreads(dispatcherThreads);
+    factory.setOrderPolicy(policy);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
+        asyncEventListener);
+  }
+
+  /**
+   * Creates an async event queue backed by a disk store whose directory is
+   * either diskStoreName (if given) or a fresh per-VM directory. The disk
+   * store itself is only created (and named after asyncChannelId) when
+   * isPersistent is true.
+   *
+   * @return the name of the directory used for the disk store
+   */
+  public static String createAsyncEventQueueWithDiskStore(
+      String asyncChannelId, boolean isParallel, Integer maxMemory,
+      Integer batchSize, boolean isPersistent, String diskStoreName) {
+
+    AsyncEventListener asyncEventListener = new MyAsyncEventListener();
+
+    File persistentDirectory = null;
+    if (diskStoreName == null) {
+      persistentDirectory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+    }
+    else {
+      persistentDirectory = new File(diskStoreName);
+    }
+    getLogWriter().info("The ds is : " + persistentDirectory.getName());
+    persistentDirectory.mkdir();
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
+    File[] dirs1 = new File[] { persistentDirectory };
+
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setParallel(isParallel);
+    if (isPersistent) {
+      factory.setPersistent(isPersistent);
+      factory.setDiskStoreName(dsf.setDiskDirs(dirs1).create(asyncChannelId)
+          .getName());
+    }
+    factory.setMaximumQueueMemory(maxMemory);
+    // set dispatcher threads
+    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
+        asyncEventListener);
+    return persistentDirectory.getName();
+  }
+
+  /**
+   * Pauses the underlying gateway sender of the queue with the given id.
+   * Fails the test with a clear message (instead of an NPE) when no queue
+   * with that id exists in this cache.
+   */
+  public static void pauseAsyncEventQueue(String asyncChannelId) {
+    AsyncEventQueue theChannel = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncChannelId.equals(asyncChannel.getId())) {
+        theChannel = asyncChannel;
+        // stop at the first match, consistent with
+        // pauseAsyncEventQueueAndWaitForDispatcherToPause
+        break;
+      }
+    }
+    assertNotNull("No AsyncEventQueue found with id " + asyncChannelId,
+        theChannel);
+
+    ((AsyncEventQueueImpl)theChannel).getSender().pause();
+  }
+
+  /**
+   * Pauses the queue's underlying sender and blocks until its event
+   * processor reports the dispatcher as paused, so callers can rely on no
+   * further dispatching after this returns.
+   */
+  public static void pauseAsyncEventQueueAndWaitForDispatcherToPause(
+      String asyncChannelId) {
+    AsyncEventQueue theChannel = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncChannelId.equals(asyncChannel.getId())) {
+        theChannel = asyncChannel;
+        break;
+      }
+    }
+
+    ((AsyncEventQueueImpl)theChannel).getSender().pause();
+
+    ((AbstractGatewaySender)((AsyncEventQueueImpl)theChannel).getSender())
+        .getEventProcessor().waitForDispatcherToPause();
+  }
+
+  /**
+   * Resumes the underlying gateway sender of the queue with the given id.
+   * Fails the test with a clear message (instead of an NPE) when no queue
+   * with that id exists in this cache.
+   */
+  public static void resumeAsyncEventQueue(String asyncQueueId) {
+    AsyncEventQueue theQueue = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncQueueId.equals(asyncChannel.getId())) {
+        theQueue = asyncChannel;
+        // stop at the first match, consistent with
+        // pauseAsyncEventQueueAndWaitForDispatcherToPause
+        break;
+      }
+    }
+    assertNotNull("No AsyncEventQueue found with id " + asyncQueueId, theQueue);
+
+    ((AsyncEventQueueImpl)theQueue).getSender().resume();
+  }
+
+  /**
+   * Asserts the number of entries currently queued for the given AEQ. For a
+   * parallel sender the size of the first region queue's backing region is
+   * used; for a serial sender the sizes of all region queues are summed.
+   */
+  public static void checkAsyncEventQueueSize(String asyncQueueId,
+      int numQueueEntries) {
+    AsyncEventQueue theAsyncEventQueue = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncQueueId.equals(asyncChannel.getId())) {
+        theAsyncEventQueue = asyncChannel;
+      }
+    }
+
+    GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue)
+        .getSender();
+
+    if (sender.isParallel()) {
+      Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
+      assertEquals(numQueueEntries,
+          queues.toArray(new RegionQueue[queues.size()])[0].getRegion().size());
+    }
+    else {
+      Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
+      int size = 0;
+      for (RegionQueue q : queues) {
+        size += q.size();
+      }
+      assertEquals(numQueueEntries, size);
+    }
+  }
+
+  /**
+   * This method verifies the queue size of a ParallelGatewaySender. For
+   * ParallelGatewaySender conflation happens in a separate thread, hence test
+   * code needs to wait for some time for expected result
+   * 
+   * @param asyncQueueId
+   *          Async Queue ID
+   * @param numQueueEntries
+   *          expected number of Queue entries
+   * @throws Exception
+   *           if the queue's sender is not parallel (serial senders must use
+   *           checkAsyncEventQueueSize() instead)
+   */
+  public static void waitForAsyncEventQueueSize(String asyncQueueId,
+      final int numQueueEntries) throws Exception {
+    AsyncEventQueue theAsyncEventQueue = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncQueueId.equals(asyncChannel.getId())) {
+        theAsyncEventQueue = asyncChannel;
+      }
+    }
+
+    GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue)
+        .getSender();
+
+    if (sender.isParallel()) {
+      final Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
+          .getQueues();
+
+      // poll every 500ms up to MAX_WAIT, failing the test on timeout
+      waitForCriterion(new WaitCriterion() {
+
+        public String description() {
+          return "Waiting for EventQueue size to be " + numQueueEntries;
+        }
+
+        public boolean done() {
+          boolean done = numQueueEntries == queues.toArray(new RegionQueue[queues
+              .size()])[0].getRegion().size();
+          return done;
+        }
+
+      }, MAX_WAIT, 500, true);
+
+    }
+    else {
+      throw new Exception(
+          "This method should be used for only ParallelGatewaySender,SerialGatewaySender should use checkAsyncEventQueueSize() method instead");
+
+    }
+  }
+
+  /**
+   * Creates a partitioned region with the given redundancy and bucket count,
+   * attached to the given comma-separated gateway sender ids. Recovery delay
+   * is 0 so redundancy recovery starts immediately; ForceReattemptException
+   * and PartitionOfflineException are expected during initialization.
+   */
+  public static void createPartitionedRegion(String regionName,
+      String senderIds, Integer redundantCopies, Integer totalNumBuckets) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(PartitionOfflineException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+      if (senderIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String senderId = tokenizer.nextToken();
+          // GatewaySender sender = cache.getGatewaySender(senderId);
+          // assertNotNull(sender);
+          fact.addGatewaySenderId(senderId);
+        }
+      }
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(totalNumBuckets);
+      pfact.setRedundantCopies(redundantCopies);
+      pfact.setRecoveryDelay(0);
+      fact.setPartitionAttributes(pfact.create());
+      Region r = cache.createRegionFactory(fact.create()).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+
+  /**
+   * Creates a 16-bucket partitioned region attached to one async event
+   * queue. ForceReattemptException and PartitionOfflineException are expected
+   * during initialization.
+   */
+  public static void createPartitionedRegionWithAsyncEventQueue(
+      String regionName, String asyncEventQueueId, Boolean offHeap) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(PartitionOfflineException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(16);
+      fact.setPartitionAttributes(pfact.create());
+      fact.setOffHeap(offHeap);
+      Region r = cache.createRegionFactory(fact.create())
+          .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+
+  /**
+   * Creates a partitioned region colocated with {@code colocatedWith} and
+   * attached to one async event queue.
+   */
+  public static void createColocatedPartitionedRegionWithAsyncEventQueue(
+      String regionName, String asyncEventQueueId, Integer totalNumBuckets,
+      String colocatedWith) {
+
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(PartitionOfflineException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(totalNumBuckets);
+      pfact.setColocatedWith(colocatedWith);
+      fact.setPartitionAttributes(pfact.create());
+      Region r = cache.createRegionFactory(fact.create())
+          .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+
+  /**
+   * Creates a 16-bucket partitioned region with a MyCacheLoader installed,
+   * attached to one async event queue.
+   */
+  public static void createPartitionedRegionWithCacheLoaderAndAsyncQueue(
+      String regionName, String asyncEventQueueId) {
+
+    AttributesFactory fact = new AttributesFactory();
+
+    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+    pfact.setTotalNumBuckets(16);
+    fact.setPartitionAttributes(pfact.create());
+    // set the CacheLoader implementation
+    fact.setCacheLoader(new MyCacheLoader());
+    Region r = cache.createRegionFactory(fact.create())
+        .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+    assertNotNull(r);
+  }
+
+  /**
+   * Create PartitionedRegion with 1 redundant copy
+   */
+  public static void createPRWithRedundantCopyWithAsyncEventQueue(
+      String regionName, String asyncEventQueueId, Boolean offHeap) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+
+    try {
+      AttributesFactory fact = new AttributesFactory();
+
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(16);
+      pfact.setRedundantCopies(1);
+      fact.setPartitionAttributes(pfact.create());
+      fact.setOffHeap(offHeap);
+      Region r = cache.createRegionFactory(fact.create())
+          .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+    }
+  }
+
+  /**
+   * Creates an accessor (localMaxMemory = 0, so it hosts no buckets) for a
+   * 16-bucket partitioned region attached to one async event queue.
+   */
+  public static void createPartitionedRegionAccessorWithAsyncEventQueue(
+      String regionName, String asyncEventQueueId) {
+    AttributesFactory fact = new AttributesFactory();
+    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+    pfact.setTotalNumBuckets(16);
+    pfact.setLocalMaxMemory(0);
+    fact.setPartitionAttributes(pfact.create());
+    Region r = cache.createRegionFactory(fact.create())
+        .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+    // fact.create()).create(regionName);
+    assertNotNull(r);
+  }
+
+  /**
+   * Creates the per-VM cache connected to the locator on the given port and
+   * stores it in the static {@code cache} field used by the other helpers.
+   */
+  protected static void createCache(Integer locPort) {
+    AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort
+        + "]");
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+  }
+
+  /**
+   * Creates the per-VM cache using multicast discovery on the given port
+   * instead of a locator.
+   */
+  public static void createCacheWithoutLocator(Integer mCastPort) {
+    AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "" + mCastPort);
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+  }
+
+  /**
+   * Asserts the queue's basic statistics: exact queue size, events received
+   * and events queued, and at least {@code eventsDistributed} distributed.
+   * Uses JUnit assertTrue for the lower-bound check: the Java 'assert'
+   * keyword used previously is a no-op unless the JVM runs with -ea, so the
+   * check could silently pass.
+   */
+  public static void checkAsyncEventQueueStats(String queueId,
+      final int queueSize, final int eventsReceived, final int eventsQueued,
+      final int eventsDistributed) {
+    Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
+    AsyncEventQueue queue = null;
+    for (AsyncEventQueue q : asyncQueues) {
+      if (q.getId().equals(queueId)) {
+        queue = q;
+        break;
+      }
+    }
+    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
+        .getStatistics();
+    assertEquals(queueSize, statistics.getEventQueueSize());
+    assertEquals(eventsReceived, statistics.getEventsReceived());
+    assertEquals(eventsQueued, statistics.getEventsQueued());
+    assertTrue("expected at least " + eventsDistributed
+        + " events distributed but was " + statistics.getEventsDistributed(),
+        statistics.getEventsDistributed() >= eventsDistributed);
+  }
+
+  /**
+   * Asserts the exact number of events dropped from the queue by batch
+   * conflation.
+   */
+  public static void checkAsyncEventQueueConflatedStats(
+      String asyncEventQueueId, final int eventsConflated) {
+    Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
+    AsyncEventQueue queue = null;
+    for (AsyncEventQueue q : queues) {
+      if (q.getId().equals(asyncEventQueueId)) {
+        queue = q;
+        break;
+      }
+    }
+    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
+        .getStatistics();
+    assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
+  }
+
+  /**
+   * Failover check: every received event must be accounted for either as
+   * queued, as an unprocessed token added by the primary, or as an
+   * unprocessed event removed by the primary.
+   */
+  public static void checkAsyncEventQueueStats_Failover(
+      String asyncEventQueueId, final int eventsReceived) {
+    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
+    AsyncEventQueue queue = null;
+    for (AsyncEventQueue q : asyncEventQueues) {
+      if (q.getId().equals(asyncEventQueueId)) {
+        queue = q;
+        break;
+      }
+    }
+    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
+        .getStatistics();
+
+    assertEquals(eventsReceived, statistics.getEventsReceived());
+    assertEquals(
+        eventsReceived,
+        (statistics.getEventsQueued()
+            + statistics.getUnprocessedTokensAddedByPrimary() + statistics
+            .getUnprocessedEventsRemovedByPrimary()));
+  }
+
+  /**
+   * Verifies that the named AsyncEventQueue distributed at least the given
+   * number of batches and never had to redistribute one.
+   */
+  public static void checkAsyncEventQueueBatchStats(String asyncQueueId,
+      final int batches) {
+    Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
+    AsyncEventQueue queue = null;
+    for (AsyncEventQueue q : queues) {
+      if (q.getId().equals(asyncQueueId)) {
+        queue = q;
+        break;
+      }
+    }
+    // Fail with a clear message instead of an NPE on the cast below.
+    assertNotNull("No AsyncEventQueue found with id " + asyncQueueId, queue);
+    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
+        .getStatistics();
+    // assertTrue instead of the assert keyword: the keyword is silently
+    // disabled unless the JVM runs with -ea.
+    assertTrue(statistics.getBatchesDistributed() >= batches);
+    assertEquals(0, statistics.getBatchesRedistributed());
+  }
+
+  /**
+   * Verifies the secondary/primary bookkeeping for unprocessed events on the
+   * named AsyncEventQueue: both sides must account for the same count.
+   */
+  public static void checkAsyncEventQueueUnprocessedStats(String asyncQueueId,
+      int events) {
+    AsyncEventQueue found = null;
+    for (AsyncEventQueue candidate : cache.getAsyncEventQueues()) {
+      if (candidate.getId().equals(asyncQueueId)) {
+        found = candidate;
+        break;
+      }
+    }
+    AsyncEventQueueStats stats = ((AsyncEventQueueImpl)found).getStatistics();
+    int secondarySide = stats.getUnprocessedEventsAddedBySecondary()
+        + stats.getUnprocessedTokensRemovedBySecondary();
+    int primarySide = stats.getUnprocessedEventsRemovedByPrimary()
+        + stats.getUnprocessedTokensAddedByPrimary();
+    assertEquals(events, secondarySide);
+    assertEquals(events, primarySide);
+  }
+
+  /**
+   * Sets the remove-from-queue-on-exception flag on the named gateway
+   * sender; fails the test if no such sender exists.
+   */
+  public static void setRemoveFromQueueOnException(String senderId,
+      boolean removeFromQueue) {
+    GatewaySender target = null;
+    for (GatewaySender candidate : cache.getGatewaySenders()) {
+      if (candidate.getId().equals(senderId)) {
+        target = candidate;
+        break;
+      }
+    }
+    assertNotNull(target);
+    ((AbstractGatewaySender)target)
+        .setRemoveFromQueueOnException(removeFromQueue);
+  }
+
+  /**
+   * Clears (sets to false) the remove-from-queue-on-exception flag on the
+   * named gateway sender; fails the test if no such sender exists.
+   */
+  public static void unsetRemoveFromQueueOnException(String senderId) {
+    GatewaySender match = null;
+    for (GatewaySender candidate : cache.getGatewaySenders()) {
+      if (candidate.getId().equals(senderId)) {
+        match = candidate;
+        break;
+      }
+    }
+    assertNotNull(match);
+    ((AbstractGatewaySender)match).setRemoveFromQueueOnException(false);
+  }
+
+  /**
+   * Blocks until the named gateway sender reports itself primary, failing
+   * the test after 10 seconds.
+   */
+  public static void waitForSenderToBecomePrimary(String senderId) {
+    Set<GatewaySender> senders = ((GemFireCacheImpl)cache)
+        .getAllGatewaySenders();
+    final GatewaySender sender = getGatewaySenderById(senders, senderId);
+    WaitCriterion becamePrimary = new WaitCriterion() {
+      public boolean done() {
+        return sender != null && ((AbstractGatewaySender)sender).isPrimary();
+      }
+
+      public String description() {
+        return "Expected sender primary state to be true but is false";
+      }
+    };
+    DistributedTestCase.waitForCriterion(becamePrimary, 10000, 1000, true);
+  }
+
+  /**
+   * Looks up a sender by id in the given set.
+   *
+   * @return the matching sender, or null when no sender has the supplied id
+   */
+  private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders,
+      String senderId) {
+    for (Iterator<GatewaySender> it = senders.iterator(); it.hasNext();) {
+      GatewaySender candidate = it.next();
+      if (candidate.getId().equals(senderId)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Creates a gateway sender with a disk store, dispatcher threads and an
+   * optional event filter. The old parallel and serial branches were
+   * identical apart from setParallel(); they are merged here (setParallel's
+   * default is false, so passing the flag through is equivalent).
+   *
+   * @param dsName sender id; also used for the disk store / directory name
+   * @param remoteDsId distributed system id of the remote site
+   * @param isParallel whether to create a parallel sender
+   * @param maxMemory maximum queue memory in MB
+   * @param batchSize events per batch
+   * @param isConflation whether batch conflation is enabled
+   * @param isPersistent whether the queue is persistent
+   * @param filter optional gateway event filter (may be null)
+   * @param isManualStart whether the sender requires a manual start
+   */
+  public static void createSender(String dsName, int remoteDsId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
+      boolean isManualStart) {
+    final ExpectedException exln = addExpectedException("Could not connect");
+    try {
+      File persistentDirectory = new File(dsName + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      persistentDirectory.mkdir();
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      File[] dirs1 = new File[] { persistentDirectory };
+      GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+      gateway.setParallel(isParallel);
+      gateway.setMaximumQueueMemory(maxMemory);
+      gateway.setBatchSize(batchSize);
+      gateway.setManualStart(isManualStart);
+      // set dispatcher threads
+      gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
+      ((InternalGatewaySenderFactory)gateway)
+          .setLocatorDiscoveryCallback(new MyLocatorCallback());
+      if (filter != null) {
+        eventFilter = filter;
+        gateway.addGatewayEventFilter(filter);
+      }
+      gateway.setBatchConflationEnabled(isConflation);
+      if (isPersistent) {
+        gateway.setPersistenceEnabled(true);
+      }
+      // Both persistent and non-persistent senders get a disk store
+      // (non-persistent senders overflow to it), exactly as before.
+      gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
+          .getName());
+      gateway.create(dsName, remoteDsId);
+    }
+    finally {
+      exln.remove();
+    }
+  }
+
+  /**
+   * Sleeps for roughly the given number of milliseconds by polling a
+   * criterion that never completes; throwOnTimeout is false, so the timeout
+   * itself is the pause.
+   */
+  public static void pauseWaitCriteria(final long millisec) {
+    WaitCriterion neverDone = new WaitCriterion() {
+      public boolean done() {
+        return false;
+      }
+
+      public String description() {
+        return "Expected to wait for " + millisec + " millisec.";
+      }
+    };
+    DistributedTestCase.waitForCriterion(neverDone, millisec, 500, false);
+  }
+
+  /**
+   * Creates a cache connected to the given locator and starts a gateway
+   * receiver on a random available port.
+   *
+   * @param locPort the locator port of this distributed system
+   * @return the port the receiver was started on
+   */
+  public static int createReceiver(int locPort) {
+    AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort
+        + "]");
+
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    fact.setStartPort(port);
+    fact.setEndPort(port);
+    fact.setManualStart(true);
+    GatewayReceiver receiver = fact.create();
+    try {
+      receiver.start();
+    }
+    catch (IOException e) {
+      // Propagate the cause through fail(msg, e) (as doRebalance does)
+      // instead of printStackTrace(), so the failure carries the stack trace.
+      fail("Test " + test.getName()
+          + " failed to start GatewayRecevier on port " + port, e);
+    }
+    return port;
+  }
+
+  /**
+   * Joins the given path elements with the platform separator, leaving a
+   * trailing separator after the last element (matching historical
+   * behavior); an empty array yields the empty string.
+   */
+  public static String makePath(String[] strings) {
+    StringBuilder path = new StringBuilder();
+    for (String element : strings) {
+      path.append(element).append(File.separator);
+    }
+    return path.toString();
+  }
+
+  /**
+   * Do a rebalance and verify balance was improved. If evictionPercentage > 0
+   * (the default) then we have heapLRU and this can cause simulate and
+   * rebalance results to differ if eviction kicks in between. (See BUG 44899).
+   *
+   * NOTE(review): despite the summary above, this method currently only runs
+   * the simulate (when heap eviction is off) and the rebalance; neither
+   * result object is inspected, so no "improvement" is actually verified.
+   */
+  public static void doRebalance() {
+    ResourceManager resMan = cache.getResourceManager();
+    // Heap eviction makes simulate results unreliable (see note above).
+    boolean heapEviction = (resMan.getEvictionHeapPercentage() > 0);
+    RebalanceFactory factory = resMan.createRebalanceFactory();
+    try {
+      RebalanceResults simulateResults = null;
+      if (!heapEviction) {
+        getLogWriter().info("Calling rebalance simulate");
+        RebalanceOperation simulateOp = factory.simulate();
+        // getResults() blocks until the simulate completes.
+        simulateResults = simulateOp.getResults();
+      }
+
+      getLogWriter().info("Starting rebalancing");
+      RebalanceOperation rebalanceOp = factory.start();
+      // getResults() blocks until the rebalance completes.
+      RebalanceResults rebalanceResults = rebalanceOp.getResults();
+
+    }
+    catch (InterruptedException e) {
+      fail("Interrupted", e);
+    }
+  }
+
+  /**
+   * Puts keys 0..numPuts-1 (as Longs) with value == key into the named
+   * region, tolerating the interruption/sender exceptions that concurrent
+   * dunit shutdown can provoke. (Removed a dead commented-out destroy loop.)
+   */
+  public static void doPuts(String regionName, int numPuts) {
+    ExpectedException exp1 = addExpectedException(InterruptedException.class
+        .getName());
+    ExpectedException exp2 = addExpectedException(GatewaySenderException.class
+        .getName());
+    try {
+      Region r = cache.getRegion(Region.SEPARATOR + regionName);
+      assertNotNull(r);
+      for (long i = 0; i < numPuts; i++) {
+        r.put(i, i);
+      }
+    }
+    finally {
+      exp1.remove();
+      exp2.remove();
+    }
+  }
+
+  /**
+   * To be used for CacheLoader related tests
+   */
+  public static void doGets(String regionName, int numGets) {
+    Region r = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(r);
+    for (long i = 0; i < numGets; i++) {
+      r.get(i);
+    }
+  }
+
+  /**
+   * Puts keys from {@code from} (inclusive) up to {@code numPuts}
+   * (exclusive). NOTE(review): {@code numPuts} acts as the exclusive upper
+   * bound, not a count of puts — confirm this matches callers' intent.
+   */
+  public static void doPutsFrom(String regionName, int from, int numPuts) {
+    Region region = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(region);
+    for (long key = from; key < numPuts; key++) {
+      region.put(key, key);
+    }
+  }
+
+  /**
+   * Performs {@code numPuts} putAll operations of {@code size} entries each;
+   * batch b covers keys [b*size, (b+1)*size), all mapped to the batch number.
+   */
+  public static void doPutAll(String regionName, int numPuts, int size) {
+    Region region = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(region);
+    for (long batch = 0; batch < numPuts; batch++) {
+      Map batchMap = new HashMap();
+      for (long offset = 0; offset < size; offset++) {
+        batchMap.put((size * batch) + offset, batch);
+      }
+      region.putAll(batchMap, "putAllCallback");
+      batchMap.clear();
+    }
+  }
+
+  /**
+   * Puts every entry of the supplied map into the named region.
+   *
+   * @param regionName region to write into
+   * @param keyValues entries to put, unchanged
+   */
+  public static void putGivenKeyValue(String regionName, Map keyValues) {
+    Region r = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(r);
+    // Iterate entries rather than keySet()+get(): one lookup per entry
+    // instead of two.
+    for (Object entryObj : keyValues.entrySet()) {
+      Map.Entry entry = (Map.Entry)entryObj;
+      r.put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Puts keys from {@code start} (inclusive) up to {@code numPuts}
+   * (exclusive), tolerating CacheClosedException from concurrent shutdown.
+   */
+  public static void doNextPuts(String regionName, int start, int numPuts) {
+    ExpectedException exp = addExpectedException(CacheClosedException.class
+        .getName());
+    try {
+      Region region = cache.getRegion(Region.SEPARATOR + regionName);
+      assertNotNull(region);
+      for (long key = start; key < numPuts; key++) {
+        region.put(key, key);
+      }
+    }
+    finally {
+      exp.remove();
+    }
+  }
+
+  /**
+   * Waits (up to 4 minutes) for the named region to hold exactly
+   * {@code regionSize} keys, tolerating retry/cache-closed noise.
+   */
+  public static void validateRegionSize(String regionName, final int regionSize) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(CacheClosedException.class
+        .getName());
+    try {
+      final Region r = cache.getRegion(Region.SEPARATOR + regionName);
+      assertNotNull(r);
+      WaitCriterion sizeMatches = new WaitCriterion() {
+        public boolean done() {
+          return r.keySet().size() == regionSize;
+        }
+
+        public String description() {
+          return "Expected region entries: " + regionSize
+              + " but actual entries: " + r.keySet().size()
+              + " present region keyset " + r.keySet();
+        }
+      };
+      DistributedTestCase.waitForCriterion(sizeMatches, 240000, 500, true);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+
+  /**
+   * Validate whether all the attributes set on AsyncEventQueueFactory are set
+   * on the sender underneath the AsyncEventQueue.
+   */
+  public static void validateAsyncEventQueueAttributes(String asyncChannelId,
+      int maxQueueMemory, int batchSize, int batchTimeInterval,
+      boolean isPersistent, String diskStoreName, boolean isDiskSynchronous,
+      boolean batchConflationEnabled) {
+
+    AsyncEventQueue theChannel = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncChannelId.equals(asyncChannel.getId())) {
+        theChannel = asyncChannel;
+      }
+    }
+    // Fail with a clear message rather than an NPE on the cast below.
+    assertNotNull("No AsyncEventQueue found with id " + asyncChannelId,
+        theChannel);
+
+    GatewaySender theSender = ((AsyncEventQueueImpl)theChannel).getSender();
+    assertEquals("maxQueueMemory", maxQueueMemory,
+        theSender.getMaximumQueueMemory());
+    assertEquals("batchSize", batchSize, theSender.getBatchSize());
+    assertEquals("batchTimeInterval", batchTimeInterval,
+        theSender.getBatchTimeInterval());
+    assertEquals("isPersistent", isPersistent, theSender.isPersistenceEnabled());
+    assertEquals("diskStoreName", diskStoreName, theSender.getDiskStoreName());
+    assertEquals("isDiskSynchronous", isDiskSynchronous,
+        theSender.isDiskSynchronous());
+    assertEquals("batchConflation", batchConflationEnabled,
+        theSender.isBatchConflationEnabled());
+  }
+  
+  /**
+   * Validate whether all the attributes set on AsyncEventQueueFactory are set
+   * on the sender underneath the AsyncEventQueue, including the concurrency
+   * attributes (dispatcher threads and order policy).
+   */
+  public static void validateConcurrentAsyncEventQueueAttributes(String asyncChannelId,
+      int maxQueueMemory, int batchSize, int batchTimeInterval,
+      boolean isPersistent, String diskStoreName, boolean isDiskSynchronous,
+      boolean batchConflationEnabled, int dispatcherThreads, OrderPolicy policy) {
+
+    AsyncEventQueue theChannel = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncChannelId.equals(asyncChannel.getId())) {
+        theChannel = asyncChannel;
+      }
+    }
+    // Fail with a clear message rather than an NPE on the cast below.
+    assertNotNull("No AsyncEventQueue found with id " + asyncChannelId,
+        theChannel);
+
+    GatewaySender theSender = ((AsyncEventQueueImpl)theChannel).getSender();
+    assertEquals("maxQueueMemory", maxQueueMemory, theSender
+        .getMaximumQueueMemory());
+    assertEquals("batchSize", batchSize, theSender.getBatchSize());
+    assertEquals("batchTimeInterval", batchTimeInterval, theSender
+        .getBatchTimeInterval());
+    assertEquals("isPersistent", isPersistent, theSender.isPersistenceEnabled());
+    assertEquals("diskStoreName", diskStoreName, theSender.getDiskStoreName());
+    assertEquals("isDiskSynchronous", isDiskSynchronous, theSender
+        .isDiskSynchronous());
+    assertEquals("batchConflation", batchConflationEnabled, theSender
+        .isBatchConflationEnabled());
+    assertEquals("dispatcherThreads", dispatcherThreads, theSender
+        .getDispatcherThreads());
+    assertEquals("orderPolicy", policy, theSender.getOrderPolicy());
+  }
+
+  /**
+   * Waits (up to 60s) for the MyAsyncEventListener attached to the named
+   * queue to have recorded exactly {@code expectedSize} distinct keys.
+   */
+  public static void validateAsyncEventListener(String asyncQueueId,
+      final int expectedSize) {
+    AsyncEventListener theListener = null;
+    for (AsyncEventQueue candidate : cache.getAsyncEventQueues()) {
+      if (asyncQueueId.equals(candidate.getId())) {
+        theListener = candidate.getAsyncEventListener();
+      }
+    }
+
+    final Map eventsMap = ((MyAsyncEventListener)theListener).getEventsMap();
+    assertNotNull(eventsMap);
+    WaitCriterion sizeMatches = new WaitCriterion() {
+      public boolean done() {
+        return eventsMap.size() == expectedSize;
+      }
+
+      public String description() {
+        return "Expected map entries: " + expectedSize
+            + " but actual entries: " + eventsMap.size();
+      }
+    };
+    DistributedTestCase.waitForCriterion(sizeMatches, 60000, 500, true);
+  }
+
+  /**
+   * Waits for the cache-loader listener on the named queue to record
+   * {@code expectedSize} events, then checks each event's operation detail
+   * (load and/or putAll) as requested.
+   */
+  public static void validateAsyncEventForOperationDetail(String asyncQueueId,
+      final int expectedSize, boolean isLoad, boolean isPutAll) {
+
+    AsyncEventListener theListener = null;
+    for (AsyncEventQueue candidate : cache.getAsyncEventQueues()) {
+      if (asyncQueueId.equals(candidate.getId())) {
+        theListener = candidate.getAsyncEventListener();
+      }
+    }
+
+    final Map eventsMap = ((MyAsyncEventListener_CacheLoader)theListener)
+        .getEventsMap();
+    assertNotNull(eventsMap);
+    WaitCriterion sizeMatches = new WaitCriterion() {
+      public boolean done() {
+        return eventsMap.size() == expectedSize;
+      }
+
+      public String description() {
+        return "Expected map entries: " + expectedSize
+            + " but actual entries: " + eventsMap.size();
+      }
+    };
+    DistributedTestCase.waitForCriterion(sizeMatches, 60000, 500, true);
+    for (Object value : eventsMap.values()) {
+      AsyncEvent asyncEvent = (AsyncEvent)value;
+      if (isLoad) {
+        assertTrue(asyncEvent.getOperation().isLoad());
+      }
+      if (isPutAll) {
+        assertTrue(asyncEvent.getOperation().isPutAll());
+      }
+    }
+  }
+
+  /**
+   * Waits for the CustomAsyncEventListener on the named queue to record
+   * {@code expectedSize} events, then asserts every event was flagged as a
+   * possible duplicate.
+   */
+  public static void validateCustomAsyncEventListener(String asyncQueueId,
+      final int expectedSize) {
+    AsyncEventListener theListener = null;
+    for (AsyncEventQueue candidate : cache.getAsyncEventQueues()) {
+      if (asyncQueueId.equals(candidate.getId())) {
+        theListener = candidate.getAsyncEventListener();
+      }
+    }
+
+    final Map eventsMap = ((CustomAsyncEventListener)theListener)
+        .getEventsMap();
+    assertNotNull(eventsMap);
+    WaitCriterion sizeMatches = new WaitCriterion() {
+      public boolean done() {
+        return eventsMap.size() == expectedSize;
+      }
+
+      public String description() {
+        return "Expected map entries: " + expectedSize
+            + " but actual entries: " + eventsMap.size();
+      }
+    };
+    DistributedTestCase.waitForCriterion(sizeMatches, 60000, 500, true);
+
+    for (Object value : eventsMap.values()) {
+      AsyncEvent event = (AsyncEvent)value;
+      assertTrue("possibleDuplicate should be true for event: " + event,
+          event.getPossibleDuplicate());
+    }
+  }
+
+  public static void waitForAsyncQueueToGetEmpty(String asyncQueueId) {
+    AsyncEventQueue theAsyncEventQueue = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncQueueId.equals(asyncChannel.getId())) {
+        theAsyncEventQueue = asyncChannel;
+      }
+    }
+
+    final GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue)
+        .getSender();
+
+    if (sender.isParallel()) {
+      final Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
+          .getQueues();
+
+      WaitCriterion wc = new WaitCriterion() {
+        public boolean done() {
+          int size = 0;
+          for (RegionQueue q : queues) {
+            size += q.size();
+          }
+          if (size == 0) {
+            return true;
+          }
+          return false;
+        }
+
+        public String description() {
+          int size = 0;
+          for (RegionQueue q : queues) {
+            size += q.size();
+          }
+          return "Expected queue size to be : " + 0 + " but actual entries: "
+              + size;
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, 60000, 500, true);
+
+    }
+    else {
+      WaitCriterion wc = new WaitCriterion() {
+        public boolean done() {
+          Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
+          int size = 0;
+          for (RegionQueue q : queues) {
+            size += q.size();
+          }
+          if (size == 0) {
+            return true;
+          }
+          return false;
+        }
+
+        public String description() {
+          Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
+          int size = 0;
+          for (RegionQueue q : queues) {
+            size += q.size();
+          }
+          return "Expected queue size to be : " + 0 + " but actual entries: "
+              + size;
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, 60000, 500, true);
+    }
+  }
+
+  /**
+   * For each given bucket, asserts that the first {@code batchSize} events
+   * recorded by MyAsyncEventListener2 were flagged possibleDuplicate.
+   */
+  public static void verifyAsyncEventListenerForPossibleDuplicates(
+      String asyncEventQueueId, Set<Integer> bucketIds, int batchSize) {
+    AsyncEventListener theListener = null;
+    for (AsyncEventQueue candidate : cache.getAsyncEventQueues()) {
+      if (asyncEventQueueId.equals(candidate.getId())) {
+        theListener = candidate.getAsyncEventListener();
+      }
+    }
+
+    final Map<Integer, List<GatewaySenderEventImpl>> bucketToEventsMap = ((MyAsyncEventListener2)theListener)
+        .getBucketToEventsMap();
+    assertNotNull(bucketToEventsMap);
+    // The scenario only makes sense with events spread over several buckets.
+    assertTrue(bucketIds.size() > 1);
+
+    for (int bucketId : bucketIds) {
+      List<GatewaySenderEventImpl> eventsForBucket = bucketToEventsMap
+          .get(bucketId);
+      getLogWriter().info(
+          "Events for bucket: " + bucketId + " is " + eventsForBucket);
+      assertNotNull(eventsForBucket);
+      for (int idx = 0; idx < batchSize; idx++) {
+        assertTrue(eventsForBucket.get(idx).getPossibleDuplicate());
+      }
+    }
+  }
+
+  public static int getAsyncEventListenerMapSize(String asyncEventQueueId) {
+    AsyncEventListener theListener = null;
+
+    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncQueue : asyncEventQueues) {
+      if (asyncEventQueueId.equals(asyncQueue.getId())) {
+        theListener = asyncQueue.getAsyncEventListener();
+      }
+    }
+
+    final Map eventsMap = ((MyAsyncEventListener)theListener).getEventsMap();
+    assertNotNull(eventsMap);
+    getLogWriter().info("The events map size is " + eventsMap.size());
+    return eventsMap.size();
+  }
+
+  /** Returns the current size of the named AsyncEventQueue, asserting it exists. */
+  public static int getAsyncEventQueueSize(String asyncEventQueueId) {
+    AsyncEventQueue theQueue = null;
+    for (AsyncEventQueue candidate : cache.getAsyncEventQueues()) {
+      if (asyncEventQueueId.equals(candidate.getId())) {
+        theQueue = candidate;
+      }
+    }
+    assertNotNull(theQueue);
+    return theQueue.size();
+  }
+
+  /** Returns the full path of the named region, asserting it exists. */
+  public static String getRegionFullPath(String regionName) {
+    final Region region = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(region);
+    return region.getFullPath();
+  }
+
+  /** Returns the ids of all primary buckets this node hosts for the region. */
+  public static Set<Integer> getAllPrimaryBucketsOnTheNode(String regionName) {
+    PartitionedRegion partitionedRegion = (PartitionedRegion)cache
+        .getRegion(regionName);
+    return partitionedRegion.getDataStore().getAllLocalPrimaryBucketIds();
+  }
+
+  /**
+   * Installs a listener on the named region that closes the cache and
+   * disconnects the distributed system when key 900 is created, simulating
+   * a member crash mid-run.
+   */
+  public static void addCacheListenerAndCloseCache(String regionName) {
+    final Region region = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(region);
+    CacheListenerAdapter killOnKey900 = new CacheListenerAdapter() {
+      @Override
+      public void afterCreate(EntryEvent event) {
+        if ((Long)event.getKey() == 900) {
+          cache.getLogger().fine(" Gateway sender is killed by a test");
+          cache.close();
+          cache.getDistributedSystem().disconnect();
+        }
+      }
+    };
+    region.getAttributesMutator().addCacheListener(killOnKey900);
+  }
+
+  /**
+   * Disconnects this member's distributed system if the named sender is
+   * primary here.
+   *
+   * @return TRUE when this member hosted the primary and was disconnected
+   */
+  public static Boolean killSender(String senderId) {
+    final ExpectedException exln = addExpectedException("Could not connect");
+    ExpectedException exp = addExpectedException(CacheClosedException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(ForceReattemptException.class
+        .getName());
+    try {
+      AbstractGatewaySender sender = null;
+      for (GatewaySender candidate : cache.getGatewaySenders()) {
+        if (candidate.getId().equals(senderId)) {
+          sender = (AbstractGatewaySender)candidate;
+          break;
+        }
+      }
+      if (!sender.isPrimary()) {
+        return Boolean.FALSE;
+      }
+      getLogWriter().info("Gateway sender is killed by a test");
+      cache.getDistributedSystem().disconnect();
+      return Boolean.TRUE;
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+      exln.remove();
+    }
+  }
+
+  /**
+   * Disconnects this member's distributed system if the named queue is
+   * primary here.
+   *
+   * @return TRUE when this member hosted the primary and was disconnected
+   */
+  public static Boolean killAsyncEventQueue(String asyncQueueId) {
+    AsyncEventQueueImpl queue = null;
+    for (AsyncEventQueue candidate : cache.getAsyncEventQueues()) {
+      if (candidate.getId().equals(asyncQueueId)) {
+        queue = (AsyncEventQueueImpl)candidate;
+        break;
+      }
+    }
+    if (!queue.isPrimary()) {
+      return Boolean.FALSE;
+    }
+    getLogWriter().info("AsyncEventQueue is killed by a test");
+    cache.getDistributedSystem().disconnect();
+    return Boolean.TRUE;
+  }
+
+  /**
+   * Unconditionally closes the cache and disconnects the distributed
+   * system in this VM, simulating a sender kill.
+   */
+  public static void killSender() {
+    getLogWriter().info("Gateway sender is going to be killed by a test");
+    cache.close();
+    cache.getDistributedSystem().disconnect();
+    getLogWriter().info("Gateway sender is killed by a test");
+  }
+
+  /**
+   * Locator discovery callback that records discovered/removed locators and
+   * lets tests block until a particular locator shows up in either set.
+   */
+  public static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter {
+
+    private final Set discoveredLocators = new HashSet();
+
+    private final Set removedLocators = new HashSet();
+
+    public synchronized void locatorsDiscovered(List locators) {
+      discoveredLocators.addAll(locators);
+      notifyAll();
+    }
+
+    public synchronized void locatorsRemoved(List locators) {
+      removedLocators.addAll(locators);
+      notifyAll();
+    }
+
+    public boolean waitForDiscovery(InetSocketAddress locator, long time)
+        throws InterruptedException {
+      return waitFor(discoveredLocators, locator, time);
+    }
+
+    public boolean waitForRemove(InetSocketAddress locator, long time)
+        throws InterruptedException {
+      return waitFor(removedLocators, locator, time);
+    }
+
+    private synchronized boolean waitFor(Set set, InetSocketAddress locator,
+        long time) throws InterruptedException {
+      long remaining = time;
+      long endTime = System.currentTimeMillis() + time;
+      // Guard must be remaining > 0: Object.wait(0) waits forever, so the
+      // old "remaining >= 0" condition could hang indefinitely when the
+      // deadline was reached exactly (or when time == 0 was passed).
+      while (!set.contains(locator) && remaining > 0) {
+        wait(remaining);
+        remaining = endTime - System.currentTimeMillis();
+      }
+      return set.contains(locator);
+    }
+
+    public synchronized Set getDiscovered() {
+      return new HashSet(discoveredLocators);
+    }
+
+    public synchronized Set getRemoved() {
+      return new HashSet(removedLocators);
+    }
+  }
+  
+  public void tearDown2() throws Exception {
+    super.tearDown2();
+    cleanupVM();
+    vm0.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+    vm1.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+    vm2.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+    vm3.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+    vm4.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+    vm5.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+    vm6.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+    vm7.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+  }
+
+  /** Invoked in every VM during teardown; closes this VM's cache if open. */
+  public static void cleanupVM() throws IOException {
+    closeCache();
+  }
+
+  /**
+   * Closes the cache and disconnects its distributed system; when no open
+   * cache exists, disconnects any lingering distributed system connection
+   * instead.
+   */
+  public static void closeCache() throws IOException {
+    if (cache == null || cache.isClosed()) {
+      AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+      if (test.isConnectedToDS()) {
+        test.getSystem().disconnect();
+      }
+    }
+    else {
+      cache.close();
+      cache.getDistributedSystem().disconnect();
+      cache = null;
+    }
+  }
+
+  /** Disconnects the distributed system hosting the locator in this VM. */
+  public static void shutdownLocator() {
+    AsyncEventQueueTestBase testInstance = new AsyncEventQueueTestBase(testName);
+    testInstance.getSystem().disconnect();
+  }
+
+  /** Dumps the contents of eventListener1's event map via its printMap() helper. */
+  public static void printEventListenerMap() {
+    ((MyGatewaySenderEventListener)eventListener1).printMap();
+  }
+  
+
+  @Override
+  public InternalDistributedSystem getSystem(Properties props) {
+    // For now all AsyncEventQueueTestBase tests allocate off-heap memory
+    // even though many of them never use it. The problem is that this base
+    // class has static methods that create instances of the base class
+    // (instead of instances of the subclass), so we can't override this
+    // method so that only the off-heap subclasses allocate off-heap memory.
+    props.setProperty(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME, "300m");
+    return super.getSystem(props);
+  }
+  
+  /**
+   * Returns true if the test should create off-heap regions.
+   * OffHeap subclasses should override this method and return true.
+   * (The previous javadoc said "return false", which contradicted this
+   * base implementation already returning false.)
+   */
+  public boolean isOffHeap() {
+    return false;
+  }
+
+}
+
+/**
+ * Test listener that records every async event it processes, keyed by the
+ * event's key; used by the CacheLoader related tests.
+ */
+class MyAsyncEventListener_CacheLoader implements AsyncEventListener {
+  // Written from dispatcher threads, read from test threads.
+  private final Map eventsMap = new ConcurrentHashMap();
+
+  public boolean processEvents(List<AsyncEvent> events) {
+    for (AsyncEvent event : events) {
+      eventsMap.put(event.getKey(), event);
+    }
+    return true;
+  }
+
+  public Map getEventsMap() {
+    return eventsMap;
+  }
+
+  public void close() {
+  }
+}
+
+/**
+ * Test loader that fabricates a deterministic value ("LoadedValue_&lt;key&gt;")
+ * for any missing Long key.
+ */
+class MyCacheLoader implements CacheLoader, Declarable {
+
+  public Object load(LoaderHelper helper) {
+    // Cast preserved: a non-Long key fails fast, as before.
+    Long key = (Long)helper.getKey();
+    return "LoadedValue_" + key;
+  }
+
+  public void close() {
+  }
+
+  public void init(Properties props) {
+  }
+
+}