You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2015/12/08 00:22:39 UTC

[1/3] incubator-geode git commit: GEODE-637: Additional tests for AsyncEventQueues

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-637 [created] eccf52393


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/eccf5239/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerOffHeapDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerOffHeapDUnitTest.java
new file mode 100644
index 0000000..b050ef5
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerOffHeapDUnitTest.java
@@ -0,0 +1,17 @@
+package com.gemstone.gemfire.internal.cache.wan.asyncqueue;
+
+
+@SuppressWarnings("serial")
+public class AsyncEventListenerOffHeapDUnitTest extends
+    AsyncEventListenerDUnitTest {
+
+  public AsyncEventListenerOffHeapDUnitTest(String name) {
+    super(name);
+  }
+
+  @Override
+  public boolean isOffHeap() {
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/eccf5239/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
new file mode 100644
index 0000000..cf4a184
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
@@ -0,0 +1,311 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan.asyncqueue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueTestBase;
+
+import dunit.AsyncInvocation;
+
+public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase {
+
+  private static final long serialVersionUID = 1L;
+  
+  public AsyncEventQueueStatsDUnitTest(String name) {
+    super(name);
+  }
+  
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+  
+  /**
+   * Normal replication scenario
+   */
+  public void testReplicatedSerialPropagation() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });// primary sender
+    pause(2000);//give some time for system to become stable
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {
+        "ln", 0, 1000, 1000, 1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats",
+        new Object[] { "ln", 10 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {
+        "ln", 0, 1000, 0, 0 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats",
+        new Object[] { "ln", 0 });
+  }
+  
+  /**
+   * Two listeners added to the same RR.
+   */
+  public void testAsyncStatsTwoListeners() throws Exception {
+    Integer lnPort = createFirstLocatorWithDSId(1);
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln1",
+      false, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln1",
+      false, 100, 100, false, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln1",
+      false, 100, 100, false, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln1",
+      false, 100, 100, false, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln2",
+      false, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln2",
+      false, 100, 100, false, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln2",
+      false, 100, 100, false, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln2",
+      false, 100, 100, false, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln1,ln2", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln1,ln2", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln1,ln2", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln1,ln2", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln1", 1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln2", 1000 });
+    pause(2000);//give some time for system to become stable
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {
+        "ln1", 0, 1000, 1000, 1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats",
+        new Object[] { "ln1", 10 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {
+        "ln2", 0, 1000, 1000, 1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats",
+        new Object[] { "ln2", 10 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {
+        "ln1", 0, 1000, 0, 0 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats",
+        new Object[] { "ln1", 0 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {
+        "ln2", 0, 1000, 0, 0 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats",
+        new Object[] { "ln2", 0 });
+  }
+  
+  /**
+   * HA scenario: kill one vm when puts are in progress on the other vm.
+   */
+  public void testReplicatedSerialPropagationHA() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+      false, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+      false, 100, 100, false, false, null, false });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR", "ln", isOffHeap() });
+    
+    AsyncInvocation inv1 = vm5.invokeAsync(AsyncEventQueueTestBase.class, "doPuts",
+        new Object[] { testName + "_RR", 10000 });
+    pause(2000);
+    AsyncInvocation inv2 = vm4.invokeAsync(AsyncEventQueueTestBase.class, "killAsyncEventQueue", new Object[] { "ln" });
+    Boolean isKilled = Boolean.FALSE;
+    try {
+      isKilled = (Boolean)inv2.getResult();
+    }
+    catch (Throwable e) {
+      fail("Unexpected exception while killing a AsyncEventQueue");
+    }
+    AsyncInvocation inv3 = null; 
+    if(!isKilled){
+      inv3 = vm5.invokeAsync(AsyncEventQueueTestBase.class, "killSender", new Object[] { "ln" });
+      inv3.join();
+    }
+    inv1.join();
+    inv2.join();
+    pause(2000);//give some time for system to become stable
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats_Failover", new Object[] {"ln", 10000});
+  }
+
+  /**
+   * Two regions attached to same AsyncEventQueue
+   */
+  public void testReplicatedSerialPropagationUNPorcessedEvents() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+      false, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+      false, 100, 100, false, false, null, false });
+
+    //create one RR (RR_1) on local site
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_1", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_1", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_1", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_1", "ln", isOffHeap() });
+
+    //create another RR (RR_2) on local site
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_2", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_2", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_2", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_2", "ln", isOffHeap() });
+    
+    //start puts in RR_1 in another thread
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR_1", 1000 });
+    //do puts in RR_2 in main thread
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPutsFrom", new Object[] { testName + "_RR_2", 1000, 1500 });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1500 });
+        
+    pause(2000);//give some time for system to become stable
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {"ln",
+      0, 1500, 1500, 1500});
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueUnprocessedStats", new Object[] {"ln", 0});
+    
+    
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {"ln",
+      0, 1500, 0, 0});
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueUnprocessedStats", new Object[] {"ln", 1500});
+  }
+  
+  /**
+   * Test with conflation enabled
+   */
+  public void testSerialPropagationConflation() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    
+    vm4
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    //pause at least for the batchTimeInterval to make sure that the AsyncEventQueue is actually paused
+    pause(2000);
+
+    final Map keyValues = new HashMap();
+    final Map updateKeyValues = new HashMap();
+    for(int i=0; i< 1000; i++) {
+      keyValues.put(i, i);
+    }
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { testName + "_RR", keyValues });
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { "ln", keyValues.size() });
+    
+    for(int i=0;i<500;i++) {
+      updateKeyValues.put(i, i+"_updated");
+    }
+    
+    // Put the update events and check the queue size.
+    // There should be no conflation with the previous create events.
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { testName + "_RR", updateKeyValues });    
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() });
+    
+    // Put the update events again and check the queue size.
+    // There should be conflation with the previous update events.
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { testName + "_RR", updateKeyValues });    
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });
+  
+    vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", new Object[] { "ln" });
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });
+    
+    pause(2000);// give some time for system to become stable
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {
+        "ln", 0, 2000, 2000, 1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueConflatedStats",
+        new Object[] { "ln", 500 });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/eccf5239/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueDUnitTest.java
new file mode 100644
index 0000000..2fb7496
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueDUnitTest.java
@@ -0,0 +1,330 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.wan.concurrent;
+
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueTestBase;
+
+import dunit.AsyncInvocation;
+
+/**
+ * @author skumar
+ *
+ */
+public class ConcurrentAsyncEventQueueDUnitTest extends AsyncEventQueueTestBase {
+
+  private static final long serialVersionUID = 1L;
+
+  public ConcurrentAsyncEventQueueDUnitTest(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+  
+  public void testConcurrentSerialAsyncEventQueueAttributes() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 150, true, true, "testDS", true, 5, OrderPolicy.THREAD });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateConcurrentAsyncEventQueueAttributes",
+        new Object[] { "ln", 100, 150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true, 5, OrderPolicy.THREAD });
+  }
+  
+ 
+  public void testConcurrentParallelAsyncEventQueueAttributesOrderPolicyKey() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        true, 100, 150, true, true, "testDS", true, 5, OrderPolicy.KEY });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateConcurrentAsyncEventQueueAttributes",
+        new Object[] { "ln", 100, 150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true, 5, OrderPolicy.KEY });
+  }
+
+  public void testConcurrentParallelAsyncEventQueueAttributesOrderPolicyPartition() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        true, 100, 150, true, true, "testDS", true, 5, OrderPolicy.PARTITION });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateConcurrentAsyncEventQueueAttributes",
+        new Object[] { "ln", 100, 150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true, 5, OrderPolicy.PARTITION });
+  }
+  
+  /**
+   * Test configuration::
+   * 
+   * Region: Replicated 
+   * WAN: Serial 
+   * Dispatcher threads: more than 1
+   * Order policy: key based ordering
+   */
+
+  public void testReplicatedSerialAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyKey() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        100 });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 100 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+  }
+  
+  /**
+   * Test configuration::
+   * 
+   * Region: Replicated 
+   * WAN: Serial 
+   * Dispatcher threads: more than 1
+   * Order policy: Thread ordering
+   */
+
+  public void testReplicatedSerialAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyThread() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.THREAD });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.THREAD });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.THREAD });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.THREAD });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    AsyncInvocation inv1 = vm4.invokeAsync(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        50 });
+    AsyncInvocation inv2 = vm4.invokeAsync(AsyncEventQueueTestBase.class, "doNextPuts", new Object[] { testName + "_RR",
+      50, 100 });
+    AsyncInvocation inv3 = vm4.invokeAsync(AsyncEventQueueTestBase.class, "doNextPuts", new Object[] { testName + "_RR",
+      100, 150 });
+    
+    try {
+      inv1.join();
+      inv2.join();
+      inv3.join();
+    } catch (InterruptedException ie) {
+      fail(
+          "Cought interrupted exception while waiting for the task tgo complete.",
+          ie);
+    }
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 150 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+  }
+  
+  /**
+   * Test configuration::
+   * 
+   * Region: PartitionedRegion 
+   * WAN: Parallel
+   * Dispatcher threads: more than 1
+   * Order policy: key based ordering
+   */
+  // Disabling test for bug #48323
+  public void testPartitionedParallelAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyKey() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        true, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        true, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        true, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        true, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        100 });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+      new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+      new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+      new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+      new Object[] { "ln" });
+  
+    int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+      new Object[] { "ln"});
+    int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+      new Object[] { "ln"});
+    int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+      new Object[] { "ln"});
+    int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+      new Object[] { "ln"});
+  
+    assertEquals(vm4size + vm5size + vm6size + vm7size, 100);
+  
+  }
+  
+  
+  /**
+   * Test configuration::
+   * 
+   * Region: PartitionedRegion 
+   * WAN: Parallel
+   * Dispatcher threads: more than 1
+   * Order policy: PARTITION based ordering
+   */
+  // Disabled test for bug #48323
+  public void testPartitionedParallelAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyPartition() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue",
+        new Object[] { "ln", true, 100, 10, true, false, null, false, 3,
+            OrderPolicy.PARTITION });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue",
+        new Object[] { "ln", true, 100, 10, true, false, null, false, 3,
+            OrderPolicy.PARTITION });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue",
+        new Object[] { "ln", true, 100, 10, true, false, null, false, 3,
+            OrderPolicy.PARTITION });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue",
+        new Object[] { "ln", true, 100, 10, true, false, null, false, 3,
+            OrderPolicy.PARTITION });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        100 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+
+    int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", new Object[] { "ln" });
+    int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", new Object[] { "ln" });
+    int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", new Object[] { "ln" });
+    int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", new Object[] { "ln" });
+
+    assertEquals(100, vm4size + vm5size + vm6size + vm7size);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/eccf5239/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueOffHeapDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueOffHeapDUnitTest.java
new file mode 100644
index 0000000..41eb22d
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueOffHeapDUnitTest.java
@@ -0,0 +1,16 @@
+package com.gemstone.gemfire.internal.cache.wan.concurrent;
+
+@SuppressWarnings("serial")
+public class ConcurrentAsyncEventQueueOffHeapDUnitTest extends
+    ConcurrentAsyncEventQueueDUnitTest {
+
+  public ConcurrentAsyncEventQueueOffHeapDUnitTest(String name) {
+    super(name);
+  }
+
+  @Override
+  public boolean isOffHeap() {
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/eccf5239/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueDUnitTest.java
new file mode 100644
index 0000000..425d1a6
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueDUnitTest.java
@@ -0,0 +1,53 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueTestBase;
+
+/**
+ * @author skumar
+ *
+ */
+public class CommonParallelAsyncEventQueueDUnitTest extends AsyncEventQueueTestBase {
+  
+  private static final long serialVersionUID = 1L;
+
+  public CommonParallelAsyncEventQueueDUnitTest(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception  {
+    super.setUp();
+  }
+    
+  public void testSameSenderWithNonColocatedRegions() throws Exception {
+    addExpectedException("cannot have the same parallel async");
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+      true, 100, 100, false, false, null, false });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR1", "ln", isOffHeap()  });
+    try {
+      vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+          new Object[] { testName + "_PR2", "ln", isOffHeap()  });
+      fail("Expected IllegateStateException : cannot have the same parallel gateway sender");
+    }
+    catch (Exception e) {
+      if (!(e.getCause() instanceof IllegalStateException)
+          || !(e.getCause().getMessage()
+              .contains("cannot have the same parallel async event queue id"))) {
+        fail("Expected IllegalStateException", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/eccf5239/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueOffHeapDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueOffHeapDUnitTest.java
new file mode 100644
index 0000000..8ab77b9
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueOffHeapDUnitTest.java
@@ -0,0 +1,16 @@
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+@SuppressWarnings("serial")
+public class CommonParallelAsyncEventQueueOffHeapDUnitTest extends
+    CommonParallelAsyncEventQueueDUnitTest {
+
+  public CommonParallelAsyncEventQueueOffHeapDUnitTest(String name) {
+    super(name);
+  }
+
+  @Override
+  public boolean isOffHeap() {
+    return true;
+  }
+
+}



[2/3] incubator-geode git commit: GEODE-637: Additional tests for AsyncEventQueues

Posted by up...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/eccf5239/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
new file mode 100644
index 0000000..1eafbb0
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -0,0 +1,1911 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan.asyncqueue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.Ignore;
+
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueTestBase;
+
+public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
+
+  private static final long serialVersionUID = 1L;
+
+  public AsyncEventListenerDUnitTest(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  /**
+   * Test to verify that AsyncEventQueue can not be created when null listener
+   * is passed.
+   */
+  public void testCreateAsyncEventQueueWithNullListener() {
+    AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+
+    AsyncEventQueueFactory asyncQueueFactory = cache
+        .createAsyncEventQueueFactory();
+    try {
+      asyncQueueFactory.create("testId", null);
+      fail("AsyncQueueFactory should not allow to create AsyncEventQueue with null listener");
+    }
+    catch (IllegalArgumentException e) {
+      // expected
+    }
+
+  }
+
+  public void testSerialAsyncEventQueueAttributes() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 150, true, true, "testDS", true });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventQueueAttributes",
+        new Object[] { "ln", 100, 150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true });
+  }
+  
+  public void testSerialAsyncEventQueueSize() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    vm4
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm5
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm6
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm7
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    pause(1000);// pause at least for the batchTimeInterval
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+
+    int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventQueueSize", new Object[] { "ln" });
+    int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventQueueSize", new Object[] { "ln" });
+    assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm4size);
+    assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm5size);
+  }
+  
+  /**
+   * Added to reproduce defect #50366: 
+   * NullPointerException with AsyncEventQueue#size() when number of dispatchers is more than 1
+   */
+  public void testConcurrentSerialAsyncEventQueueSize() {
+	Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+		"createFirstLocatorWithDSId", new Object[] { 1 });
+
+	vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+	vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+	vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+	vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 150, true, false, null, false, 2, OrderPolicy.KEY });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 150, true, false, null, false, 2, OrderPolicy.KEY });
+
+	vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+		new Object[] { testName + "_RR", "ln", isOffHeap() });
+	vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+		new Object[] { testName + "_RR", "ln", isOffHeap() });
+	vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+	    new Object[] { testName + "_RR", "ln", isOffHeap() });
+	vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+		new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+	vm4
+	  .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+		new Object[] { "ln" });
+	vm5
+	  .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+		new Object[] { "ln" });
+
+	pause(1000);// pause at least for the batchTimeInterval
+
+	vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+		1000 });
+
+	int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+		"getAsyncEventQueueSize", new Object[] { "ln" });
+	int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+		"getAsyncEventQueueSize", new Object[] { "ln" });
+	assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm4size);
+	assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm5size);
+  }
+  
+  /**
+   * Test configuration::
+   * 
+   * Region: Replicated WAN: Serial Region persistence enabled: false Async
+   * channel persistence enabled: false
+   */
+
+  public void testReplicatedSerialAsyncEventQueue() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+  }
+  
+  /**
+   * Verify that the events loaded by CacheLoader reach the AsyncEventListener
+   * with correct operation detail (added for defect #50237).
+   */
+  public void testReplicatedSerialAsyncEventQueueWithCacheLoader() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithCacheLoaderAndAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithCacheLoaderAndAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithCacheLoaderAndAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithCacheLoaderAndAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln" });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doGets", new Object[] { testName + "_RR",
+        10 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+        new Object[] { "ln", 10, true, false });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+        new Object[] { "ln", 0, true, false });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+        new Object[] { "ln", 0, true, false });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+        new Object[] { "ln", 0, true, false });// secondary
+  }
+  
+  /**
+   * Test configuration::
+   * 
+   * Region: Replicated 
+   * WAN: Serial 
+   * Region persistence enabled: false 
+   * Async queue persistence enabled: false
+   * 
+   * Error is thrown from AsyncEventListener implementation while processing the batch.
+   * Added to test the fix done for defect #45152.
+   */
+
+  public void testReplicatedSerialAsyncEventQueue_ExceptionScenario() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithCustomListener", new Object[] { "ln",
+        false, 100, 100, false, false, null, false, 1 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithCustomListener", new Object[] { "ln",
+        false, 100, 100, false, false, null, false, 1 });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithCustomListener", new Object[] { "ln",
+        false, 100, 100, false, false, null, false, 1 });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithCustomListener", new Object[] { "ln",
+        false, 100, 100, false, false, null, false, 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    
+    vm4
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm5
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm6
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm7
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    pause(2000);// pause at least for the batchTimeInterval
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        100 });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateCustomAsyncEventListener",
+        new Object[] { "ln", 100 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateCustomAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateCustomAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateCustomAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+  }
+
+  /**
+   * Test configuration::
+   * 
+   * Region: Replicated WAN: Serial Region persistence enabled: false Async
+   * channel persistence enabled: false AsyncEventQueue conflation enabled: true
+   */
+  public void testReplicatedSerialAsyncEventQueueWithConflationEnabled() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    vm4
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm5
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm6
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm7
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    pause(1000);// pause at least for the batchTimeInterval
+
+    final Map keyValues = new HashMap();
+    final Map updateKeyValues = new HashMap();
+    for (int i = 0; i < 1000; i++) {
+      keyValues.put(i, i);
+    }
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+        testName + "_RR", keyValues });
+
+    pause(1000);
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] {
+        "ln", keyValues.size() });
+
+    for (int i = 0; i < 500; i++) {
+      updateKeyValues.put(i, i + "_updated");
+    }
+
+    // Put the update events and check the queue size.
+    // There should be no conflation with the previous create events.
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+        testName + "_RR", updateKeyValues });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] {
+        "ln", keyValues.size() + updateKeyValues.size() });
+
+    // Put the update events again and check the queue size.
+    // There should be conflation with the previous update events.
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+        testName + "_RR", updateKeyValues });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] {
+        "ln", keyValues.size() + updateKeyValues.size() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+  }
+
+  
+
+  /**
+   * Test configuration::
+   * 
+   * Region: Replicated WAN: Serial Region persistence enabled: false Async
+   * event queue persistence enabled: false
+   * 
+   * Note: The test doesn't create a locator but uses MCAST port instead.
+   */
+  @Ignore("Disabled until I can sort out the hydra dependencies - see bug 52214")
+  public void DISABLED_testReplicatedSerialAsyncEventQueueWithoutLocator() {
+    int mPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCacheWithoutLocator",
+        new Object[] { mPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCacheWithoutLocator",
+        new Object[] { mPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCacheWithoutLocator",
+        new Object[] { mPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCacheWithoutLocator",
+        new Object[] { mPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+  }
+
+  /**
+   * Test configuration::
+   * 
+   * Region: Replicated WAN: Serial Region persistence enabled: false Async
+   * channel persistence enabled: true
+   * 
+   * No VM is restarted.
+   */
+
+  public void testReplicatedSerialAsyncEventQueueWithPeristenceEnabled() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+  }
+
+  /**
+   * Test configuration::
+   * 
+   * Region: Replicated WAN: Serial Region persistence enabled: false Async
+   * channel persistence enabled: true
+   * 
+   * There is only one vm in the site and that vm is restarted
+   */
+
+  @Ignore("Disabled for 52351")
+  public void DISABLED_testReplicatedSerialAsyncEventQueueWithPeristenceEnabled_Restart() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    String firstDStore = (String)vm4.invoke(AsyncEventQueueTestBase.class,
+        "createAsyncEventQueueWithDiskStore", new Object[] { "ln", false, 100,
+            100, true, null });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    // pause async channel and then do the puts
+    vm4
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+
+    // ------------------ KILL VM4 AND REBUILD
+    // ------------------------------------------
+    vm4.invoke(AsyncEventQueueTestBase.class, "killSender", new Object[] {});
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore",
+        new Object[] { "ln", false, 100, 100, true, firstDStore });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    // -----------------------------------------------------------------------------------
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });// primary sender
+  }
+
+  /**
+   * Test configuration::
+   * 
+   * Region: Replicated WAN: Serial Region persistence enabled: false Async
+   * channel persistence enabled: true
+   * 
+   * There are 3 VMs in the site and the VM with primary sender is shut down.
+   */
+  @Ignore("Disabled for 52351")
+  public void DISABLED_testReplicatedSerialAsyncEventQueueWithPeristenceEnabled_Restart2() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore",
+        new Object[] { "ln", false, 100, 100, true, null });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore",
+        new Object[] { "ln", false, 100, 100, true, null });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore",
+        new Object[] { "ln", false, 100, 100, true, null });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm4.invoke(AsyncEventQueueTestBase.class, "addCacheListenerAndCloseCache",
+        new Object[] { testName + "_RR" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    vm5.invoke(AsyncEventQueueTestBase.class, "doPuts",
+        new Object[] { testName + "_RR", 2000 });
+
+    // -----------------------------------------------------------------------------------
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForSenderToBecomePrimary",
+        new Object[] { AsyncEventQueueImpl
+            .getSenderIdFromAsyncEventQueueId("ln") });
+    
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+
+    int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", new Object[] { "ln" });
+    int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", new Object[] { "ln" });
+
+    getLogWriter().info("vm4 size is: " + vm4size);
+    getLogWriter().info("vm5 size is: " + vm5size);
+    // verify that there is no event loss
+    assertTrue(
+        "Total number of entries in events map on vm4 and vm5 should be at least 2000",
+        (vm4size + vm5size) >= 2000);
+  }
+  
+  /**
+   * Test configuration::
+   * 
+   * Region: Replicated 
+   * WAN: Serial 
+   * Dispatcher threads: more than 1
+   * Order policy: key based ordering
+   */
+  public void testConcurrentSerialAsyncEventQueueWithReplicatedRegion() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] {"ln", 1000 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] {"ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] {"ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] {"ln", 0 });// secondary
+  }
+  
+  /**
+   * Test configuration::
+   * 
+   * Region: Replicated 
+   * WAN: Serial 
+   * Region persistence enabled: false 
+   * Async queue persistence enabled: false
+   */
+  public void testConcurrentSerialAsyncEventQueueWithReplicatedRegion_2() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false, 3, OrderPolicy.THREAD });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false, 3, OrderPolicy.THREAD });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false, 3, OrderPolicy.THREAD });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false, 3, OrderPolicy.THREAD });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    vm4.invokeAsync(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        500 });
+    vm4.invokeAsync(AsyncEventQueueTestBase.class, "doNextPuts", new Object[] { testName + "_RR",
+      500, 1000 });
+    vm4.invokeAsync(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+      1000, 1500 });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] {"ln", 1000 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] {"ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] {"ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] {"ln", 0 });// secondary
+  }
+  
+  /**
+   * Dispatcher threads set to more than 1 but no order policy set.
+   * Added for defect #50514.
+   */
+  public void testConcurrentSerialAsyncEventQueueWithoutOrderPolicy() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false, 3, null });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false, 3, null });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false, 3, null });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false, 3, null });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] {"ln", 1000 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] {"ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] {"ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] {"ln", 0 });// secondary
+  }
+
+  /**
+   * Test configuration::
+   * 
+   * Region: Partitioned WAN: Serial Region persistence enabled: false Async
+   * channel persistence enabled: false
+   */
+  public void testPartitionedSerialAsyncEventQueue() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        500 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "doPutsFrom", new Object[] {
+        testName + "_PR", 500, 1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+  }
+
+  /**
+   * Test configuration::
+   * 
+   * Region: Partitioned WAN: Serial Region persistence enabled: false Async
+   * channel persistence enabled: false AsyncEventQueue conflation enabled: true
+   */
+  public void testPartitionedSerialAsyncEventQueueWithConflationEnabled() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+    vm4
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm5
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm6
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm7
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    
+    pause(2000);
+
+    final Map keyValues = new HashMap();
+    final Map updateKeyValues = new HashMap();
+    for (int i = 0; i < 1000; i++) {
+      keyValues.put(i, i);
+    }
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+        testName + "_PR", keyValues });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] {
+        "ln", keyValues.size() });
+
+    for (int i = 0; i < 500; i++) {
+      updateKeyValues.put(i, i + "_updated");
+    }
+
+    // Put the update events and check the queue size.
+    // There should be no conflation with the previous create events.
+    vm5.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+        testName + "_PR", updateKeyValues });
+
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] {
+        "ln", keyValues.size() + updateKeyValues.size() });
+
+    // Put the update events again and check the queue size.
+    // There should be conflation with the previous update events.
+    vm5.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+      testName + "_PR", updateKeyValues });
+
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] {
+      "ln", keyValues.size() + updateKeyValues.size() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+  }
+
+  /**
+   * Test configuration::
+   * 
+   * Region: Partitioned WAN: Serial Region persistence enabled: false Async
+   * channel persistence enabled: true
+   * 
+   * No VM is restarted.
+   */
+  public void testPartitionedSerialAsyncEventQueueWithPeristenceEnabled() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, true, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, true, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, true, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, true, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        500 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "doPutsFrom", new Object[] {
+        testName + "_PR", 500, 1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+  }
+
+  /**
+   * Test configuration::
+   * 
+   * Region: Partitioned WAN: Serial Region persistence enabled: false Async
+   * channel persistence enabled: true
+   * 
+   * There is only one vm in the site and that vm is restarted
+   */
+  public void testPartitionedSerialAsyncEventQueueWithPeristenceEnabled_Restart() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    String firstDStore = (String)vm4.invoke(AsyncEventQueueTestBase.class,
+        "createAsyncEventQueueWithDiskStore", new Object[] { "ln", false, 100,
+            100, true, null });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+    // pause async channel and then do the puts
+    vm4
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueueAndWaitForDispatcherToPause",
+            new Object[] { "ln" });
+  
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        1000 });
+
+    // ------------------ KILL VM4 AND REBUILD
+    // ------------------------------------------
+    vm4.invoke(AsyncEventQueueTestBase.class, "killSender", new Object[] {});
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore",
+        new Object[] { "ln", false, 100, 100, true, firstDStore });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    // -----------------------------------------------------------------------------------
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });// primary sender
+  }
+
+  public void testParallelAsyncEventQueueWithReplicatedRegion() {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+          "createFirstLocatorWithDSId", new Object[] { 1 });
+
+      vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+      vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+      vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+      vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+      vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] {
+          "ln", true, 100, 100, true, false, null, false });
+      vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] {
+          "ln", true, 100, 100, true, false, null, false });
+      vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] {
+          "ln", true, 100, 100, true, false, null, false });
+      vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] {
+          "ln", true, 100, 100, true, false, null, false });
+
+      vm4.invoke(AsyncEventQueueTestBase.class,
+          "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+              testName + "_RR", "ln", isOffHeap() });
+      fail("Expected GatewaySenderConfigException where parallel async event queue can not be used with replicated region");
+    }
+    catch (Exception e) {
+      if (!e.getCause().getMessage()
+          .contains("can not be used with replicated region")) {
+        fail("Expected GatewaySenderConfigException where parallel async event queue can not be used with replicated region");
+      }
+    }
+  }
+  
+  public void testParallelAsyncEventQueue() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        256 });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    
+    int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    
+    assertEquals(vm4size + vm5size + vm6size + vm7size, 256);
+  }
+  
+  /**
+   * Verify that the events reaching the AsyncEventListener have correct operation detail.
+   * (added for defect #50237).
+   */
+  public void testParallelAsyncEventQueueWithCacheLoader() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+	  "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+    	true, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+    	true, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+    	true, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+    	true, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithCacheLoaderAndAsyncQueue",
+    	new Object[] { testName + "_PR", "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithCacheLoaderAndAsyncQueue",
+    	new Object[] { testName + "_PR", "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithCacheLoaderAndAsyncQueue",
+    	new Object[] { testName + "_PR", "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithCacheLoaderAndAsyncQueue",
+    	new Object[] { testName + "_PR", "ln" });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPutAll", new Object[] { testName + "_PR",
+    	100, 10 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+    	new Object[] { "ln", 250, false, true });
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+    	new Object[] { "ln", 250, false, true });
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+    	new Object[] { "ln", 250, false, true });
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+    	new Object[] { "ln", 250, false, true });
+  }
+  
+  public void testParallelAsyncEventQueueSize() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+    vm4
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm5
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm6
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm7
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    pause(1000);// pause at least for the batchTimeInterval
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        1000 });
+
+    int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventQueueSize", new Object[] { "ln" });
+    int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventQueueSize", new Object[] { "ln" });
+    
+    assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm4size);
+    assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm5size);
+  }
+  
+  /**
+   * Added to reproduce defect #50366: 
+   * NullPointerException with AsyncEventQueue#size() when number of dispatchers is more than 1
+   */
+  public void testConcurrentParallelAsyncEventQueueSize() {
+	Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+	  "createFirstLocatorWithDSId", new Object[] { 1 });
+
+	vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+	vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+	vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+	vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+	vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+	  true, 100, 100, false, false, null, false, 2, OrderPolicy.KEY });
+	vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+	  true, 100, 100, false, false, null, false, 2, OrderPolicy.KEY });
+	vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+	  true, 100, 100, false, false, null, false, 2, OrderPolicy.KEY });
+	vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+	  true, 100, 100, false, false, null, false, 2, OrderPolicy.KEY });
+
+	vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+	  new Object[] { testName + "_PR", "ln", isOffHeap() });
+	vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+	  new Object[] { testName + "_PR", "ln", isOffHeap() });
+	vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+	  new Object[] { testName + "_PR", "ln", isOffHeap() });
+	vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+	  new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+	vm4
+	  .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+		new Object[] { "ln" });
+	vm5
+	  .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+		new Object[] { "ln" });
+	vm6
+	  .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+		new Object[] { "ln" });
+	vm7
+	  .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+		new Object[] { "ln" });
+	pause(1000);// pause at least for the batchTimeInterval
+
+	vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR",
+	  1000 });
+
+	int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+	  "getAsyncEventQueueSize", new Object[] { "ln" });
+	int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+	  "getAsyncEventQueueSize", new Object[] { "ln" });
+	    
+	assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm4size);
+	assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm5size);
+  }
+  
+  public void testParallelAsyncEventQueueWithConflationEnabled() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, true, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, true, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, true, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, true, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+    vm4
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm5
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm6
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm7
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+
+    pause(2000);// pause for the batchTimeInterval to ensure that all the
+    // senders are paused
+
+    final Map keyValues = new HashMap();
+    final Map updateKeyValues = new HashMap();
+    for (int i = 0; i < 1000; i++) {
+      keyValues.put(i, i);
+    }
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+        testName + "_PR", keyValues });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] {
+        "ln", keyValues.size() });
+
+    for (int i = 0; i < 500; i++) {
+      updateKeyValues.put(i, i + "_updated");
+    }
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+        testName + "_PR", updateKeyValues });
+
+ 
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncEventQueueSize", new Object[] {
+        "ln", keyValues.size() + updateKeyValues.size() }); // no conflation of creates
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+        testName + "_PR", updateKeyValues });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncEventQueueSize", new Object[] {
+        "ln", keyValues.size() + updateKeyValues.size() }); // conflation of updates
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    
+    int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    
+    assertEquals(vm4size + vm5size + vm6size + vm7size, keyValues.size());
+  }
+
+  /**
+   * Added to reproduce defect #47213
+   */
+  public void testParallelAsyncEventQueueWithConflationEnabled_bug47213() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, true, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, true, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, true, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, true, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPRWithRedundantCopyWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPRWithRedundantCopyWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPRWithRedundantCopyWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPRWithRedundantCopyWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+    vm4
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm5
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm6
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm7
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+
+    pause(2000);// pause for the batchTimeInterval to ensure that all the
+    // senders are paused
+
+    final Map keyValues = new HashMap();
+    final Map updateKeyValues = new HashMap();
+    for (int i = 0; i < 1000; i++) {
+      keyValues.put(i, i);
+    }
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+        testName + "_PR", keyValues });
+
+    pause(2000);
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] {
+        "ln", keyValues.size() });
+
+    for (int i = 0; i < 500; i++) {
+      updateKeyValues.put(i, i + "_updated");
+    }
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+        testName + "_PR", updateKeyValues });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+        testName + "_PR", updateKeyValues });
+
+    // pause to ensure that events have been conflated.
+    pause(2000);
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] {
+        "ln", keyValues.size() + updateKeyValues.size() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    
+    int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    
+    assertEquals(vm4size + vm5size + vm6size + vm7size, keyValues.size());
+    
+  }
+
+  public void testParallelAsyncEventQueueWithOneAccessor() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm3.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm3.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+
+    vm3.invoke(AsyncEventQueueTestBase.class,
+        "createPartitionedRegionAccessorWithAsyncEventQueue", new Object[] {
+            testName + "_PR", "ln" });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+    vm3.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        256 });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    
+    vm3.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });
+    
+    int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    
+    assertEquals(vm4size + vm5size + vm6size + vm7size, 256);
+
+  }
+
+  public void testParallelAsyncEventQueueWithPersistence() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, true, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, true, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, true, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, true, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        256 });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    
+    int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+        new Object[] { "ln"});
+    
+    assertEquals(vm4size + vm5size + vm6size + vm7size, 256);
+  }
+  
+  /**
+   * Below test is disabled intentionally Replicated region with Parallel Async
+   * Event queue is not supported. Test is added for the same
+   * testParallelAsyncEventQueueWithReplicatedRegion
+   * 
+   * We are gone support this configuration in upcoming releases
+   */
+  
+  public void DISABLED_DUETO_BUG51491_testReplicatedParallelAsyncEventQueue() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        true, 100, 100, false, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+
+    int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", new Object[] { "ln" });
+    int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", new Object[] { "ln" });
+    int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", new Object[] { "ln" });
+    int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", new Object[] { "ln" });
+
+    assertEquals(vm4size + vm5size + vm6size + vm7size, 1000);
+  }
+  
+/**
+ * Test case to test possibleDuplicates. vm4 & vm5 are hosting the PR. vm5 is
+ * killed so the buckets hosted by it are shifted to vm4.
+ */
+  @Ignore("Disabled for 52349")
+  public void DISABLED_testParallelAsyncEventQueueHA_Scenario1() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+      "createFirstLocatorWithDSId", new Object[] { 1 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    getLogWriter().info("Created the cache");
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithListener2",
+        new Object[] { "ln", true, 100, 5, false, null });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithListener2",
+        new Object[] { "ln", true, 100, 5, false, null });
+
+    getLogWriter().info("Created the AsyncEventQueue");
+
+    vm4.invoke(AsyncEventQueueTestBase.class,
+        "createPRWithRedundantCopyWithAsyncEventQueue", new Object[] {
+            testName + "_PR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class,
+        "createPRWithRedundantCopyWithAsyncEventQueue", new Object[] {
+            testName + "_PR", "ln", isOffHeap() });
+
+    getLogWriter().info("Created PR with AsyncEventQueue");
+
+    vm4
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    vm5
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    pause(1000);// pause for the batchTimeInterval to make sure the AsyncQueue
+                // is paused
+
+    getLogWriter().info("Paused the AsyncEventQueue");
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts",
+        new Object[] { testName + "_PR", 80 });
+
+    getLogWriter().info("Done puts");
+
+    Set<Integer> primaryBucketsVm5 = (Set<Integer>)vm5.invoke(
+        AsyncEventQueueTestBase.class, "getAllPrimaryBucketsOnTheNode",
+        new Object[] { testName + "_PR" });
+
+    getLogWriter().info("Primary buckets on vm5: " + primaryBucketsVm5);
+    // ---------------------------- Kill vm5 --------------------------
+    vm5.invoke(AsyncEventQueueTestBase.class, "killSender", new Object[] {});
+
+    pause(1000);// give some time for rebalancing to happen
+    vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+        new Object[] { "ln" });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { 

<TRUNCATED>


[3/3] incubator-geode git commit: GEODE-637: Additional tests for AsyncEventQueues

Posted by up...@apache.org.
GEODE-637: Additional tests for AsyncEventQueues


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/eccf5239
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/eccf5239
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/eccf5239

Branch: refs/heads/feature/GEODE-637
Commit: eccf5239370f934d61f87c713779fcdd48ad588b
Parents: bd43c34
Author: Dan Smith <up...@apache.org>
Authored: Wed Dec 2 09:51:49 2015 -0800
Committer: Dan Smith <up...@apache.org>
Committed: Mon Dec 7 15:21:25 2015 -0800

----------------------------------------------------------------------
 .../cache/wan/AsyncEventQueueTestBase.java      | 1658 +++++++++++++++
 .../asyncqueue/AsyncEventListenerDUnitTest.java | 1911 ++++++++++++++++++
 .../AsyncEventListenerOffHeapDUnitTest.java     |   17 +
 .../AsyncEventQueueStatsDUnitTest.java          |  311 +++
 .../ConcurrentAsyncEventQueueDUnitTest.java     |  330 +++
 ...ncurrentAsyncEventQueueOffHeapDUnitTest.java |   16 +
 .../CommonParallelAsyncEventQueueDUnitTest.java |   53 +
 ...ParallelAsyncEventQueueOffHeapDUnitTest.java |   16 +
 8 files changed, 4312 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/eccf5239/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
new file mode 100644
index 0000000..a800118
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/AsyncEventQueueTestBase.java
@@ -0,0 +1,1658 @@
+/*
+ * =========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. This
+ * product is protected by U.S. and international copyright and intellectual
+ * property laws. Pivotal products are covered by one or more patents listed at
+ * http://www.pivotal.io/patents.
+ * =========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.CacheLoader;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Declarable;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.LoaderHelper;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
+import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter;
+import com.gemstone.gemfire.cache.control.RebalanceFactory;
+import com.gemstone.gemfire.cache.control.RebalanceOperation;
+import com.gemstone.gemfire.cache.control.RebalanceResults;
+import com.gemstone.gemfire.cache.control.ResourceManager;
+import com.gemstone.gemfire.cache.persistence.PartitionOfflineException;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.distributed.Locator;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.VM;
+
+public class AsyncEventQueueTestBase extends DistributedTestCase {
+
+  protected static Cache cache;
+
+  protected static VM vm0;
+
+  protected static VM vm1;
+
+  protected static VM vm2;
+
+  protected static VM vm3;
+
+  protected static VM vm4;
+
+  protected static VM vm5;
+
+  protected static VM vm6;
+
+  protected static VM vm7;
+
+  protected static AsyncEventListener eventListener1;
+
+  private static final long MAX_WAIT = 10000;
+
+  protected static GatewayEventFilter eventFilter;
+
+  protected static boolean destroyFlag = false;
+
+  protected static List<Integer> dispatcherThreads = new ArrayList<Integer>(
+      Arrays.asList(1, 3, 5));
+
+  // this will be set for each test method run with one of the values from above
+  // list
+  protected static int numDispatcherThreadsForTheRun = 1;
+
+  public AsyncEventQueueTestBase(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+    final Host host = Host.getHost(0);
+    vm0 = host.getVM(0);
+    vm1 = host.getVM(1);
+    vm2 = host.getVM(2);
+    vm3 = host.getVM(3);
+    vm4 = host.getVM(4);
+    vm5 = host.getVM(5);
+    vm6 = host.getVM(6);
+    vm7 = host.getVM(7);
+    // this is done to vary the number of dispatchers for sender
+    // during every test method run
+    shuffleNumDispatcherThreads();
+    invokeInEveryVM(AsyncEventQueueTestBase.class,
+        "setNumDispatcherThreadsForTheRun",
+        new Object[] { dispatcherThreads.get(0) });
+  }
+
+  public static void shuffleNumDispatcherThreads() {
+    Collections.shuffle(dispatcherThreads);
+  }
+
+  public static void setNumDispatcherThreadsForTheRun(int numThreads) {
+    numDispatcherThreadsForTheRun = numThreads;
+  }
+
+  public static Integer createFirstLocatorWithDSId(int dsId) {
+    if (Locator.hasLocator()) {
+      Locator.getLocator().stop();
+    }
+    AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    //props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, "" + dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + port
+        + "]");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost["
+        + port + "],server=true,peer=true,hostname-for-clients=localhost");
+    test.getSystem(props);
+    return port;
+  }
+
+  public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
+    AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, "" + dsId);
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + port
+        + "]");
+    props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost["
+        + port + "],server=true,peer=true,hostname-for-clients=localhost");
+    props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, "localhost["
+        + remoteLocPort + "]");
+    test.getSystem(props);
+    return port;
+  }
+
+  public static void createReplicatedRegionWithAsyncEventQueue(
+      String regionName, String asyncQueueIds, Boolean offHeap) {
+    ExpectedException exp1 = addExpectedException(ForceReattemptException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+      if (asyncQueueIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String asyncQueueId = tokenizer.nextToken();
+          fact.addAsyncEventQueueId(asyncQueueId);
+        }
+      }
+      fact.setDataPolicy(DataPolicy.REPLICATE);
+      fact.setOffHeap(offHeap);
+      RegionFactory regionFactory = cache.createRegionFactory(fact.create());
+      Region r = regionFactory.create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp1.remove();
+    }
+  }
+
+  public static void createReplicatedRegionWithCacheLoaderAndAsyncEventQueue(
+      String regionName, String asyncQueueIds) {
+
+    AttributesFactory fact = new AttributesFactory();
+    if (asyncQueueIds != null) {
+      StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
+      while (tokenizer.hasMoreTokens()) {
+        String asyncQueueId = tokenizer.nextToken();
+        fact.addAsyncEventQueueId(asyncQueueId);
+      }
+    }
+    fact.setDataPolicy(DataPolicy.REPLICATE);
+    // set the CacheLoader
+    fact.setCacheLoader(new MyCacheLoader());
+    RegionFactory regionFactory = cache.createRegionFactory(fact.create());
+    Region r = regionFactory.create(regionName);
+    assertNotNull(r);
+  }
+
+  public static void createReplicatedRegionWithSenderAndAsyncEventQueue(
+      String regionName, String senderIds, String asyncChannelId, Boolean offHeap) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    try {
+
+      AttributesFactory fact = new AttributesFactory();
+      if (senderIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String senderId = tokenizer.nextToken();
+          fact.addGatewaySenderId(senderId);
+        }
+      }
+      fact.setDataPolicy(DataPolicy.REPLICATE);
+      fact.setOffHeap(offHeap);
+      fact.setScope(Scope.DISTRIBUTED_ACK);
+      RegionFactory regionFactory = cache.createRegionFactory(fact.create());
+      regionFactory.addAsyncEventQueueId(asyncChannelId);
+      Region r = regionFactory.create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+    }
+  }
+
+  public static void createAsyncEventQueue(String asyncChannelId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isConflation, boolean isPersistent, String diskStoreName,
+      boolean isDiskSynchronous) {
+
+    if (diskStoreName != null) {
+      File directory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      directory.mkdir();
+      File[] dirs1 = new File[] { directory };
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      dsf.setDiskDirs(dirs1);
+      DiskStore ds = dsf.create(diskStoreName);
+    }
+
+    AsyncEventListener asyncEventListener = new MyAsyncEventListener();
+
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setPersistent(isPersistent);
+    factory.setDiskStoreName(diskStoreName);
+    factory.setDiskSynchronous(isDiskSynchronous);
+    factory.setBatchConflationEnabled(isConflation);
+    factory.setMaximumQueueMemory(maxMemory);
+    factory.setParallel(isParallel);
+    // set dispatcher threads
+    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
+        asyncEventListener);
+  }
+
+  public static void createAsyncEventQueueWithListener2(String asyncChannelId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isPersistent, String diskStoreName) {
+
+    if (diskStoreName != null) {
+      File directory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      directory.mkdir();
+      File[] dirs1 = new File[] { directory };
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      dsf.setDiskDirs(dirs1);
+      DiskStore ds = dsf.create(diskStoreName);
+    }
+
+    AsyncEventListener asyncEventListener = new MyAsyncEventListener2();
+
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setPersistent(isPersistent);
+    factory.setDiskStoreName(diskStoreName);
+    factory.setMaximumQueueMemory(maxMemory);
+    factory.setParallel(isParallel);
+    // set dispatcher threads
+    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
+        asyncEventListener);
+  }
+
+  public static void createAsyncEventQueue(String asyncChannelId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isConflation, boolean isPersistent, String diskStoreName,
+      boolean isDiskSynchronous, String asyncListenerClass) throws Exception {
+
+    if (diskStoreName != null) {
+      File directory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      directory.mkdir();
+      File[] dirs1 = new File[] { directory };
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      dsf.setDiskDirs(dirs1);
+      DiskStore ds = dsf.create(diskStoreName);
+    }
+
+    String packagePrefix = "com.gemstone.gemfire.internal.cache.wan.";
+    String className = packagePrefix + asyncListenerClass;
+    AsyncEventListener asyncEventListener = null;
+    try {
+      Class clazz = Class.forName(className);
+      asyncEventListener = (AsyncEventListener)clazz.newInstance();
+    }
+    catch (ClassNotFoundException e) {
+      throw e;
+    }
+    catch (InstantiationException e) {
+      throw e;
+    }
+    catch (IllegalAccessException e) {
+      throw e;
+    }
+
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setPersistent(isPersistent);
+    factory.setDiskStoreName(diskStoreName);
+    factory.setDiskSynchronous(isDiskSynchronous);
+    factory.setBatchConflationEnabled(isConflation);
+    factory.setMaximumQueueMemory(maxMemory);
+    factory.setParallel(isParallel);
+    // set dispatcher threads
+    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
+        asyncEventListener);
+  }
+
+  public static void createAsyncEventQueueWithCustomListener(
+      String asyncChannelId, boolean isParallel, Integer maxMemory,
+      Integer batchSize, boolean isConflation, boolean isPersistent,
+      String diskStoreName, boolean isDiskSynchronous) {
+    createAsyncEventQueueWithCustomListener(asyncChannelId, isParallel,
+        maxMemory, batchSize, isConflation, isPersistent, diskStoreName,
+        isDiskSynchronous, GatewaySender.DEFAULT_DISPATCHER_THREADS);
+  }
+
+  public static void createAsyncEventQueueWithCustomListener(
+      String asyncChannelId, boolean isParallel, Integer maxMemory,
+      Integer batchSize, boolean isConflation, boolean isPersistent,
+      String diskStoreName, boolean isDiskSynchronous, int nDispatchers) {
+
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+
+    try {
+      if (diskStoreName != null) {
+        File directory = new File(asyncChannelId + "_disk_"
+            + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+        directory.mkdir();
+        File[] dirs1 = new File[] { directory };
+        DiskStoreFactory dsf = cache.createDiskStoreFactory();
+        dsf.setDiskDirs(dirs1);
+        DiskStore ds = dsf.create(diskStoreName);
+      }
+
+      AsyncEventListener asyncEventListener = new CustomAsyncEventListener();
+
+      AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+      factory.setBatchSize(batchSize);
+      factory.setPersistent(isPersistent);
+      factory.setDiskStoreName(diskStoreName);
+      factory.setMaximumQueueMemory(maxMemory);
+      factory.setParallel(isParallel);
+      factory.setDispatcherThreads(nDispatchers);
+      AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
+          asyncEventListener);
+    }
+    finally {
+      exp.remove();
+    }
+  }
+
+  public static void createConcurrentAsyncEventQueue(String asyncChannelId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isConflation, boolean isPersistent, String diskStoreName,
+      boolean isDiskSynchronous, int dispatcherThreads, OrderPolicy policy) {
+
+    if (diskStoreName != null) {
+      File directory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      directory.mkdir();
+      File[] dirs1 = new File[] { directory };
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      dsf.setDiskDirs(dirs1);
+      DiskStore ds = dsf.create(diskStoreName);
+    }
+
+    AsyncEventListener asyncEventListener = new MyAsyncEventListener();
+
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setPersistent(isPersistent);
+    factory.setDiskStoreName(diskStoreName);
+    factory.setDiskSynchronous(isDiskSynchronous);
+    factory.setBatchConflationEnabled(isConflation);
+    factory.setMaximumQueueMemory(maxMemory);
+    factory.setParallel(isParallel);
+    factory.setDispatcherThreads(dispatcherThreads);
+    factory.setOrderPolicy(policy);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
+        asyncEventListener);
+  }
+
+  public static String createAsyncEventQueueWithDiskStore(
+      String asyncChannelId, boolean isParallel, Integer maxMemory,
+      Integer batchSize, boolean isPersistent, String diskStoreName) {
+
+    AsyncEventListener asyncEventListener = new MyAsyncEventListener();
+
+    File persistentDirectory = null;
+    if (diskStoreName == null) {
+      persistentDirectory = new File(asyncChannelId + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+    }
+    else {
+      persistentDirectory = new File(diskStoreName);
+    }
+    getLogWriter().info("The ds is : " + persistentDirectory.getName());
+    persistentDirectory.mkdir();
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
+    File[] dirs1 = new File[] { persistentDirectory };
+
+    AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+    factory.setBatchSize(batchSize);
+    factory.setParallel(isParallel);
+    if (isPersistent) {
+      factory.setPersistent(isPersistent);
+      factory.setDiskStoreName(dsf.setDiskDirs(dirs1).create(asyncChannelId)
+          .getName());
+    }
+    factory.setMaximumQueueMemory(maxMemory);
+    // set dispatcher threads
+    factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+    AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
+        asyncEventListener);
+    return persistentDirectory.getName();
+  }
+
+  public static void pauseAsyncEventQueue(String asyncChannelId) {
+    AsyncEventQueue theChannel = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncChannelId.equals(asyncChannel.getId())) {
+        theChannel = asyncChannel;
+      }
+    }
+
+    ((AsyncEventQueueImpl)theChannel).getSender().pause();
+  }
+
+  public static void pauseAsyncEventQueueAndWaitForDispatcherToPause(
+      String asyncChannelId) {
+    AsyncEventQueue theChannel = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncChannelId.equals(asyncChannel.getId())) {
+        theChannel = asyncChannel;
+        break;
+      }
+    }
+
+    ((AsyncEventQueueImpl)theChannel).getSender().pause();
+
+    ((AbstractGatewaySender)((AsyncEventQueueImpl)theChannel).getSender())
+        .getEventProcessor().waitForDispatcherToPause();
+  }
+
+  public static void resumeAsyncEventQueue(String asyncQueueId) {
+    AsyncEventQueue theQueue = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncQueueId.equals(asyncChannel.getId())) {
+        theQueue = asyncChannel;
+      }
+    }
+
+    ((AsyncEventQueueImpl)theQueue).getSender().resume();
+  }
+
+  public static void checkAsyncEventQueueSize(String asyncQueueId,
+      int numQueueEntries) {
+    AsyncEventQueue theAsyncEventQueue = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncQueueId.equals(asyncChannel.getId())) {
+        theAsyncEventQueue = asyncChannel;
+      }
+    }
+
+    GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue)
+        .getSender();
+
+    if (sender.isParallel()) {
+      Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
+      assertEquals(numQueueEntries,
+          queues.toArray(new RegionQueue[queues.size()])[0].getRegion().size());
+    }
+    else {
+      Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
+      int size = 0;
+      for (RegionQueue q : queues) {
+        size += q.size();
+      }
+      assertEquals(numQueueEntries, size);
+    }
+  }
+
+  /**
+   * This method verifies the queue size of a ParallelGatewaySender. For
+   * ParallelGatewaySender conflation happens in a separate thread, hence test
+   * code needs to wait for some time for expected result
+   * 
+   * @param asyncQueueId
+   *          Async Queue ID
+   * @param numQueueEntries
+   *          expected number of Queue entries
+   * @throws Exception
+   */
+  public static void waitForAsyncEventQueueSize(String asyncQueueId,
+      final int numQueueEntries) throws Exception {
+    AsyncEventQueue theAsyncEventQueue = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncQueueId.equals(asyncChannel.getId())) {
+        theAsyncEventQueue = asyncChannel;
+      }
+    }
+
+    GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue)
+        .getSender();
+
+    if (sender.isParallel()) {
+      final Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
+          .getQueues();
+
+      waitForCriterion(new WaitCriterion() {
+
+        public String description() {
+          return "Waiting for EventQueue size to be " + numQueueEntries;
+        }
+
+        public boolean done() {
+          boolean done = numQueueEntries == queues.toArray(new RegionQueue[queues
+              .size()])[0].getRegion().size();
+          return done;
+        }
+
+      }, MAX_WAIT, 500, true);
+
+    }
+    else {
+      throw new Exception(
+          "This method should be used for only ParallelGatewaySender,SerialGatewaySender should use checkAsyncEventQueueSize() method instead");
+
+    }
+  }
+
+  public static void createPartitionedRegion(String regionName,
+      String senderIds, Integer redundantCopies, Integer totalNumBuckets) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(PartitionOfflineException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+      if (senderIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String senderId = tokenizer.nextToken();
+          // GatewaySender sender = cache.getGatewaySender(senderId);
+          // assertNotNull(sender);
+          fact.addGatewaySenderId(senderId);
+        }
+      }
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(totalNumBuckets);
+      pfact.setRedundantCopies(redundantCopies);
+      pfact.setRecoveryDelay(0);
+      fact.setPartitionAttributes(pfact.create());
+      Region r = cache.createRegionFactory(fact.create()).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+
+  public static void createPartitionedRegionWithAsyncEventQueue(
+      String regionName, String asyncEventQueueId, Boolean offHeap) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(PartitionOfflineException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(16);
+      fact.setPartitionAttributes(pfact.create());
+      fact.setOffHeap(offHeap);
+      Region r = cache.createRegionFactory(fact.create())
+          .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+
+  public static void createColocatedPartitionedRegionWithAsyncEventQueue(
+      String regionName, String asyncEventQueueId, Integer totalNumBuckets,
+      String colocatedWith) {
+
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(PartitionOfflineException.class
+        .getName());
+    try {
+      AttributesFactory fact = new AttributesFactory();
+
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(totalNumBuckets);
+      pfact.setColocatedWith(colocatedWith);
+      fact.setPartitionAttributes(pfact.create());
+      Region r = cache.createRegionFactory(fact.create())
+          .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+
+  public static void createPartitionedRegionWithCacheLoaderAndAsyncQueue(
+      String regionName, String asyncEventQueueId) {
+
+    AttributesFactory fact = new AttributesFactory();
+
+    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+    pfact.setTotalNumBuckets(16);
+    fact.setPartitionAttributes(pfact.create());
+    // set the CacheLoader implementation
+    fact.setCacheLoader(new MyCacheLoader());
+    Region r = cache.createRegionFactory(fact.create())
+        .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+    assertNotNull(r);
+  }
+
+  /**
+   * Create PartitionedRegion with 1 redundant copy
+   */
+  public static void createPRWithRedundantCopyWithAsyncEventQueue(
+      String regionName, String asyncEventQueueId, Boolean offHeap) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+
+    try {
+      AttributesFactory fact = new AttributesFactory();
+
+      PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+      pfact.setTotalNumBuckets(16);
+      pfact.setRedundantCopies(1);
+      fact.setPartitionAttributes(pfact.create());
+      fact.setOffHeap(offHeap);
+      Region r = cache.createRegionFactory(fact.create())
+          .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+      assertNotNull(r);
+    }
+    finally {
+      exp.remove();
+    }
+  }
+
+  public static void createPartitionedRegionAccessorWithAsyncEventQueue(
+      String regionName, String asyncEventQueueId) {
+    AttributesFactory fact = new AttributesFactory();
+    PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+    pfact.setTotalNumBuckets(16);
+    pfact.setLocalMaxMemory(0);
+    fact.setPartitionAttributes(pfact.create());
+    Region r = cache.createRegionFactory(fact.create())
+        .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
+    // fact.create()).create(regionName);
+    assertNotNull(r);
+  }
+
+  protected static void createCache(Integer locPort) {
+    AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort
+        + "]");
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+  }
+
+  public static void createCacheWithoutLocator(Integer mCastPort) {
+    AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "" + mCastPort);
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+  }
+
+  public static void checkAsyncEventQueueStats(String queueId,
+      final int queueSize, final int eventsReceived, final int eventsQueued,
+      final int eventsDistributed) {
+    Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
+    AsyncEventQueue queue = null;
+    for (AsyncEventQueue q : asyncQueues) {
+      if (q.getId().equals(queueId)) {
+        queue = q;
+        break;
+      }
+    }
+    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
+        .getStatistics();
+    assertEquals(queueSize, statistics.getEventQueueSize());
+    assertEquals(eventsReceived, statistics.getEventsReceived());
+    assertEquals(eventsQueued, statistics.getEventsQueued());
+    assert (statistics.getEventsDistributed() >= eventsDistributed);
+  }
+
+  public static void checkAsyncEventQueueConflatedStats(
+      String asyncEventQueueId, final int eventsConflated) {
+    Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
+    AsyncEventQueue queue = null;
+    for (AsyncEventQueue q : queues) {
+      if (q.getId().equals(asyncEventQueueId)) {
+        queue = q;
+        break;
+      }
+    }
+    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
+        .getStatistics();
+    assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
+  }
+
+  public static void checkAsyncEventQueueStats_Failover(
+      String asyncEventQueueId, final int eventsReceived) {
+    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
+    AsyncEventQueue queue = null;
+    for (AsyncEventQueue q : asyncEventQueues) {
+      if (q.getId().equals(asyncEventQueueId)) {
+        queue = q;
+        break;
+      }
+    }
+    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
+        .getStatistics();
+
+    assertEquals(eventsReceived, statistics.getEventsReceived());
+    assertEquals(
+        eventsReceived,
+        (statistics.getEventsQueued()
+            + statistics.getUnprocessedTokensAddedByPrimary() + statistics
+            .getUnprocessedEventsRemovedByPrimary()));
+  }
+
+  public static void checkAsyncEventQueueBatchStats(String asyncQueueId,
+      final int batches) {
+    Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
+    AsyncEventQueue queue = null;
+    for (AsyncEventQueue q : queues) {
+      if (q.getId().equals(asyncQueueId)) {
+        queue = q;
+        break;
+      }
+    }
+    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
+        .getStatistics();
+    assert (statistics.getBatchesDistributed() >= batches);
+    assertEquals(0, statistics.getBatchesRedistributed());
+  }
+
+  public static void checkAsyncEventQueueUnprocessedStats(String asyncQueueId,
+      int events) {
+    Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
+    AsyncEventQueue queue = null;
+    for (AsyncEventQueue q : asyncQueues) {
+      if (q.getId().equals(asyncQueueId)) {
+        queue = q;
+        break;
+      }
+    }
+    final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
+        .getStatistics();
+    assertEquals(events,
+        (statistics.getUnprocessedEventsAddedBySecondary() + statistics
+            .getUnprocessedTokensRemovedBySecondary()));
+    assertEquals(events,
+        (statistics.getUnprocessedEventsRemovedByPrimary() + statistics
+            .getUnprocessedTokensAddedByPrimary()));
+  }
+
+  public static void setRemoveFromQueueOnException(String senderId,
+      boolean removeFromQueue) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    assertNotNull(sender);
+    ((AbstractGatewaySender)sender)
+        .setRemoveFromQueueOnException(removeFromQueue);
+  }
+
+  public static void unsetRemoveFromQueueOnException(String senderId) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    assertNotNull(sender);
+    ((AbstractGatewaySender)sender).setRemoveFromQueueOnException(false);
+  }
+
+  public static void waitForSenderToBecomePrimary(String senderId) {
+    Set<GatewaySender> senders = ((GemFireCacheImpl)cache)
+        .getAllGatewaySenders();
+    final GatewaySender sender = getGatewaySenderById(senders, senderId);
+    WaitCriterion wc = new WaitCriterion() {
+      public boolean done() {
+        if (sender != null && ((AbstractGatewaySender)sender).isPrimary()) {
+          return true;
+        }
+        return false;
+      }
+
+      public String description() {
+        return "Expected sender primary state to be true but is false";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, 10000, 1000, true);
+  }
+
+  private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders,
+      String senderId) {
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        return s;
+      }
+    }
+    // if none of the senders matches with the supplied senderid, return null
+    return null;
+  }
+
+  public static void createSender(String dsName, int remoteDsId,
+      boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
+      boolean isManulaStart) {
+    final ExpectedException exln = addExpectedException("Could not connect");
+    try {
+      File persistentDirectory = new File(dsName + "_disk_"
+          + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+      persistentDirectory.mkdir();
+      DiskStoreFactory dsf = cache.createDiskStoreFactory();
+      File[] dirs1 = new File[] { persistentDirectory };
+      if (isParallel) {
+        GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+        gateway.setParallel(true);
+        gateway.setMaximumQueueMemory(maxMemory);
+        gateway.setBatchSize(batchSize);
+        gateway.setManualStart(isManulaStart);
+        // set dispatcher threads
+        gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
+        ((InternalGatewaySenderFactory)gateway)
+            .setLocatorDiscoveryCallback(new MyLocatorCallback());
+        if (filter != null) {
+          eventFilter = filter;
+          gateway.addGatewayEventFilter(filter);
+        }
+        if (isPersistent) {
+          gateway.setPersistenceEnabled(true);
+          gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
+              .getName());
+        }
+        else {
+          DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+          gateway.setDiskStoreName(store.getName());
+        }
+        gateway.setBatchConflationEnabled(isConflation);
+        gateway.create(dsName, remoteDsId);
+
+      }
+      else {
+        GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+        gateway.setMaximumQueueMemory(maxMemory);
+        gateway.setBatchSize(batchSize);
+        gateway.setManualStart(isManulaStart);
+        // set dispatcher threads
+        gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
+        ((InternalGatewaySenderFactory)gateway)
+            .setLocatorDiscoveryCallback(new MyLocatorCallback());
+        if (filter != null) {
+          eventFilter = filter;
+          gateway.addGatewayEventFilter(filter);
+        }
+        gateway.setBatchConflationEnabled(isConflation);
+        if (isPersistent) {
+          gateway.setPersistenceEnabled(true);
+          gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
+              .getName());
+        }
+        else {
+          DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+          gateway.setDiskStoreName(store.getName());
+        }
+        gateway.create(dsName, remoteDsId);
+      }
+    }
+    finally {
+      exln.remove();
+    }
+  }
+
+  public static void pauseWaitCriteria(final long millisec) {
+    WaitCriterion wc = new WaitCriterion() {
+      public boolean done() {
+        return false;
+      }
+
+      public String description() {
+        return "Expected to wait for " + millisec + " millisec.";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, millisec, 500, false);
+  }
+
+  public static int createReceiver(int locPort) {
+    AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort
+        + "]");
+
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    fact.setStartPort(port);
+    fact.setEndPort(port);
+    fact.setManualStart(true);
+    GatewayReceiver receiver = fact.create();
+    try {
+      receiver.start();
+    }
+    catch (IOException e) {
+      e.printStackTrace();
+      fail("Test " + test.getName()
+          + " failed to start GatewayRecevier on port " + port);
+    }
+    return port;
+  }
+
+  public static String makePath(String[] strings) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < strings.length; i++) {
+      sb.append(strings[i]);
+      sb.append(File.separator);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Do a rebalance and verify balance was improved. If evictionPercentage > 0
+   * (the default) then we have heapLRU and this can cause simulate and
+   * rebalance results to differ if eviction kicks in between. (See BUG 44899).
+   */
+  public static void doRebalance() {
+    ResourceManager resMan = cache.getResourceManager();
+    boolean heapEviction = (resMan.getEvictionHeapPercentage() > 0);
+    RebalanceFactory factory = resMan.createRebalanceFactory();
+    try {
+      RebalanceResults simulateResults = null;
+      if (!heapEviction) {
+        getLogWriter().info("Calling rebalance simulate");
+        RebalanceOperation simulateOp = factory.simulate();
+        simulateResults = simulateOp.getResults();
+      }
+
+      getLogWriter().info("Starting rebalancing");
+      RebalanceOperation rebalanceOp = factory.start();
+      RebalanceResults rebalanceResults = rebalanceOp.getResults();
+
+    }
+    catch (InterruptedException e) {
+      fail("Interrupted", e);
+    }
+  }
+
+  public static void doPuts(String regionName, int numPuts) {
+    ExpectedException exp1 = addExpectedException(InterruptedException.class
+        .getName());
+    ExpectedException exp2 = addExpectedException(GatewaySenderException.class
+        .getName());
+    try {
+      Region r = cache.getRegion(Region.SEPARATOR + regionName);
+      assertNotNull(r);
+      for (long i = 0; i < numPuts; i++) {
+        r.put(i, i);
+      }
+    }
+    finally {
+      exp1.remove();
+      exp2.remove();
+    }
+    // for (long i = 0; i < numPuts; i++) {
+    // r.destroy(i);
+    // }
+  }
+
+  /**
+   * To be used for CacheLoader related tests
+   */
+  public static void doGets(String regionName, int numGets) {
+    Region r = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(r);
+    for (long i = 0; i < numGets; i++) {
+      r.get(i);
+    }
+  }
+
+  public static void doPutsFrom(String regionName, int from, int numPuts) {
+    Region r = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(r);
+    for (long i = from; i < numPuts; i++) {
+      r.put(i, i);
+    }
+  }
+
+  public static void doPutAll(String regionName, int numPuts, int size) {
+    Region r = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(r);
+    for (long i = 0; i < numPuts; i++) {
+      Map putAllMap = new HashMap();
+      for (long j = 0; j < size; j++) {
+        putAllMap.put((size * i) + j, i);
+      }
+      r.putAll(putAllMap, "putAllCallback");
+      putAllMap.clear();
+    }
+  }
+
+  public static void putGivenKeyValue(String regionName, Map keyValues) {
+    Region r = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(r);
+    for (Object key : keyValues.keySet()) {
+      r.put(key, keyValues.get(key));
+    }
+  }
+
+  public static void doNextPuts(String regionName, int start, int numPuts) {
+    // waitForSitesToUpdate();
+    ExpectedException exp = addExpectedException(CacheClosedException.class
+        .getName());
+    try {
+      Region r = cache.getRegion(Region.SEPARATOR + regionName);
+      assertNotNull(r);
+      for (long i = start; i < numPuts; i++) {
+        r.put(i, i);
+      }
+    }
+    finally {
+      exp.remove();
+    }
+  }
+
+  public static void validateRegionSize(String regionName, final int regionSize) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(CacheClosedException.class
+        .getName());
+    try {
+
+      final Region r = cache.getRegion(Region.SEPARATOR + regionName);
+      assertNotNull(r);
+      WaitCriterion wc = new WaitCriterion() {
+        public boolean done() {
+          if (r.keySet().size() == regionSize) {
+            return true;
+          }
+          return false;
+        }
+
+        public String description() {
+          return "Expected region entries: " + regionSize
+              + " but actual entries: " + r.keySet().size()
+              + " present region keyset " + r.keySet();
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, 240000, 500, true);
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+
+  /**
+   * Validate whether all the attributes set on AsyncEventQueueFactory are set
+   * on the sender underneath the AsyncEventQueue.
+   */
+  public static void validateAsyncEventQueueAttributes(String asyncChannelId,
+      int maxQueueMemory, int batchSize, int batchTimeInterval,
+      boolean isPersistent, String diskStoreName, boolean isDiskSynchronous,
+      boolean batchConflationEnabled) {
+
+    AsyncEventQueue theChannel = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncChannelId.equals(asyncChannel.getId())) {
+        theChannel = asyncChannel;
+      }
+    }
+
+    GatewaySender theSender = ((AsyncEventQueueImpl)theChannel).getSender();
+    assertEquals("maxQueueMemory", maxQueueMemory,
+        theSender.getMaximumQueueMemory());
+    assertEquals("batchSize", batchSize, theSender.getBatchSize());
+    assertEquals("batchTimeInterval", batchTimeInterval,
+        theSender.getBatchTimeInterval());
+    assertEquals("isPersistent", isPersistent, theSender.isPersistenceEnabled());
+    assertEquals("diskStoreName", diskStoreName, theSender.getDiskStoreName());
+    assertEquals("isDiskSynchronous", isDiskSynchronous,
+        theSender.isDiskSynchronous());
+    assertEquals("batchConflation", batchConflationEnabled,
+        theSender.isBatchConflationEnabled());
+  }
+  
+  /**
+   * Validate whether all the attributes set on AsyncEventQueueFactory are set
+   * on the sender underneath the AsyncEventQueue.
+   */
+  public static void validateConcurrentAsyncEventQueueAttributes(String asyncChannelId,
+      int maxQueueMemory, int batchSize, int batchTimeInterval,
+      boolean isPersistent, String diskStoreName, boolean isDiskSynchronous,
+      boolean batchConflationEnabled, int dispatcherThreads, OrderPolicy policy) {
+
+    AsyncEventQueue theChannel = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncChannelId.equals(asyncChannel.getId())) {
+        theChannel = asyncChannel;
+      }
+    }
+
+    GatewaySender theSender = ((AsyncEventQueueImpl)theChannel).getSender();
+    assertEquals("maxQueueMemory", maxQueueMemory, theSender
+        .getMaximumQueueMemory());
+    assertEquals("batchSize", batchSize, theSender.getBatchSize());
+    assertEquals("batchTimeInterval", batchTimeInterval, theSender
+        .getBatchTimeInterval());
+    assertEquals("isPersistent", isPersistent, theSender.isPersistenceEnabled());
+    assertEquals("diskStoreName", diskStoreName, theSender.getDiskStoreName());
+    assertEquals("isDiskSynchronous", isDiskSynchronous, theSender
+        .isDiskSynchronous());
+    assertEquals("batchConflation", batchConflationEnabled, theSender
+        .isBatchConflationEnabled());
+    assertEquals("dispatcherThreads", dispatcherThreads, theSender
+        .getDispatcherThreads());
+    assertEquals("orderPolicy", policy, theSender.getOrderPolicy());
+  }
+
+  public static void validateAsyncEventListener(String asyncQueueId,
+      final int expectedSize) {
+    AsyncEventListener theListener = null;
+
+    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncQueue : asyncEventQueues) {
+      if (asyncQueueId.equals(asyncQueue.getId())) {
+        theListener = asyncQueue.getAsyncEventListener();
+      }
+    }
+
+    final Map eventsMap = ((MyAsyncEventListener)theListener).getEventsMap();
+    assertNotNull(eventsMap);
+    WaitCriterion wc = new WaitCriterion() {
+      public boolean done() {
+        if (eventsMap.size() == expectedSize) {
+          return true;
+        }
+        return false;
+      }
+
+      public String description() {
+        return "Expected map entries: " + expectedSize
+            + " but actual entries: " + eventsMap.size();
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, 60000, 500, true); // TODO:Yogs
+  }
+
+  public static void validateAsyncEventForOperationDetail(String asyncQueueId,
+      final int expectedSize, boolean isLoad, boolean isPutAll) {
+
+    AsyncEventListener theListener = null;
+
+    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncQueue : asyncEventQueues) {
+      if (asyncQueueId.equals(asyncQueue.getId())) {
+        theListener = asyncQueue.getAsyncEventListener();
+      }
+    }
+
+    final Map eventsMap = ((MyAsyncEventListener_CacheLoader)theListener)
+        .getEventsMap();
+    assertNotNull(eventsMap);
+    WaitCriterion wc = new WaitCriterion() {
+      public boolean done() {
+        if (eventsMap.size() == expectedSize) {
+          return true;
+        }
+        return false;
+      }
+
+      public String description() {
+        return "Expected map entries: " + expectedSize
+            + " but actual entries: " + eventsMap.size();
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, 60000, 500, true); // TODO:Yogs
+    Collection values = eventsMap.values();
+    Iterator itr = values.iterator();
+    while (itr.hasNext()) {
+      AsyncEvent asyncEvent = (AsyncEvent)itr.next();
+      if (isLoad)
+        assertTrue(asyncEvent.getOperation().isLoad());
+      if (isPutAll)
+        assertTrue(asyncEvent.getOperation().isPutAll());
+    }
+  }
+
+  public static void validateCustomAsyncEventListener(String asyncQueueId,
+      final int expectedSize) {
+    AsyncEventListener theListener = null;
+
+    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncQueue : asyncEventQueues) {
+      if (asyncQueueId.equals(asyncQueue.getId())) {
+        theListener = asyncQueue.getAsyncEventListener();
+      }
+    }
+
+    final Map eventsMap = ((CustomAsyncEventListener)theListener)
+        .getEventsMap();
+    assertNotNull(eventsMap);
+    WaitCriterion wc = new WaitCriterion() {
+      public boolean done() {
+        if (eventsMap.size() == expectedSize) {
+          return true;
+        }
+        return false;
+      }
+
+      public String description() {
+        return "Expected map entries: " + expectedSize
+            + " but actual entries: " + eventsMap.size();
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, 60000, 500, true); // TODO:Yogs
+
+    Iterator<AsyncEvent> itr = eventsMap.values().iterator();
+    while (itr.hasNext()) {
+      AsyncEvent event = itr.next();
+      assertTrue("possibleDuplicate should be true for event: " + event,
+          event.getPossibleDuplicate());
+    }
+  }
+
+  public static void waitForAsyncQueueToGetEmpty(String asyncQueueId) {
+    AsyncEventQueue theAsyncEventQueue = null;
+
+    Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+      if (asyncQueueId.equals(asyncChannel.getId())) {
+        theAsyncEventQueue = asyncChannel;
+      }
+    }
+
+    final GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue)
+        .getSender();
+
+    if (sender.isParallel()) {
+      final Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
+          .getQueues();
+
+      WaitCriterion wc = new WaitCriterion() {
+        public boolean done() {
+          int size = 0;
+          for (RegionQueue q : queues) {
+            size += q.size();
+          }
+          if (size == 0) {
+            return true;
+          }
+          return false;
+        }
+
+        public String description() {
+          int size = 0;
+          for (RegionQueue q : queues) {
+            size += q.size();
+          }
+          return "Expected queue size to be : " + 0 + " but actual entries: "
+              + size;
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, 60000, 500, true);
+
+    }
+    else {
+      WaitCriterion wc = new WaitCriterion() {
+        public boolean done() {
+          Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
+          int size = 0;
+          for (RegionQueue q : queues) {
+            size += q.size();
+          }
+          if (size == 0) {
+            return true;
+          }
+          return false;
+        }
+
+        public String description() {
+          Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
+          int size = 0;
+          for (RegionQueue q : queues) {
+            size += q.size();
+          }
+          return "Expected queue size to be : " + 0 + " but actual entries: "
+              + size;
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, 60000, 500, true);
+    }
+  }
+
+  public static void verifyAsyncEventListenerForPossibleDuplicates(
+      String asyncEventQueueId, Set<Integer> bucketIds, int batchSize) {
+    AsyncEventListener theListener = null;
+
+    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncQueue : asyncEventQueues) {
+      if (asyncEventQueueId.equals(asyncQueue.getId())) {
+        theListener = asyncQueue.getAsyncEventListener();
+      }
+    }
+
+    final Map<Integer, List<GatewaySenderEventImpl>> bucketToEventsMap = ((MyAsyncEventListener2)theListener)
+        .getBucketToEventsMap();
+    assertNotNull(bucketToEventsMap);
+    assertTrue(bucketIds.size() > 1);
+
+    for (int bucketId : bucketIds) {
+      List<GatewaySenderEventImpl> eventsForBucket = bucketToEventsMap
+          .get(bucketId);
+      getLogWriter().info(
+          "Events for bucket: " + bucketId + " is " + eventsForBucket);
+      assertNotNull(eventsForBucket);
+      for (int i = 0; i < batchSize; i++) {
+        GatewaySenderEventImpl senderEvent = eventsForBucket.get(i);
+        assertTrue(senderEvent.getPossibleDuplicate());
+      }
+    }
+  }
+
+  public static int getAsyncEventListenerMapSize(String asyncEventQueueId) {
+    AsyncEventListener theListener = null;
+
+    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncQueue : asyncEventQueues) {
+      if (asyncEventQueueId.equals(asyncQueue.getId())) {
+        theListener = asyncQueue.getAsyncEventListener();
+      }
+    }
+
+    final Map eventsMap = ((MyAsyncEventListener)theListener).getEventsMap();
+    assertNotNull(eventsMap);
+    getLogWriter().info("The events map size is " + eventsMap.size());
+    return eventsMap.size();
+  }
+
+  public static int getAsyncEventQueueSize(String asyncEventQueueId) {
+    AsyncEventQueue theQueue = null;
+
+    Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
+    for (AsyncEventQueue asyncQueue : asyncEventQueues) {
+      if (asyncEventQueueId.equals(asyncQueue.getId())) {
+        theQueue = asyncQueue;
+      }
+    }
+    assertNotNull(theQueue);
+    return theQueue.size();
+  }
+
+  public static String getRegionFullPath(String regionName) {
+    final Region r = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(r);
+    return r.getFullPath();
+  }
+
+  public static Set<Integer> getAllPrimaryBucketsOnTheNode(String regionName) {
+    PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
+    return region.getDataStore().getAllLocalPrimaryBucketIds();
+  }
+
+  public static void addCacheListenerAndCloseCache(String regionName) {
+    final Region region = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(region);
+    CacheListenerAdapter cl = new CacheListenerAdapter() {
+      @Override
+      public void afterCreate(EntryEvent event) {
+        if ((Long)event.getKey() == 900) {
+          cache.getLogger().fine(" Gateway sender is killed by a test");
+          cache.close();
+          cache.getDistributedSystem().disconnect();
+        }
+      }
+    };
+    region.getAttributesMutator().addCacheListener(cl);
+  }
+
+  public static Boolean killSender(String senderId) {
+    final ExpectedException exln = addExpectedException("Could not connect");
+    ExpectedException exp = addExpectedException(CacheClosedException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(ForceReattemptException.class
+        .getName());
+    try {
+      Set<GatewaySender> senders = cache.getGatewaySenders();
+      AbstractGatewaySender sender = null;
+      for (GatewaySender s : senders) {
+        if (s.getId().equals(senderId)) {
+          sender = (AbstractGatewaySender)s;
+          break;
+        }
+      }
+      if (sender.isPrimary()) {
+        getLogWriter().info("Gateway sender is killed by a test");
+        cache.getDistributedSystem().disconnect();
+        return Boolean.TRUE;
+      }
+      return Boolean.FALSE;
+    }
+    finally {
+      exp.remove();
+      exp1.remove();
+      exln.remove();
+    }
+  }
+
+  public static Boolean killAsyncEventQueue(String asyncQueueId) {
+    Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
+    AsyncEventQueueImpl queue = null;
+    for (AsyncEventQueue q : queues) {
+      if (q.getId().equals(asyncQueueId)) {
+        queue = (AsyncEventQueueImpl)q;
+        break;
+      }
+    }
+    if (queue.isPrimary()) {
+      getLogWriter().info("AsyncEventQueue is killed by a test");
+      cache.getDistributedSystem().disconnect();
+      return Boolean.TRUE;
+    }
+    return Boolean.FALSE;
+  }
+
+  public static void killSender() {
+    getLogWriter().info("Gateway sender is going to be killed by a test");
+    cache.close();
+    cache.getDistributedSystem().disconnect();
+    getLogWriter().info("Gateway sender is killed by a test");
+  }
+
+  public static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter {
+
+    private final Set discoveredLocators = new HashSet();
+
+    private final Set removedLocators = new HashSet();
+
+    public synchronized void locatorsDiscovered(List locators) {
+      discoveredLocators.addAll(locators);
+      notifyAll();
+    }
+
+    public synchronized void locatorsRemoved(List locators) {
+      removedLocators.addAll(locators);
+      notifyAll();
+    }
+
+    public boolean waitForDiscovery(InetSocketAddress locator, long time)
+        throws InterruptedException {
+      return waitFor(discoveredLocators, locator, time);
+    }
+
+    public boolean waitForRemove(InetSocketAddress locator, long time)
+        throws InterruptedException {
+      return waitFor(removedLocators, locator, time);
+    }
+
+    private synchronized boolean waitFor(Set set, InetSocketAddress locator,
+        long time) throws InterruptedException {
+      long remaining = time;
+      long endTime = System.currentTimeMillis() + time;
+      while (!set.contains(locator) && remaining >= 0) {
+        wait(remaining);
+        remaining = endTime - System.currentTimeMillis();
+      }
+      return set.contains(locator);
+    }
+
+    public synchronized Set getDiscovered() {
+      return new HashSet(discoveredLocators);
+    }
+
+    public synchronized Set getRemoved() {
+      return new HashSet(removedLocators);
+    }
+  }
+  
+  public void tearDown2() throws Exception {
+    super.tearDown2();
+    cleanupVM();
+    vm0.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+    vm1.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+    vm2.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+    vm3.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+    vm4.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+    vm5.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+    vm6.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+    vm7.invoke(AsyncEventQueueTestBase.class, "cleanupVM");
+  }
+
+  public static void cleanupVM() throws IOException {
+    closeCache();
+  }
+
+  public static void closeCache() throws IOException {
+    if (cache != null && !cache.isClosed()) {
+      cache.close();
+      cache.getDistributedSystem().disconnect();
+      cache = null;
+    }
+    else {
+      AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+      if (test.isConnectedToDS()) {
+        test.getSystem().disconnect();
+      }
+    }
+  }
+
+  public static void shutdownLocator() {
+    AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+    test.getSystem().disconnect();
+  }
+
+  public static void printEventListenerMap() {
+    ((MyGatewaySenderEventListener)eventListener1).printMap();
+  }
+  
+
+  @Override
+  public InternalDistributedSystem getSystem(Properties props) {
+    // For now all WANTestBase tests allocate off-heap memory even though
+    // many of them never use it.
+    // The problem is that WANTestBase has static methods that create instances
+    // of WANTestBase (instead of instances of the subclass). So we can't override
+    // this method so that only the off-heap subclasses allocate off heap memory.
+    props.setProperty(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME, "300m");
+    return super.getSystem(props);
+  }
+  
+  /**
+   * Returns true if the test should create off-heap regions.
+   * OffHeap tests should over-ride this method and return false.
+   */
+  public boolean isOffHeap() {
+    return false;
+  }
+
+}
+
+class MyAsyncEventListener_CacheLoader implements AsyncEventListener {
+  private final Map eventsMap;
+
+  public MyAsyncEventListener_CacheLoader() {
+    this.eventsMap = new ConcurrentHashMap();
+  }
+
+  public boolean processEvents(List<AsyncEvent> events) {
+    for (AsyncEvent event : events) {
+      this.eventsMap.put(event.getKey(), event);
+    }
+    return true;
+  }
+
+  public Map getEventsMap() {
+    return eventsMap;
+  }
+
+  public void close() {
+  }
+}
+
+class MyCacheLoader implements CacheLoader, Declarable {
+
+  public Object load(LoaderHelper helper) {
+    Long key = (Long)helper.getKey();
+    return "LoadedValue" + "_" + key;
+  }
+
+  public void close() {
+  }
+
+  public void init(Properties props) {
+  }
+
+}