You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2015/12/08 00:22:40 UTC
[2/3] incubator-geode git commit: GEODE-637: Additional tests for
AsyncEventQueues
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/eccf5239/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
new file mode 100644
index 0000000..1eafbb0
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java
@@ -0,0 +1,1911 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan.asyncqueue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.Ignore;
+
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.wan.AsyncEventQueueTestBase;
+
+public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase {
+
+ private static final long serialVersionUID = 1L;
+
+ public AsyncEventListenerDUnitTest(String name) { // name is forwarded unchanged to the base-class constructor
+ super(name);
+ }
+
+ public void setUp() throws Exception { // adds no fixture of its own; only calls the superclass setUp
+ super.setUp();
+ }
+
+ /**
+ * Test to verify that an AsyncEventQueue cannot be created when a null
+ * listener is passed.
+ */
+ public void testCreateAsyncEventQueueWithNullListener() {
+ AsyncEventQueueTestBase test = new AsyncEventQueueTestBase(testName);
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+ InternalDistributedSystem ds = test.getSystem(props);
+ cache = CacheFactory.create(ds); // created in this JVM directly, not through a vmN.invoke call
+
+ AsyncEventQueueFactory asyncQueueFactory = cache
+ .createAsyncEventQueueFactory();
+ try {
+ asyncQueueFactory.create("testId", null); // null listener: this call is expected to throw
+ fail("AsyncQueueFactory should not allow to create AsyncEventQueue with null listener");
+ }
+ catch (IllegalArgumentException e) {
+ // expected: the only outcome that lets the test pass is an IllegalArgumentException
+ }
+
+ }
+
+ public void testSerialAsyncEventQueueAttributes() { // creates one queue on vm4 and validates its attributes there
+ Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 }); // the returned port is handed to createCache below
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 150, true, true, "testDS", true }); // NOTE(review): argument meanings are defined by the helper in AsyncEventQueueTestBase - confirm there
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventQueueAttributes",
+ new Object[] { "ln", 100, 150, AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL, true, "testDS", true, true }); // repeats the "ln", 100, 150 and "testDS" values given at creation
+ }
+
+ public void testSerialAsyncEventQueueSize() { // pauses the queues, puts 1000 entries via vm4, then checks the reported queue size
+ Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 }); // the returned port is passed to createCache on vm4-vm7
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false }); // identical arguments on all four VMs
+ vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+ vm4
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ vm5
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ vm6
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ vm7
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ pause(1000);// pause at least for the batchTimeInterval
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+ 1000 }); // puts are issued from vm4 only
+
+ int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+ "getAsyncEventQueueSize", new Object[] { "ln" });
+ int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+ "getAsyncEventQueueSize", new Object[] { "ln" }); // sizes on vm6 and vm7 are not read
+ assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm4size);
+ assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm5size); // vm5 did no puts itself but must report the same 1000
+ }
+
+ /**
+ * Added to reproduce defect #50366:
+ * NullPointerException with AsyncEventQueue#size() when number of dispatchers is more than 1
+ */
+ public void testConcurrentSerialAsyncEventQueueSize() { // same size check as the serial test, using createConcurrentAsyncEventQueue
+ Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 }); // the returned port is passed to createCache on vm4-vm7
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+ false, 100, 150, true, false, null, false, 2, OrderPolicy.KEY }); // trailing 2 and OrderPolicy.KEY are extra compared with the createAsyncEventQueue calls elsewhere
+ vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+ false, 100, 150, true, false, null, false, 2, OrderPolicy.KEY }); // only vm4 and vm5 create the queue in this test
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+ vm4
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ vm5
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+
+ pause(1000);// pause at least for the batchTimeInterval
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+ 1000 }); // puts are issued from vm4 only
+
+ int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+ "getAsyncEventQueueSize", new Object[] { "ln" }); // the Javadoc above ties defect #50366 to this size() path
+ int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+ "getAsyncEventQueueSize", new Object[] { "ln" });
+ assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm4size);
+ assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm5size);
+ }
+
+ /**
+ * Test configuration::
+ *
+ * Region: Replicated; WAN: Serial; Region persistence enabled: false;
+ * Async channel persistence enabled: false
+ */
+
+ public void testReplicatedSerialAsyncEventQueue() { // puts 1000 entries via vm4 and validates the listener on vm4-vm7
+ Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 }); // the returned port is passed to createCache on vm4-vm7
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false }); // identical arguments on all four VMs
+ vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+ 1000 }); // puts are issued from vm4 only; the queues are not paused in this test
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 1000 });// primary sender
+ vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ }
+
+ /**
+ * Verify that the events loaded by CacheLoader reach the AsyncEventListener
+ * with correct operation detail (added for defect #50237).
+ */
+ public void testReplicatedSerialAsyncEventQueueWithCacheLoader() { // drives the region with doGets instead of doPuts
+ Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 }); // the returned port is passed to createCache on vm4-vm7
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" }); // nine-argument form: a listener name string is appended
+ vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithCacheLoaderAndAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln" }); // unlike the other region helpers used in this class, no isOffHeap() argument is passed
+ vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithCacheLoaderAndAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln" });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithCacheLoaderAndAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln" });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithCacheLoaderAndAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln" });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "doGets", new Object[] { testName + "_RR",
+ 10 }); // gets are issued from vm4 only; no puts are done in this test
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+ new Object[] { "ln", 10, true, false });// primary sender
+ vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+ new Object[] { "ln", 0, true, false });// secondary
+ vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+ new Object[] { "ln", 0, true, false });// secondary
+ vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+ new Object[] { "ln", 0, true, false });// secondary
+ }
+
+ /**
+ * Test configuration::
+ *
+ * Region: Replicated
+ * WAN: Serial
+ * Region persistence enabled: false
+ * Async queue persistence enabled: false
+ *
+ * Error is thrown from AsyncEventListener implementation while processing the batch.
+ * Added to test the fix done for defect #45152.
+ */
+
+ public void testReplicatedSerialAsyncEventQueue_ExceptionScenario() { // pause, put 100 entries via vm4, resume, then validate the custom listener
+ Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 }); // the returned port is passed to createCache on vm4-vm7
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithCustomListener", new Object[] { "ln",
+ false, 100, 100, false, false, null, false, 1 }); // NOTE(review): meaning of the trailing 1 is defined by the helper - confirm in AsyncEventQueueTestBase
+ vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithCustomListener", new Object[] { "ln",
+ false, 100, 100, false, false, null, false, 1 });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithCustomListener", new Object[] { "ln",
+ false, 100, 100, false, false, null, false, 1 });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithCustomListener", new Object[] { "ln",
+ false, 100, 100, false, false, null, false, 1 });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+ vm4
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ vm5
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ vm6
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ vm7
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ pause(2000);// pause at least for the batchTimeInterval
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+ 100 }); // puts are issued from vm4 only, while all four queues are paused
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+ new Object[] { "ln" });
+ vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+ new Object[] { "ln" });
+ vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+ new Object[] { "ln" });
+ vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+ new Object[] { "ln" });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "validateCustomAsyncEventListener",
+ new Object[] { "ln", 100 });// primary sender
+ vm5.invoke(AsyncEventQueueTestBase.class, "validateCustomAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ vm6.invoke(AsyncEventQueueTestBase.class, "validateCustomAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ vm7.invoke(AsyncEventQueueTestBase.class, "validateCustomAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ }
+
+ /**
+ * Test configuration::
+ *
+ * Region: Replicated; WAN: Serial; Region persistence enabled: false;
+ * Async channel persistence enabled: false; AsyncEventQueue conflation enabled: true
+ */
+ public void testReplicatedSerialAsyncEventQueueWithConflationEnabled() { // checks queue size after creates, first updates and repeated updates
+ Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 }); // the returned port is passed to createCache on vm4-vm7
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, true, false, null, false }); // fifth argument is true here, false in testReplicatedSerialAsyncEventQueue
+ vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, true, false, null, false });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, true, false, null, false });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, true, false, null, false });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+ vm4
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ vm5
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ vm6
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ vm7
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ pause(1000);// pause at least for the batchTimeInterval
+
+ final Map keyValues = new HashMap();
+ final Map updateKeyValues = new HashMap();
+ for (int i = 0; i < 1000; i++) { // initial data: keys 0..999, each mapped to itself
+ keyValues.put(i, i);
+ }
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+ testName + "_RR", keyValues });
+
+ pause(1000);
+ vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] {
+ "ln", keyValues.size() }); // expected size after the creates: 1000
+
+ for (int i = 0; i < 500; i++) { // updates cover only keys 0..499
+ updateKeyValues.put(i, i + "_updated");
+ }
+
+ // Put the update events and check the queue size.
+ // There should be no conflation with the previous create events.
+ vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+ testName + "_RR", updateKeyValues });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] {
+ "ln", keyValues.size() + updateKeyValues.size() }); // expected size: 1000 + 500
+
+ // Put the update events again and check the queue size.
+ // There should be conflation with the previous update events.
+ vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", new Object[] {
+ testName + "_RR", updateKeyValues });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", new Object[] {
+ "ln", keyValues.size() + updateKeyValues.size() }); // same expected size as after the first update round
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+ new Object[] { "ln" });
+ vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+ new Object[] { "ln" });
+ vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+ new Object[] { "ln" });
+ vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+ new Object[] { "ln" });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 1000 });// primary sender
+ vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ }
+
+
+
+ /**
+ * Test configuration::
+ *
+ * Region: Replicated; WAN: Serial; Region persistence enabled: false;
+ * Async event queue persistence enabled: false
+ *
+ * Note: The test doesn't create a locator but uses MCAST port instead.
+ */
+ @Ignore("Disabled until I can sort out the hydra dependencies - see bug 52214")
+ public void DISABLED_testReplicatedSerialAsyncEventQueueWithoutLocator() { // DISABLED_ prefix: the method name no longer begins with "test"
+ int mPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); // one port value shared by all four createCacheWithoutLocator calls
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCacheWithoutLocator",
+ new Object[] { mPort });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createCacheWithoutLocator",
+ new Object[] { mPort });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createCacheWithoutLocator",
+ new Object[] { mPort });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createCacheWithoutLocator",
+ new Object[] { mPort });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false }); // identical arguments on all four VMs
+ vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, false, false, null, false });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+ 1000 }); // puts are issued from vm4 only
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 1000 });// primary sender
+ vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ }
+
+ /**
+ * Test configuration::
+ *
+ * Region: Replicated; WAN: Serial; Region persistence enabled: false;
+ * Async channel persistence enabled: true
+ *
+ * No VM is restarted.
+ */
+
+ public void testReplicatedSerialAsyncEventQueueWithPeristenceEnabled() { // puts 1000 entries via vm4 and validates the listener on vm4-vm7
+ Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 }); // the returned port is passed to createCache on vm4-vm7
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, true, false, null, false }); // NOTE(review): same argument list as the conflation test; confirm which flag enables persistence
+ vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, true, false, null, false });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, true, false, null, false });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, true, false, null, false });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+ 1000 }); // puts are issued from vm4 only
+ vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 1000 });// primary sender
+ vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 0 });// secondary
+ }
+
+ /**
+ * Test configuration::
+ *
+ * Region: Replicated; WAN: Serial; Region persistence enabled: false;
+ * Async channel persistence enabled: true
+ *
+ * There is only one vm in the site and that vm is restarted
+ */
+
+ @Ignore("Disabled for 52351")
+ public void DISABLED_testReplicatedSerialAsyncEventQueueWithPeristenceEnabled_Restart() { // DISABLED_ prefix: the method name no longer begins with "test"
+ Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); // caches exist on vm5-vm7 too, but only vm4 gets a queue and region below
+
+ String firstDStore = (String)vm4.invoke(AsyncEventQueueTestBase.class,
+ "createAsyncEventQueueWithDiskStore", new Object[] { "ln", false, 100,
+ 100, true, null }); // last argument is null here; the returned string is passed back in on re-creation
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+ // pause async channel and then do the puts
+ vm4
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+ 1000 });
+
+ // ------------------ KILL VM4 AND REBUILD
+ // ------------------------------------------
+ vm4.invoke(AsyncEventQueueTestBase.class, "killSender", new Object[] {});
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore",
+ new Object[] { "ln", false, 100, 100, true, firstDStore }); // same arguments as the first creation, except firstDStore replaces null
+ vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ // -----------------------------------------------------------------------------------
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] { "ln", 1000 });// primary sender
+ }
+
+ /**
+ * Test configuration::
+ *
+ * Region: Replicated; WAN: Serial; Region persistence enabled: false;
+ * Async channel persistence enabled: true
+ *
+ * There are 3 VMs in the site and the VM with primary sender is shut down.
+ */
+ @Ignore("Disabled for 52351")
+ public void DISABLED_testReplicatedSerialAsyncEventQueueWithPeristenceEnabled_Restart2() { // DISABLED_ prefix: the method name no longer begins with "test"
+ Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort }); // vm7 is not used in this test
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore",
+ new Object[] { "ln", false, 100, 100, true, null });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore",
+ new Object[] { "ln", false, 100, 100, true, null });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore",
+ new Object[] { "ln", false, 100, 100, true, null });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm4.invoke(AsyncEventQueueTestBase.class, "addCacheListenerAndCloseCache",
+ new Object[] { testName + "_RR" }); // invoked on vm4 only
+ vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+ vm5.invoke(AsyncEventQueueTestBase.class, "doPuts",
+ new Object[] { testName + "_RR", 2000 }); // puts go through vm5, not vm4
+
+ // -----------------------------------------------------------------------------------
+ vm5.invoke(AsyncEventQueueTestBase.class, "waitForSenderToBecomePrimary",
+ new Object[] { AsyncEventQueueImpl
+ .getSenderIdFromAsyncEventQueueId("ln") }); // the queue id "ln" is converted to a sender id in this JVM before the remote call
+
+ vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+ new Object[] { "ln" });
+
+ int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+ "getAsyncEventListenerMapSize", new Object[] { "ln" });
+ int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+ "getAsyncEventListenerMapSize", new Object[] { "ln" });
+
+ getLogWriter().info("vm4 size is: " + vm4size);
+ getLogWriter().info("vm5 size is: " + vm5size);
+ // verify that there is no event loss
+ assertTrue(
+ "Total number of entries in events map on vm4 and vm5 should be at least 2000",
+ (vm4size + vm5size) >= 2000); // >= rather than ==: the combined count is allowed to exceed 2000
+ }
+
+ /**
+ * Test configuration::
+ *
+ * Region: Replicated
+ * WAN: Serial
+ * Dispatcher threads: more than 1
+ * Order policy: key based ordering
+ */
+ public void testConcurrentSerialAsyncEventQueueWithReplicatedRegion() { // puts 1000 entries via vm4 and validates the listener on vm4-vm7
+ Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 }); // the returned port is passed to createCache on vm4-vm7
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY }); // trailing 3 and OrderPolicy.KEY match the "more than 1" dispatcher threads and key ordering in the Javadoc
+ vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", new Object[] { "ln",
+ false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+ vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+ new Object[] { testName + "_RR", "ln", isOffHeap() });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "doPuts", new Object[] { testName + "_RR",
+ 1000 }); // puts are issued from vm4 only
+ vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] {"ln", 1000 });// primary sender
+ vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] {"ln", 0 });// secondary
+ vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] {"ln", 0 });// secondary
+ vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+ new Object[] {"ln", 0 });// secondary
+ }
+
+  /**
+   * Serial AsyncEventQueue with several dispatcher threads attached to a
+   * replicated region, fed by puts that are issued asynchronously.
+   *
+   * Test configuration::
+   *
+   * Region: Replicated
+   * WAN: Serial
+   * Region persistence enabled: false
+   * Async queue persistence enabled: false
+   * Order policy: OrderPolicy.THREAD
+   *
+   * Two asynchronous put invocations are started on vm4 (500 puts each); the
+   * listener on vm4 is then validated against 1000 events and the listeners
+   * on vm5, vm6 and vm7 against 0.
+   */
+  public void testConcurrentSerialAsyncEventQueueWithReplicatedRegion_2() {
+    final Integer locatorPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    // All four members are configured identically, so each argument list is
+    // built once and reused.
+    final Object[] cacheArgs = new Object[] { locatorPort };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+
+    final Object[] queueArgs = new Object[] { "ln", false, 100, 100, true, false,
+        null, false, 3, OrderPolicy.THREAD };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", queueArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", queueArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", queueArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", queueArgs);
+
+    final String regionName = testName + "_RR";
+    final Object[] regionArgs = new Object[] { regionName, "ln", isOffHeap() };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        regionArgs);
+
+    vm4.invokeAsync(AsyncEventQueueTestBase.class, "doPuts",
+        new Object[] { regionName, 500 });
+    vm4.invokeAsync(AsyncEventQueueTestBase.class, "doNextPuts",
+        new Object[] { regionName, 500, 1000 });
+    // NOTE(review): a third asynchronous call, "doPuts" with the arguments
+    // (regionName, 1000, 1500), used to follow here. Every other "doPuts"
+    // invocation in this test class passes two arguments, its result was never
+    // checked, and the validation below expects exactly the 1000 events
+    // produced by the two calls above, so that invocation has been dropped.
+    // Confirm against AsyncEventQueueTestBase that no three-argument doPuts
+    // exists.
+
+    final Object[] noEvents = new Object[] { "ln", 0 };
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+  }
+
+  /**
+   * Dispatcher threads set to more than 1 but no order policy set (the order
+   * policy argument is passed as null).
+   * Added for defect #50514.
+   *
+   * 1000 puts are done on vm4; the listener on vm4 is then validated against
+   * 1000 events and the listeners on vm5, vm6 and vm7 against 0.
+   */
+  public void testConcurrentSerialAsyncEventQueueWithoutOrderPolicy() {
+    final Integer locatorPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    // All four members are configured identically, so each argument list is
+    // built once and reused.
+    final Object[] cacheArgs = new Object[] { locatorPort };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+
+    final Object[] queueArgs = new Object[] { "ln", false, 100, 100, true, false,
+        null, false, 3, null };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", queueArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", queueArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", queueArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", queueArgs);
+
+    final String regionName = testName + "_RR";
+    final Object[] regionArgs = new Object[] { regionName, "ln", isOffHeap() };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+        regionArgs);
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts",
+        new Object[] { regionName, 1000 });
+
+    final Object[] allEvents = new Object[] { "ln", 1000 };
+    final Object[] noEvents = new Object[] { "ln", 0 };
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        allEvents);// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+  }
+
+  /**
+   * Serial AsyncEventQueue attached to a partitioned region.
+   *
+   * Test configuration::
+   *
+   * Region: Partitioned
+   * WAN: Serial
+   * Region persistence enabled: false
+   * Async channel persistence enabled: false
+   *
+   * Puts are done from vm4 and vm5; the listener on vm4 is then validated
+   * against 1000 events and the listeners on vm5, vm6 and vm7 against 0.
+   */
+  public void testPartitionedSerialAsyncEventQueue() {
+    final Integer locatorPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    // All four members are configured identically, so each argument list is
+    // built once and reused.
+    final Object[] cacheArgs = new Object[] { locatorPort };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+
+    final Object[] queueArgs = new Object[] { "ln", false, 100, 100, false, false,
+        null, false };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+
+    final String regionName = testName + "_PR";
+    final Object[] regionArgs = new Object[] { regionName, "ln", isOffHeap() };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts",
+        new Object[] { regionName, 500 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "doPutsFrom",
+        new Object[] { regionName, 500, 1000 });
+
+    final Object[] allEvents = new Object[] { "ln", 1000 };
+    final Object[] noEvents = new Object[] { "ln", 0 };
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        allEvents);// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+  }
+
+  /**
+   * Serial AsyncEventQueue with conflation attached to a partitioned region.
+   *
+   * Test configuration::
+   *
+   * Region: Partitioned
+   * WAN: Serial
+   * Region persistence enabled: false
+   * Async channel persistence enabled: false
+   * AsyncEventQueue conflation enabled: true
+   *
+   * The queues are paused, 1000 creates are put followed by the same 500
+   * updates twice, and the queue size is checked after each step. The queues
+   * are then resumed and the listener on vm4 is validated against 1000 events
+   * and the listeners on vm5, vm6 and vm7 against 0.
+   */
+  public void testPartitionedSerialAsyncEventQueueWithConflationEnabled() {
+    final Integer locatorPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    // All four members are configured identically, so each argument list is
+    // built once and reused.
+    final Object[] cacheArgs = new Object[] { locatorPort };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+
+    final Object[] queueArgs = new Object[] { "ln", false, 100, 100, true, false,
+        null, false };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+
+    final String regionName = testName + "_PR";
+    final Object[] regionArgs = new Object[] { regionName, "ln", isOffHeap() };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+
+    final Object[] queueId = new Object[] { "ln" };
+    vm4.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", queueId);
+    vm5.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", queueId);
+    vm6.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", queueId);
+    vm7.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", queueId);
+
+    pause(2000);
+
+    // Parameterized maps instead of raw types: keys are ints, creates carry
+    // Integer values and updates carry String values.
+    final Map<Integer, Integer> keyValues = new HashMap<Integer, Integer>();
+    for (int i = 0; i < 1000; i++) {
+      keyValues.put(i, i);
+    }
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue",
+        new Object[] { regionName, keyValues });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize",
+        new Object[] { "ln", keyValues.size() });
+
+    final Map<Integer, String> updateKeyValues = new HashMap<Integer, String>();
+    for (int i = 0; i < 500; i++) {
+      updateKeyValues.put(i, i + "_updated");
+    }
+
+    // Both update rounds put the same map and expect the same queue size.
+    final Object[] updateArgs = new Object[] { regionName, updateKeyValues };
+    final Object[] sizeAfterUpdates = new Object[] { "ln",
+        keyValues.size() + updateKeyValues.size() };
+
+    // Put the update events and check the queue size.
+    // There should be no conflation with the previous create events.
+    vm5.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", updateArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", sizeAfterUpdates);
+
+    // Put the update events again and check the queue size.
+    // There should be conflation with the previous update events.
+    vm5.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue", updateArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize", sizeAfterUpdates);
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", queueId);
+    vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", queueId);
+    vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", queueId);
+    vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue", queueId);
+
+    final Object[] noEvents = new Object[] { "ln", 0 };
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+  }
+
+  /**
+   * Serial AsyncEventQueue with persistence attached to a partitioned region.
+   *
+   * Test configuration::
+   *
+   * Region: Partitioned
+   * WAN: Serial
+   * Region persistence enabled: false
+   * Async channel persistence enabled: true
+   *
+   * No VM is restarted.
+   *
+   * Puts are done from vm4 and vm5; the listener on vm4 is then validated
+   * against 1000 events and the listeners on vm5, vm6 and vm7 against 0.
+   */
+  public void testPartitionedSerialAsyncEventQueueWithPeristenceEnabled() {
+    final Integer locatorPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    // All four members are configured identically, so each argument list is
+    // built once and reused.
+    final Object[] cacheArgs = new Object[] { locatorPort };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+
+    final Object[] queueArgs = new Object[] { "ln", false, 100, 100, false, true,
+        null, false };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+
+    final String regionName = testName + "_PR";
+    final Object[] regionArgs = new Object[] { regionName, "ln", isOffHeap() };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts",
+        new Object[] { regionName, 500 });
+    vm5.invoke(AsyncEventQueueTestBase.class, "doPutsFrom",
+        new Object[] { regionName, 500, 1000 });
+
+    final Object[] allEvents = new Object[] { "ln", 1000 };
+    final Object[] noEvents = new Object[] { "ln", 0 };
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        allEvents);// primary sender
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        noEvents);// secondary
+  }
+
+  /**
+   * Persistent serial AsyncEventQueue on a partitioned region, with the
+   * hosting member killed and rebuilt.
+   *
+   * Test configuration::
+   *
+   * Region: Partitioned
+   * WAN: Serial
+   * Region persistence enabled: false
+   * Async channel persistence enabled: true
+   *
+   * Caches are created on vm4 through vm7, but only vm4 gets the queue and
+   * the region, and vm4 is the member that is restarted. The rebuilt queue is
+   * created with the disk store name returned by the first creation, and the
+   * listener on vm4 is validated against the 1000 puts done before the kill.
+   */
+  public void testPartitionedSerialAsyncEventQueueWithPeristenceEnabled_Restart() {
+    final Integer locatorPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    final Object[] cacheArgs = new Object[] { locatorPort };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+
+    // The first creation passes null as the disk store argument and returns
+    // the name that is reused after the restart.
+    final String diskStoreName = (String)vm4.invoke(AsyncEventQueueTestBase.class,
+        "createAsyncEventQueueWithDiskStore",
+        new Object[] { "ln", false, 100, 100, true, null });
+
+    final String regionName = testName + "_PR";
+    final Object[] regionArgs = new Object[] { regionName, "ln", isOffHeap() };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+
+    // pause async channel and then do the puts
+    vm4.invoke(AsyncEventQueueTestBase.class,
+        "pauseAsyncEventQueueAndWaitForDispatcherToPause", new Object[] { "ln" });
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts",
+        new Object[] { regionName, 1000 });
+
+    // ------------------ KILL VM4 AND REBUILD
+    // ------------------------------------------
+    vm4.invoke(AsyncEventQueueTestBase.class, "killSender", new Object[] {});
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithDiskStore",
+        new Object[] { "ln", false, 100, 100, true, diskStoreName });
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    // -----------------------------------------------------------------------------------
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+        new Object[] { "ln", 1000 });// primary sender
+  }
+
+  /**
+   * Attaching a parallel AsyncEventQueue to a replicated region must be
+   * rejected.
+   *
+   * Caches and parallel queues are created on vm4 through vm7; creating the
+   * replicated region on vm4 is then expected to throw an exception whose
+   * cause mentions "can not be used with replicated region".
+   */
+  public void testParallelAsyncEventQueueWithReplicatedRegion() {
+    final String expectation =
+        "Expected GatewaySenderConfigException where parallel async event queue can not be used with replicated region";
+
+    // Setup stays outside the try block so that an unrelated setup failure is
+    // reported as itself rather than being mistaken for the expected exception.
+    final Integer locatorPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    final Object[] cacheArgs = new Object[] { locatorPort };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+
+    final Object[] queueArgs = new Object[] { "ln", true, 100, 100, true, false,
+        null, false };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+
+    try {
+      vm4.invoke(AsyncEventQueueTestBase.class,
+          "createReplicatedRegionWithAsyncEventQueue", new Object[] {
+              testName + "_RR", "ln", isOffHeap() });
+      fail(expectation);
+    }
+    catch (Exception e) {
+      // The cause or its message may be null; treat that as "not the expected
+      // exception" instead of failing with a NullPointerException.
+      final Throwable cause = e.getCause();
+      final String message = (cause == null) ? null : cause.getMessage();
+      if (message == null
+          || !message.contains("can not be used with replicated region")) {
+        fail(expectation + ", but caught: " + e);
+      }
+    }
+  }
+
+  /**
+   * Parallel AsyncEventQueue attached to a partitioned region on four members.
+   *
+   * 256 puts are done on vm4; once the queue has drained on every member, the
+   * listener map sizes of vm4 through vm7 must add up to 256.
+   */
+  public void testParallelAsyncEventQueue() {
+    final Integer locatorPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    // All four members are configured identically, so each argument list is
+    // built once and reused.
+    final Object[] cacheArgs = new Object[] { locatorPort };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+
+    final Object[] queueArgs = new Object[] { "ln", true, 100, 100, false, false,
+        null, false };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+
+    final String regionName = testName + "_PR";
+    final Object[] regionArgs = new Object[] { regionName, "ln", isOffHeap() };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts",
+        new Object[] { regionName, 256 });
+
+    final Object[] queueId = new Object[] { "ln" };
+    vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", queueId);
+    vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", queueId);
+    vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", queueId);
+    vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty", queueId);
+
+    final int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", queueId);
+    final int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", queueId);
+    final int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", queueId);
+    final int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventListenerMapSize", queueId);
+
+    // Expected value goes first so that a failure reports
+    // "expected:<256> but was:<actual>" rather than the reverse.
+    assertEquals(256, vm4size + vm5size + vm6size + vm7size);
+  }
+
+  /**
+   * Verify that the events reaching the AsyncEventListener have correct
+   * operation detail.
+   * (added for defect #50237).
+   *
+   * Parallel queues using the "MyAsyncEventListener_CacheLoader" listener are
+   * attached to a partitioned region with a cache loader on vm4 through vm7;
+   * after a putAll from vm4 the operation detail is validated on every member.
+   */
+  public void testParallelAsyncEventQueueWithCacheLoader() {
+    final Integer locatorPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    // All four members are configured identically, so each argument list is
+    // built once and reused.
+    final Object[] cacheArgs = new Object[] { locatorPort };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+
+    final Object[] queueArgs = new Object[] { "ln", true, 100, 100, false, false,
+        null, false, "MyAsyncEventListener_CacheLoader" };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+
+    final String regionName = testName + "_PR";
+    final Object[] regionArgs = new Object[] { regionName, "ln" };
+    vm4.invoke(AsyncEventQueueTestBase.class,
+        "createPartitionedRegionWithCacheLoaderAndAsyncQueue", regionArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class,
+        "createPartitionedRegionWithCacheLoaderAndAsyncQueue", regionArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class,
+        "createPartitionedRegionWithCacheLoaderAndAsyncQueue", regionArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class,
+        "createPartitionedRegionWithCacheLoaderAndAsyncQueue", regionArgs);
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPutAll",
+        new Object[] { regionName, 100, 10 });
+
+    final Object[] detailArgs = new Object[] { "ln", 250, false, true };
+    vm4.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+        detailArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+        detailArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+        detailArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventForOperationDetail",
+        detailArgs);
+  }
+
+  /**
+   * Checks the size reported for a paused parallel AsyncEventQueue.
+   *
+   * The queues on vm4 through vm7 are paused, 1000 puts are done on vm4, and
+   * the queue size obtained on vm4 and on vm5 must be 1000 in both cases.
+   */
+  public void testParallelAsyncEventQueueSize() {
+    final Integer locatorPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    // All four members are configured identically, so each argument list is
+    // built once and reused.
+    final Object[] cacheArgs = new Object[] { locatorPort };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+
+    final Object[] queueArgs = new Object[] { "ln", true, 100, 100, false, false,
+        null, false };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue", queueArgs);
+
+    final String regionName = testName + "_PR";
+    final Object[] regionArgs = new Object[] { regionName, "ln", isOffHeap() };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+
+    final Object[] queueId = new Object[] { "ln" };
+    vm4.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", queueId);
+    vm5.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", queueId);
+    vm6.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", queueId);
+    vm7.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", queueId);
+    pause(1000);// pause at least for the batchTimeInterval
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts",
+        new Object[] { regionName, 1000 });
+
+    final int sizeOnVm4 = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventQueueSize", queueId);
+    final int sizeOnVm5 = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventQueueSize", queueId);
+
+    assertEquals("Size of AsyncEventQueue is incorrect", 1000, sizeOnVm4);
+    assertEquals("Size of AsyncEventQueue is incorrect", 1000, sizeOnVm5);
+  }
+
+  /**
+   * Added to reproduce defect #50366:
+   * NullPointerException with AsyncEventQueue#size() when number of
+   * dispatchers is more than 1.
+   *
+   * Concurrent parallel queues (key order policy) on vm4 through vm7 are
+   * paused, 1000 puts are done on vm4, and the queue size obtained on vm4 and
+   * on vm5 must be 1000 in both cases.
+   */
+  public void testConcurrentParallelAsyncEventQueueSize() {
+    final Integer locatorPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    // All four members are configured identically, so each argument list is
+    // built once and reused.
+    final Object[] cacheArgs = new Object[] { locatorPort };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createCache", cacheArgs);
+
+    final Object[] queueArgs = new Object[] { "ln", true, 100, 100, false, false,
+        null, false, 2, OrderPolicy.KEY };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", queueArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", queueArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", queueArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createConcurrentAsyncEventQueue", queueArgs);
+
+    final String regionName = testName + "_PR";
+    final Object[] regionArgs = new Object[] { regionName, "ln", isOffHeap() };
+    vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+    vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+        regionArgs);
+
+    final Object[] queueId = new Object[] { "ln" };
+    vm4.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", queueId);
+    vm5.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", queueId);
+    vm6.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", queueId);
+    vm7.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue", queueId);
+    pause(1000);// pause at least for the batchTimeInterval
+
+    vm4.invoke(AsyncEventQueueTestBase.class, "doPuts",
+        new Object[] { regionName, 1000 });
+
+    final int sizeOnVm4 = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventQueueSize", queueId);
+    final int sizeOnVm5 = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+        "getAsyncEventQueueSize", queueId);
+
+    assertEquals("Size of AsyncEventQueue is incorrect", 1000, sizeOnVm4);
+    assertEquals("Size of AsyncEventQueue is incorrect", 1000, sizeOnVm5);
+  }
+
+ /**
+  * Exercises the "ln" AsyncEventQueue attached to a partitioned region that
+  * is created on vm4-vm7, while the queue is paused on every member.
+  *
+  * Sequence driven from vm4: put 1000 entries and check the queue size
+  * against 1000; put updates for the first 500 keys and wait for a queue
+  * size of 1500; put the same 500 updates again and wait for the size to
+  * remain 1500. The queues are then resumed and drained, and the listener
+  * map sizes reported by the four members must add up to the number of
+  * distinct keys that were put.
+  *
+  * NOTE(review): the createAsyncEventQueue arguments are presumably what
+  * enables conflation -- confirm against AsyncEventQueueTestBase.
+  */
+ public void testParallelAsyncEventQueueWithConflationEnabled() {
+   final String regionName = testName + "_PR";
+   final String queueId = "ln";
+
+   Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+       "createFirstLocatorWithDSId", new Object[] { 1 });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, true, false, null, false });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, true, false, null, false });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, true, false, null, false });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, true, false, null, false });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+       new Object[] { queueId });
+   vm5.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+       new Object[] { queueId });
+   vm6.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+       new Object[] { queueId });
+   vm7.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+       new Object[] { queueId });
+
+   pause(2000);// pause for the batchTimeInterval to ensure that all the
+   // senders are paused
+
+   // Initial load: keys 0..999, each mapped to itself.
+   final Map<Integer, Integer> keyValues = new HashMap<Integer, Integer>();
+   for (int i = 0; i < 1000; i++) {
+     keyValues.put(i, i);
+   }
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue",
+       new Object[] { regionName, keyValues });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize",
+       new Object[] { queueId, keyValues.size() });
+
+   // Updates for the first half of the keys only.
+   final Map<Integer, String> updateKeyValues = new HashMap<Integer, String>();
+   for (int i = 0; i < 500; i++) {
+     updateKeyValues.put(i, i + "_updated");
+   }
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue",
+       new Object[] { regionName, updateKeyValues });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncEventQueueSize",
+       new Object[] { queueId, keyValues.size() + updateKeyValues.size() }); // no conflation of creates
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue",
+       new Object[] { regionName, updateKeyValues });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncEventQueueSize",
+       new Object[] { queueId, keyValues.size() + updateKeyValues.size() }); // conflation of updates
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+       new Object[] { queueId });
+   vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+       new Object[] { queueId });
+   vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+       new Object[] { queueId });
+   vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+       new Object[] { queueId });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+
+   int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+
+   // Expected value goes first so a failure message reports it correctly.
+   final int totalListenerEntries = vm4size + vm5size + vm6size + vm7size;
+   assertEquals(keyValues.size(), totalListenerEntries);
+ }
+
+ /**
+  * Added to reproduce defect #47213.
+  *
+  * Same flow as the plain conflation test, but the region is created with
+  * createPRWithRedundantCopyWithAsyncEventQueue and the queue size on vm4 is
+  * checked after fixed pauses: once after the initial 1000 puts, and once
+  * after the 500 updates have been put twice (expected size 1500). After
+  * the queues are resumed and drained, the listener map sizes of vm4-vm7
+  * must add up to the number of distinct keys.
+  */
+ public void testParallelAsyncEventQueueWithConflationEnabled_bug47213() {
+   final String regionName = testName + "_PR";
+   final String queueId = "ln";
+
+   Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+       "createFirstLocatorWithDSId", new Object[] { 1 });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, true, false, null, false });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, true, false, null, false });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, true, false, null, false });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, true, false, null, false });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "createPRWithRedundantCopyWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createPRWithRedundantCopyWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createPRWithRedundantCopyWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createPRWithRedundantCopyWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+       new Object[] { queueId });
+   vm5.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+       new Object[] { queueId });
+   vm6.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+       new Object[] { queueId });
+   vm7.invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+       new Object[] { queueId });
+
+   pause(2000);// pause for the batchTimeInterval to ensure that all the
+   // senders are paused
+
+   // Initial load: keys 0..999, each mapped to itself.
+   final Map<Integer, Integer> keyValues = new HashMap<Integer, Integer>();
+   for (int i = 0; i < 1000; i++) {
+     keyValues.put(i, i);
+   }
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue",
+       new Object[] { regionName, keyValues });
+
+   pause(2000);
+   vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize",
+       new Object[] { queueId, keyValues.size() });
+
+   // Updates for the first half of the keys only; applied twice below.
+   final Map<Integer, String> updateKeyValues = new HashMap<Integer, String>();
+   for (int i = 0; i < 500; i++) {
+     updateKeyValues.put(i, i + "_updated");
+   }
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue",
+       new Object[] { regionName, updateKeyValues });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "putGivenKeyValue",
+       new Object[] { regionName, updateKeyValues });
+
+   // pause to ensure that events have been conflated.
+   pause(2000);
+   vm4.invoke(AsyncEventQueueTestBase.class, "checkAsyncEventQueueSize",
+       new Object[] { queueId, keyValues.size() + updateKeyValues.size() });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+       new Object[] { queueId });
+   vm5.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+       new Object[] { queueId });
+   vm6.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+       new Object[] { queueId });
+   vm7.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+       new Object[] { queueId });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+
+   int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+
+   // Expected value goes first so a failure message reports it correctly.
+   final int totalListenerEntries = vm4size + vm5size + vm6size + vm7size;
+   assertEquals(keyValues.size(), totalListenerEntries);
+ }
+
+ /**
+  * Runs puts through a region accessor and verifies where the events land.
+  *
+  * vm3 creates the region via
+  * createPartitionedRegionAccessorWithAsyncEventQueue, while vm4-vm7 create
+  * it via createPartitionedRegionWithAsyncEventQueue. After 256 puts issued
+  * from vm3 and draining the "ln" queues on vm4-vm7, validateAsyncEventListener
+  * is invoked on vm3 with an expected size of 0 and the listener map sizes of
+  * vm4-vm7 must add up to 256.
+  */
+ public void testParallelAsyncEventQueueWithOneAccessor() {
+   final String regionName = testName + "_PR";
+   final String queueId = "ln";
+   final int numPuts = 256;
+
+   Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+       "createFirstLocatorWithDSId", new Object[] { 1 });
+
+   vm3.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+   vm3.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, false, false, null, false });
+   vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, false, false, null, false });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, false, false, null, false });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, false, false, null, false });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, false, false, null, false });
+
+   // vm3 is set up through the accessor variant; the rest host the region.
+   vm3.invoke(AsyncEventQueueTestBase.class,
+       "createPartitionedRegionAccessorWithAsyncEventQueue",
+       new Object[] { regionName, queueId });
+   vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+
+   vm3.invoke(AsyncEventQueueTestBase.class, "doPuts",
+       new Object[] { regionName, numPuts });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+
+   vm3.invoke(AsyncEventQueueTestBase.class, "validateAsyncEventListener",
+       new Object[] { queueId, 0 });
+
+   int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+
+   // Expected value goes first so a failure message reports it correctly.
+   final int totalListenerEntries = vm4size + vm5size + vm6size + vm7size;
+   assertEquals(numPuts, totalListenerEntries);
+ }
+
+ /**
+  * Puts 256 entries from vm4 into a partitioned region created on vm4-vm7
+  * with the "ln" AsyncEventQueue attached, waits for the queue to drain on
+  * every member, and verifies that the listener map sizes of the four
+  * members add up to 256.
+  *
+  * NOTE(review): the sixth createAsyncEventQueue argument is {@code true}
+  * here and presumably enables persistence -- confirm against
+  * AsyncEventQueueTestBase.
+  */
+ public void testParallelAsyncEventQueueWithPersistence() {
+   final String regionName = testName + "_PR";
+   final String queueId = "ln";
+   final int numPuts = 256;
+
+   Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+       "createFirstLocatorWithDSId", new Object[] { 1 });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, false, true, null, false });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, false, true, null, false });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, false, true, null, false });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, false, true, null, false });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createPartitionedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "doPuts",
+       new Object[] { regionName, numPuts });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+
+   int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+
+   // Expected value goes first so a failure message reports it correctly.
+   final int totalListenerEntries = vm4size + vm5size + vm6size + vm7size;
+   assertEquals(numPuts, totalListenerEntries);
+ }
+
+ /**
+  * This test is disabled intentionally: a replicated region with a parallel
+  * AsyncEventQueue is not supported. A separate test,
+  * testParallelAsyncEventQueueWithReplicatedRegion, was added for that
+  * unsupported configuration.
+  *
+  * We are going to support this configuration in upcoming releases.
+  *
+  * When enabled, the test puts 1000 entries from vm4 into a region created
+  * on vm4-vm7 via createReplicatedRegionWithAsyncEventQueue, waits for the
+  * "ln" queue to drain on every member, and verifies that the listener map
+  * sizes of the four members add up to 1000.
+  */
+ public void DISABLED_DUETO_BUG51491_testReplicatedParallelAsyncEventQueue() {
+   final String regionName = testName + "_RR";
+   final String queueId = "ln";
+   final int numPuts = 1000;
+
+   Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+       "createFirstLocatorWithDSId", new Object[] { 1 });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, false, false, null, false });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, false, false, null, false });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, false, false, null, false });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueue",
+       new Object[] { queueId, true, 100, 100, false, false, null, false });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm5.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm6.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+   vm7.invoke(AsyncEventQueueTestBase.class, "createReplicatedRegionWithAsyncEventQueue",
+       new Object[] { regionName, queueId, isOffHeap() });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "doPuts",
+       new Object[] { regionName, numPuts });
+
+   vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm5.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm6.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+   vm7.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+       new Object[] { queueId });
+
+   int vm4size = (Integer)vm4.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm5size = (Integer)vm5.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm6size = (Integer)vm6.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+   int vm7size = (Integer)vm7.invoke(AsyncEventQueueTestBase.class,
+       "getAsyncEventListenerMapSize", new Object[] { queueId });
+
+   // Expected value goes first so a failure message reports it correctly.
+   final int totalListenerEntries = vm4size + vm5size + vm6size + vm7size;
+   assertEquals(numPuts, totalListenerEntries);
+ }
+
+/**
+ * Test case to test possibleDuplicates. vm4 & vm5 are hosting the PR. vm5 is
+ * killed so the buckets hosted by it are shifted to vm4.
+ */
+ @Ignore("Disabled for 52349")
+ public void DISABLED_testParallelAsyncEventQueueHA_Scenario1() {
+ Integer lnPort = (Integer)vm0.invoke(AsyncEventQueueTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+ vm4.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createCache", new Object[] { lnPort });
+
+ getLogWriter().info("Created the cache");
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithListener2",
+ new Object[] { "ln", true, 100, 5, false, null });
+ vm5.invoke(AsyncEventQueueTestBase.class, "createAsyncEventQueueWithListener2",
+ new Object[] { "ln", true, 100, 5, false, null });
+
+ getLogWriter().info("Created the AsyncEventQueue");
+
+ vm4.invoke(AsyncEventQueueTestBase.class,
+ "createPRWithRedundantCopyWithAsyncEventQueue", new Object[] {
+ testName + "_PR", "ln", isOffHeap() });
+ vm5.invoke(AsyncEventQueueTestBase.class,
+ "createPRWithRedundantCopyWithAsyncEventQueue", new Object[] {
+ testName + "_PR", "ln", isOffHeap() });
+
+ getLogWriter().info("Created PR with AsyncEventQueue");
+
+ vm4
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ vm5
+ .invoke(AsyncEventQueueTestBase.class, "pauseAsyncEventQueue",
+ new Object[] { "ln" });
+ pause(1000);// pause for the batchTimeInterval to make sure the AsyncQueue
+ // is paused
+
+ getLogWriter().info("Paused the AsyncEventQueue");
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "doPuts",
+ new Object[] { testName + "_PR", 80 });
+
+ getLogWriter().info("Done puts");
+
+ Set<Integer> primaryBucketsVm5 = (Set<Integer>)vm5.invoke(
+ AsyncEventQueueTestBase.class, "getAllPrimaryBucketsOnTheNode",
+ new Object[] { testName + "_PR" });
+
+ getLogWriter().info("Primary buckets on vm5: " + primaryBucketsVm5);
+ // ---------------------------- Kill vm5 --------------------------
+ vm5.invoke(AsyncEventQueueTestBase.class, "killSender", new Object[] {});
+
+ pause(1000);// give some time for rebalancing to happen
+ vm4.invoke(AsyncEventQueueTestBase.class, "resumeAsyncEventQueue",
+ new Object[] { "ln" });
+
+ vm4.invoke(AsyncEventQueueTestBase.class, "waitForAsyncQueueToGetEmpty",
+ new Object[] {
<TRUNCATED>