You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2015/12/11 22:23:15 UTC

[41/50] [abbrv] incubator-geode git commit: GEODE-637: Additional tests for AsyncEventQueues

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/476c6cd3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerOffHeapDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerOffHeapDUnitTest.java
new file mode 100644
index 0000000..b050ef5
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerOffHeapDUnitTest.java
@@ -0,0 +1,17 @@
+package com.gemstone.gemfire.internal.cache.wan.asyncqueue;
+
+
+@SuppressWarnings("serial")
+public class AsyncEventListenerOffHeapDUnitTest extends
+    AsyncEventListenerDUnitTest {
+
+  public AsyncEventListenerOffHeapDUnitTest(String name) {
+    super(name);
+  }
+
+  @Override
+  public boolean isOffHeap() {
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/476c6cd3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
new file mode 100644
index 0000000..cf4a184
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventQueueStatsDUnitTest.java
@@ -0,0 +1,311 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan.asyncqueue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueTestBase;
+
+import dunit.AsyncInvocation;
+
+public class AsyncEventQueueStatsDUnitTest extends AsyncEventQueueTestBase {
+
+  private static final long serialVersionUID = 1L;
+  
+  public AsyncEventQueueStatsDUnitTest(String name) {
+    super(name);
+  }
+  
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+  
+  /**
+   * Normal replication scenario
+   */
+  public void testReplicatedSerialPropagation() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, false, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });// primary sender
+    pause(2000);//give some time for system to become stable
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {
+        "ln", 0, 1000, 1000, 1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats",
+        new Object[] { "ln", 10 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {
+        "ln", 0, 1000, 0, 0 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats",
+        new Object[] { "ln", 0 });
+  }
+  
+  /**
+   * Two listeners added to the same RR.
+   */
+  public void testAsyncStatsTwoListeners() throws Exception {
+    Integer lnPort = createFirstLocatorWithDSId(1);
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln1",
+      false, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln1",
+      false, 100, 100, false, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln1",
+      false, 100, 100, false, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln1",
+      false, 100, 100, false, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln2",
+      false, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln2",
+      false, 100, 100, false, false, null, false });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln2",
+      false, 100, 100, false, false, null, false });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln2",
+      false, 100, 100, false, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln1,ln2", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln1,ln2", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln1,ln2", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln1,ln2", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        1000 });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln1", 1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln2", 1000 });
+    pause(2000);//give some time for system to become stable
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {
+        "ln1", 0, 1000, 1000, 1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats",
+        new Object[] { "ln1", 10 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {
+        "ln2", 0, 1000, 1000, 1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats",
+        new Object[] { "ln2", 10 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {
+        "ln1", 0, 1000, 0, 0 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats",
+        new Object[] { "ln1", 0 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {
+        "ln2", 0, 1000, 0, 0 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueBatchStats",
+        new Object[] { "ln2", 0 });
+  }
+  
+  /**
+   * HA scenario: kill one vm when puts are in progress on the other vm.
+   */
+  public void testReplicatedSerialPropagationHA() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] {lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+      false, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+      false, 100, 100, false, false, null, false });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR", "ln", isOffHeap() });
+    
+    AsyncInvocation inv1 = vm5.invokeAsync(AsyncEventQueueTestBase.class, "doPuts",
+        new Object[] { testName + "_RR", 10000 });
+    pause(2000);
+    AsyncInvocation inv2 = vm4.invokeAsync(AsyncEventQueueTestBase.class, "killAsyncEventQueue", new Object[] { "ln" });
+    Boolean isKilled = Boolean.FALSE;
+    try {
+      isKilled = (Boolean)inv2.getResult();
+    }
+    catch (Throwable e) {
+      fail("Unexpected exception while killing a AsyncEventQueue");
+    }
+    AsyncInvocation inv3 = null; 
+    if(!isKilled){
+      inv3 = vm5.invokeAsync(AsyncEventQueueTestBase.class, "killSender", new Object[] { "ln" });
+      inv3.join();
+    }
+    inv1.join();
+    inv2.join();
+    pause(2000);//give some time for system to become stable
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats_Failover", new Object[] {"ln", 10000});
+  }
+
+  /**
+   * Two regions attached to same AsyncEventQueue
+   */
+  public void testReplicatedSerialPropagationUNPorcessedEvents() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+      false, 100, 100, false, false, null, false });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+      false, 100, 100, false, false, null, false });
+
+    //create one RR (RR_1) on local site
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_1", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_1", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_1", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_1", "ln", isOffHeap() });
+
+    //create another RR (RR_2) on local site
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_2", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_2", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_2", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+        testName + "_RR_2", "ln", isOffHeap() });
+    
+    //start puts in RR_1 in another thread
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR_1", 1000 });
+    //do puts in RR_2 in main thread
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPutsFrom", new Object[] { testName + "_RR_2", 1000, 1500 });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1500 });
+        
+    pause(2000);//give some time for system to become stable
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {"ln",
+      0, 1500, 1500, 1500});
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueUnprocessedStats", new Object[] {"ln", 0});
+    
+    
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {"ln",
+      0, 1500, 0, 0});
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueUnprocessedStats", new Object[] {"ln", 1500});
+  }
+  
+  /**
+   * Test with conflation enabled
+   */
+  public void testSerialPropagationConflation() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+        false, 100, 100, true, false, null, false });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    
+    vm4
+        .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+            new Object[] { "ln" });
+    //pause at least for the batchTimeInterval to make sure that the AsyncEventQueue is actually paused
+    pause(2000);
+
+    final Map keyValues = new HashMap();
+    final Map updateKeyValues = new HashMap();
+    for(int i=0; i< 1000; i++) {
+      keyValues.put(i, i);
+    }
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { testName + "_RR", keyValues });
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { "ln", keyValues.size() });
+    
+    for(int i=0;i<500;i++) {
+      updateKeyValues.put(i, i+"_updated");
+    }
+    
+    // Put the update events and check the queue size.
+    // There should be no conflation with the previous create events.
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { testName + "_RR", updateKeyValues });    
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() });
+    
+    // Put the update events again and check the queue size.
+    // There should be conflation with the previous update events.
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] { testName + "_RR", updateKeyValues });    
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });
+  
+    vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", new Object[] { "ln" });
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });
+    
+    pause(2000);// give some time for system to become stable
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueStats", new Object[] {
+        "ln", 0, 2000, 2000, 1000 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueConflatedStats",
+        new Object[] { "ln", 500 });
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/476c6cd3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueDUnitTest.java
new file mode 100644
index 0000000..2fb7496
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueDUnitTest.java
@@ -0,0 +1,330 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.wan.concurrent;
+
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueTestBase;
+
+import dunit.AsyncInvocation;
+
+/**
+ * @author skumar
+ *
+ */
+public class ConcurrentAsyncEventQueueDUnitTest extends AsyncEventQueueTestBase {
+
+  private static final long serialVersionUID = 1L;
+
+  public ConcurrentAsyncEventQueueDUnitTest(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+  
+  public void testConcurrentSerialAsyncEventQueueAttributes() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 150, true, true, "testDS", true, 5, OrderPolicy.THREAD });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateConcurrentAsyncEventQueueAttributes",
+        new Object[] { "ln", 100, 150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true, 5, OrderPolicy.THREAD });
+  }
+  
+ 
+  public void testConcurrentParallelAsyncEventQueueAttributesOrderPolicyKey() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        true, 100, 150, true, true, "testDS", true, 5, OrderPolicy.KEY });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateConcurrentAsyncEventQueueAttributes",
+        new Object[] { "ln", 100, 150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true, 5, OrderPolicy.KEY });
+  }
+
+  public void testConcurrentParallelAsyncEventQueueAttributesOrderPolicyPartition() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        true, 100, 150, true, true, "testDS", true, 5, OrderPolicy.PARTITION });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateConcurrentAsyncEventQueueAttributes",
+        new Object[] { "ln", 100, 150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true, 5, OrderPolicy.PARTITION });
+  }
+  
+  /**
+   * Test configuration::
+   * 
+   * Region: Replicated 
+   * WAN: Serial 
+   * Dispatcher threads: more than 1
+   * Order policy: key based ordering
+   */
+
+  public void testReplicatedSerialAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyKey() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        100 });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 100 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+  }
+  
+  /**
+   * Test configuration::
+   * 
+   * Region: Replicated 
+   * WAN: Serial 
+   * Dispatcher threads: more than 1
+   * Order policy: Thread ordering
+   */
+
+  public void testReplicatedSerialAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyThread() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.THREAD });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.THREAD });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.THREAD });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        false, 100, 10, true, false, null, false, 3, OrderPolicy.THREAD });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+    AsyncInvocation inv1 = vm4.invokeAsync(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+        50 });
+    AsyncInvocation inv2 = vm4.invokeAsync(AsyncEventQueueTestBase.class, "doNextPuts", new Object[] { testName + "_RR",
+      50, 100 });
+    AsyncInvocation inv3 = vm4.invokeAsync(AsyncEventQueueTestBase.class, "doNextPuts", new Object[] { testName + "_RR",
+      100, 150 });
+    
+    try {
+      inv1.join();
+      inv2.join();
+      inv3.join();
+    } catch (InterruptedException ie) {
+      fail(
+          "Cought interrupted exception while waiting for the task tgo complete.",
+          ie);
+    }
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 150 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 0 });// secondary
+  }
+  
+  /**
+   * Test configuration::
+   * 
+   * Region: PartitionedRegion 
+   * WAN: Parallel
+   * Dispatcher threads: more than 1
+   * Order policy: key based ordering
+   */
+  // Disabling test for bug #48323
+  public void testPartitionedParallelAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyKey() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        true, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        true, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        true, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+        true, 100, 10, true, false, null, false, 3, OrderPolicy.KEY });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        100 });
+    
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+      new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+      new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+      new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+      new Object[] { "ln" });
+  
+    int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+      new Object[] { "ln"});
+    int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+      new Object[] { "ln"});
+    int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+      new Object[] { "ln"});
+    int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class, "getAsyncEventListenerMapSize",
+      new Object[] { "ln"});
+  
+    assertEquals(vm4size + vm5size + vm6size + vm7size, 100);
+  
+  }
+  
+  
+  /**
+   * Test configuration::
+   * 
+   * Region: PartitionedRegion 
+   * WAN: Parallel
+   * Dispatcher threads: more than 1
+   * Order policy: PARTITION based ordering
+   */
+  // Disabled test for bug #48323
+  public void testPartitionedParallelAsyncEventQueueWithMultipleDispatcherThreadsOrderPolicyPartition() {
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue",
+        new Object[] { "ln", true, 100, 10, true, false, null, false, 3,
+            OrderPolicy.PARTITION });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue",
+        new Object[] { "ln", true, 100, 10, true, false, null, false, 3,
+            OrderPolicy.PARTITION });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue",
+        new Object[] { "ln", true, 100, 10, true, false, null, false, 3,
+            OrderPolicy.PARTITION });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue",
+        new Object[] { "ln", true, 100, 10, true, false, null, false, 3,
+            OrderPolicy.PARTITION });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR", "ln", isOffHeap() });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        100 });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+        new Object[] { "ln" });
+
+    int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", new Object[] { "ln" });
+    int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", new Object[] { "ln" });
+    int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", new Object[] { "ln" });
+    int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", new Object[] { "ln" });
+
+    assertEquals(100, vm4size + vm5size + vm6size + vm7size);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/476c6cd3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueOffHeapDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueOffHeapDUnitTest.java
new file mode 100644
index 0000000..41eb22d
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentAsyncEventQueueOffHeapDUnitTest.java
@@ -0,0 +1,16 @@
+package com.gemstone.gemfire.internal.cache.wan.concurrent;
+
+@SuppressWarnings("serial")
+public class ConcurrentAsyncEventQueueOffHeapDUnitTest extends
+    ConcurrentAsyncEventQueueDUnitTest {
+
+  public ConcurrentAsyncEventQueueOffHeapDUnitTest(String name) {
+    super(name);
+  }
+
+  @Override
+  public boolean isOffHeap() {
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/476c6cd3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueDUnitTest.java
new file mode 100644
index 0000000..425d1a6
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueDUnitTest.java
@@ -0,0 +1,53 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueTestBase;
+
+/**
+ * @author skumar
+ *
+ */
+public class CommonParallelAsyncEventQueueDUnitTest extends AsyncEventQueueTestBase {
+  
+  private static final long serialVersionUID = 1L;
+
+  public CommonParallelAsyncEventQueueDUnitTest(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception  {
+    super.setUp();
+  }
+    
+  public void testSameSenderWithNonColocatedRegions() throws Exception {
+    addExpectedException("cannot have the same parallel async");
+    Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+      true, 100, 100, false, false, null, false });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        new Object[] { testName + "_PR1", "ln", isOffHeap()  });
+    try {
+      vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+          new Object[] { testName + "_PR2", "ln", isOffHeap()  });
+      fail("Expected IllegateStateException : cannot have the same parallel gateway sender");
+    }
+    catch (Exception e) {
+      if (!(e.getCause() instanceof IllegalStateException)
+          || !(e.getCause().getMessage()
+              .contains("cannot have the same parallel async event queue id"))) {
+        fail("Expected IllegalStateException", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/476c6cd3/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueOffHeapDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueOffHeapDUnitTest.java
new file mode 100644
index 0000000..8ab77b9
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelAsyncEventQueueOffHeapDUnitTest.java
@@ -0,0 +1,16 @@
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+@SuppressWarnings("serial")
+public class CommonParallelAsyncEventQueueOffHeapDUnitTest extends
+    CommonParallelAsyncEventQueueDUnitTest {
+
+  public CommonParallelAsyncEventQueueOffHeapDUnitTest(String name) {
+    super(name);
+  }
+
+  @Override
+  public boolean isOffHeap() {
+    return true;
+  }
+
+}