You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2016/01/20 03:22:27 UTC
[21/51] [partial] incubator-geode git commit: WAN and CQ code drop
under the Pivotal SGA
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
new file mode 100644
index 0000000..aac9444
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
@@ -0,0 +1,855 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ *
+ */
+package com.gemstone.gemfire.internal.cache.wan.concurrent;
+
+import com.gemstone.gemfire.cache.EntryExistsException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.BatchException70;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
+
+import java.net.SocketException;
+import java.util.Set;
+
+import dunit.AsyncInvocation;
+
+/**
+ * Test the functionality of ParallelGatewaySender with multiple dispatchers.
+ * @author skumar
+ *
+ */
+public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase {
+
+ public ConcurrentParallelGatewaySenderDUnitTest(String name) {
+ super(name);
+ }
+
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ /**
+ * Normal happy scenario test case.
+ * checks that all the dispatchers have successfully
+ * dispatched something individually.
+ *
+ * @throws Exception
+ */
+ public void testParallelPropagationConcurrentArtifacts() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+ Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+ "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+ vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+ vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION });
+ vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION });
+ vm6.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION });
+ vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION });
+
+ vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+
+ vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+ vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", null, 1, 100, isOffHeap() });
+ vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", null, 1, 100, isOffHeap() });
+
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+
+ try {
+ // set the test hook to find out dispacthed events by each of the
+ // concurrent dispatcher
+ vm4.invoke(ConcurrentParallelGatewaySenderDUnitTest.class, "setTestHook",
+ new Object[] {"ln", Boolean.TRUE });
+ vm5.invoke(ConcurrentParallelGatewaySenderDUnitTest.class, "setTestHook",
+ new Object[] {"ln", Boolean.TRUE });
+ vm6.invoke(ConcurrentParallelGatewaySenderDUnitTest.class, "setTestHook",
+ new Object[] {"ln", Boolean.TRUE });
+ vm7.invoke(ConcurrentParallelGatewaySenderDUnitTest.class, "setTestHook",
+ new Object[] {"ln", Boolean.TRUE });
+
+ vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
+ 1000 });
+
+ // verify all buckets drained on all sender nodes.
+ vm4.invoke(WANTestBase.class,
+ "validateParallelSenderQueueAllBucketsDrained", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class,
+ "validateParallelSenderQueueAllBucketsDrained", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class,
+ "validateParallelSenderQueueAllBucketsDrained", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class,
+ "validateParallelSenderQueueAllBucketsDrained", new Object[] { "ln" });
+
+ vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ testName + "_PR", 1000 });
+
+ int dispatched1 = (Integer)vm4.invoke(WANTestBase.class,
+ "verifyAndGetEventsDispatchedByConcurrentDispatchers",
+ new Object[] { "ln" });
+ int dispatched2 = (Integer)vm5.invoke(WANTestBase.class,
+ "verifyAndGetEventsDispatchedByConcurrentDispatchers",
+ new Object[] { "ln" });
+ int dispatched3 = (Integer)vm6.invoke(WANTestBase.class,
+ "verifyAndGetEventsDispatchedByConcurrentDispatchers",
+ new Object[] { "ln" });
+ int dispatched4 = (Integer)vm7.invoke(WANTestBase.class,
+ "verifyAndGetEventsDispatchedByConcurrentDispatchers",
+ new Object[] { "ln" });
+
+ assertEquals(1000, dispatched1 + dispatched2 + dispatched3 + dispatched4);
+ }
+ finally {
+ vm4.invoke(ConcurrentParallelGatewaySenderDUnitTest.class, "setTestHook", new Object[] {"ln", Boolean.FALSE });
+ vm5.invoke(ConcurrentParallelGatewaySenderDUnitTest.class, "setTestHook", new Object[] {"ln", Boolean.FALSE });
+ vm6.invoke(ConcurrentParallelGatewaySenderDUnitTest.class, "setTestHook", new Object[] {"ln", Boolean.FALSE });
+ vm7.invoke(ConcurrentParallelGatewaySenderDUnitTest.class, "setTestHook", new Object[] {"ln", Boolean.FALSE });
+ }
+ }
+
+ /**
+ * Normal happy scenario test case.
+ * @throws Exception
+ */
+ public void testParallelPropagation() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+ Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+ "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+ vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+ vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION });
+ vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION });
+ vm6.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION });
+ vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION });
+
+ vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+
+ vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+ vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", null, 1, 100, isOffHeap() });
+ vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", null, 1, 100, isOffHeap() });
+
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+
+ vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
+ 1000 });
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+
+ vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ testName + "_PR", 1000 });
+ }
+
+
+ /**
+ * Normal happy scenario test case when bucket division tests boundary cases.
+ * @throws Exception
+ */
+ public void testParallelPropagationWithUnEqualBucketDivison() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+ Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+ "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+ vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+ vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION });
+ vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION });
+ vm6.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION });
+ vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION });
+
+ vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+
+ vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+ vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", null, 1, 100, isOffHeap() });
+ vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", null, 1, 100, isOffHeap() });
+
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+
+ vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
+ 1000 });
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+
+ vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ testName + "_PR", 1000 });
+ }
+
+
+ /**
+ * Initially the GatewayReceiver is not up but later it comes up.
+ * We verify that
+ * @throws Exception
+ */
+ public void testParallelPropagation_withoutRemoteSite() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+ Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+ "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+ vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+ //keep a larger batch to minimize number of exception occurrences in the log
+ vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION });
+ vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION });
+ vm6.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION });
+ vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 300, false, false, null, true, 6, OrderPolicy.PARTITION });
+
+ vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+
+ vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+ vm4.invoke(WANTestBase.class, "setRemoveFromQueueOnException", new Object[] { "ln", false });
+ vm5.invoke(WANTestBase.class, "setRemoveFromQueueOnException", new Object[] { "ln", false });
+ vm6.invoke(WANTestBase.class, "setRemoveFromQueueOnException", new Object[] { "ln", false });
+ vm7.invoke(WANTestBase.class, "setRemoveFromQueueOnException", new Object[] { "ln", false });
+
+ //make sure all the senders are running before doing any puts
+ vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+
+ vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
+ 1000 });
+
+
+ vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+ vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", null, 1, 100, isOffHeap() });
+ vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", null, 1, 100, isOffHeap() });
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+
+ // Just making sure that though the remote site is started later,
+ // remote site is still able to get the data. Since the receivers are
+ // started before creating partition region it is quite possible that the
+ // region may loose some of the events. This needs to be handled by the code
+ vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ testName + "_PR", 1000 });
+ }
+
+ /**
+ * Testing for colocated region with orderpolicy Partition
+ */
+ public void testParallelPropogationColocatedPartitionedRegions() {
+ Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+ Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+ "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+ vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+ vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION });
+ vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION });
+ vm6.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION });
+ vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION });
+
+
+ vm4.invoke(WANTestBase.class,
+ "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
+ "ln", 1, 100, isOffHeap() });
+ vm5.invoke(WANTestBase.class,
+ "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
+ "ln", 1, 100, isOffHeap() });
+ vm6.invoke(WANTestBase.class,
+ "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
+ "ln", 1, 100, isOffHeap() });
+ vm7.invoke(WANTestBase.class,
+ "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
+ "ln", 1, 100, isOffHeap() });
+
+
+ vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+ vm2.invoke(WANTestBase.class,
+ "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
+ "ln", 1, 100, isOffHeap() });
+ vm3.invoke(WANTestBase.class,
+ "createCustomerOrderShipmentPartitionedRegion", new Object[] { null,
+ "ln", 1, 100, isOffHeap() });
+
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+
+ vm4.invoke(WANTestBase.class, "putcolocatedPartitionedRegion", new Object[] { 1000 });
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+
+ vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ customerRegionName, 1000 });
+ vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ orderRegionName, 1000 });
+ vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ shipmentRegionName, 1000 });
+ }
+
+ /**
+ * Local and remote sites are up and running.
+ * Local site cache is closed and the site is built again.
+ * Puts are done to local site.
+ * Expected: Remote site should receive all the events put after the local
+ * site was built back.
+ *
+ * @throws Exception
+ */
+ public void testParallelPropagationWithLocalCacheClosedAndRebuilt() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+ Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+ "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+ vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+ vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION });
+ vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION });
+ vm6.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION});
+ vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION });
+
+ vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+
+ vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+ vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", null, 1, 100, isOffHeap() });
+ vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", null, 1, 100, isOffHeap() });
+
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+
+ vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
+ 1000 });
+ vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ testName + "_PR", 1000 });
+ //-------------------Close and rebuild local site ---------------------------------
+
+ vm4.invoke(WANTestBase.class, "killSender", new Object[] {});
+ vm5.invoke(WANTestBase.class, "killSender", new Object[] {});
+ vm6.invoke(WANTestBase.class, "killSender", new Object[] {});
+ vm7.invoke(WANTestBase.class, "killSender", new Object[] {});
+
+ Integer regionSize =
+ (Integer) vm2.invoke(WANTestBase.class, "getRegionSize", new Object[] {testName + "_PR" });
+ getLogWriter().info("Region size on remote is: " + regionSize);
+
+ vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION });
+ vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION });
+ vm6.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION });
+ vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 6, OrderPolicy.PARTITION });
+
+ vm4.invoke(WANTestBase.class, "setRemoveFromQueueOnException", new Object[] { "ln", true });
+ vm5.invoke(WANTestBase.class, "setRemoveFromQueueOnException", new Object[] { "ln", true });
+ vm6.invoke(WANTestBase.class, "setRemoveFromQueueOnException", new Object[] { "ln", true });
+ vm7.invoke(WANTestBase.class, "setRemoveFromQueueOnException", new Object[] { "ln", true });
+
+ vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+ vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 1, 100, isOffHeap() });
+
+ vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+ vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" });
+ //------------------------------------------------------------------------------------
+
+ addExpectedException(EntryExistsException.class.getName());
+ addExpectedException(BatchException70.class.getName());
+ addExpectedException(ServerOperationException.class.getName());
+
+ vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 10000 });
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+
+ vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ testName + "_PR", 10000 });
+ vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ testName + "_PR", 10000 });
+ }
+
+ /**
+ * Colocated regions using ConcurrentParallelGatewaySender.
+ * Normal scenario
+ * @throws Exception
+ */
+ public void testParallelColocatedPropagation() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+ Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+ "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+ vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+ vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY });
+ vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY });
+ vm6.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY });
+ vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 7, OrderPolicy.KEY });
+
+ vm4.invoke(WANTestBase.class, "createColocatedPartitionedRegions",
+ new Object[] { testName, "ln", 1, 100, isOffHeap() });
+ vm5.invoke(WANTestBase.class, "createColocatedPartitionedRegions",
+ new Object[] { testName, "ln", 1, 100, isOffHeap() });
+ vm6.invoke(WANTestBase.class, "createColocatedPartitionedRegions",
+ new Object[] { testName, "ln", 1, 100, isOffHeap() });
+ vm7.invoke(WANTestBase.class, "createColocatedPartitionedRegions",
+ new Object[] { testName, "ln", 1, 100, isOffHeap() });
+
+ vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+ vm2.invoke(WANTestBase.class, "createColocatedPartitionedRegions",
+ new Object[] { testName, null, 1, 100, isOffHeap() });
+ vm3.invoke(WANTestBase.class, "createColocatedPartitionedRegions",
+ new Object[] { testName, null, 1, 100, isOffHeap() });
+
+ vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName, 1000 });
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+
+ vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ testName, 1000 });
+ }
+
+
+ /**
+ * Colocated regions using ConcurrentParallelGatewaySender.
+ * Normal scenario
+ * @throws Exception
+ */
+ public void testParallelColocatedPropagationOrderPolicyPartition() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+ Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+ "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+ vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+ vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION });
+ vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION });
+ vm6.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION });
+ vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 7, OrderPolicy.PARTITION });
+
+ vm4.invoke(WANTestBase.class, "createColocatedPartitionedRegions",
+ new Object[] { testName, "ln", 1, 100, isOffHeap() });
+ vm5.invoke(WANTestBase.class, "createColocatedPartitionedRegions",
+ new Object[] { testName, "ln", 1, 100, isOffHeap() });
+ vm6.invoke(WANTestBase.class, "createColocatedPartitionedRegions",
+ new Object[] { testName, "ln", 1, 100, isOffHeap() });
+ vm7.invoke(WANTestBase.class, "createColocatedPartitionedRegions",
+ new Object[] { testName, "ln", 1, 100, isOffHeap() });
+
+ vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+ vm2.invoke(WANTestBase.class, "createColocatedPartitionedRegions",
+ new Object[] { testName, null, 1, 100, isOffHeap() });
+ vm3.invoke(WANTestBase.class, "createColocatedPartitionedRegions",
+ new Object[] { testName, null, 1, 100, isOffHeap() });
+
+ vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName, 1000 });
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+
+ vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ testName, 1000 });
+ }
+
+ public void testPartitionedParallelPropagationHA() throws Exception {
+ addExpectedException(SocketException.class.getName()); // for Connection reset
+ Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+ Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+ "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+ vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+ vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY });
+ vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY });
+ vm6.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY });
+ vm7.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 6, OrderPolicy.KEY });
+
+ vm4.invoke(WANTestBase.class, "setRemoveFromQueueOnException", new Object[] { "ln", true });
+ vm5.invoke(WANTestBase.class, "setRemoveFromQueueOnException", new Object[] { "ln", true });
+ vm6.invoke(WANTestBase.class, "setRemoveFromQueueOnException", new Object[] { "ln", true });
+ vm7.invoke(WANTestBase.class, "setRemoveFromQueueOnException", new Object[] { "ln", true });
+
+ vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 2, 100, isOffHeap() });
+ vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 2, 100, isOffHeap() });
+ vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 2, 100, isOffHeap() });
+ vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 2, 100, isOffHeap() });
+
+ vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+ vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", null, 1, 100, isOffHeap() });
+ vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", null, 1, 100, isOffHeap() });
+
+ AsyncInvocation inv1 = vm7.invokeAsync(WANTestBase.class, "doPuts",
+ new Object[] { testName + "_PR", 5000 });
+ pause(500);
+ AsyncInvocation inv2 = vm4.invokeAsync(WANTestBase.class, "killSender");
+ AsyncInvocation inv3 = vm6.invokeAsync(WANTestBase.class, "doPuts",
+ new Object[] { testName + "_PR", 10000 });
+ pause(1500);
+ AsyncInvocation inv4 = vm5.invokeAsync(WANTestBase.class, "killSender");
+ inv1.join();
+ inv2.join();
+ inv3.join();
+ inv4.join();
+
+ vm6.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ testName + "_PR", 10000 });
+ vm7.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ testName + "_PR", 10000 });
+
+ //verify all buckets drained on the sender nodes that up and running.
+ vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+ vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+
+ vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ testName + "_PR", 10000 });
+ vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+ testName + "_PR", 10000 });
+ }
+
+ public void testWANPDX_PR_MultipleVM_ConcurrentParallelSender() {
+ Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+ Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+ "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+ vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+ vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+ vm3.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY });
+ vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY });
+
+ vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", null, 0, 2, isOffHeap()});
+
+ vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 0, 2, isOffHeap()});
+ vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 0, 2, isOffHeap()});
+
+ vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+ vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR",
+ 10 });
+
+ vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+ testName + "_PR", 10 });
+ }
+
+ public void testWANPDX_PR_MultipleVM_ConcurrentParallelSender_StartedLater() {
+ Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+ "createFirstLocatorWithDSId", new Object[] { 1 });
+ Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+ "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+ vm2.invoke(WANTestBase.class, "createReceiver_PDX", new Object[] { nyPort });
+
+ vm3.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort });
+ vm4.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort });
+
+ vm3.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY });
+ vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true, 5, OrderPolicy.KEY });
+
+ vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", null, 0, 2, isOffHeap()});
+
+ vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 0, 2, isOffHeap()});
+ vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName + "_PR", "ln", 0, 2, isOffHeap()});
+
+ vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR",
+ 10 });
+
+ AsyncInvocation inv1 = vm3.invokeAsync(WANTestBase.class, "startSender", new Object[] { "ln" });
+ AsyncInvocation inv2 = vm4.invokeAsync(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+ try{
+ inv1.join();
+ inv2.join();
+ }
+ catch(InterruptedException ie) {
+ fail("Caught interrupted exception");
+ }
+
+ vm4.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR",
+ 40 });
+
+ vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+ testName + "_PR", 40 });
+ }
+
+ public static void setTestHook(String senderId, boolean hook) {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ ConcurrentParallelGatewaySenderEventProcessor cProc = (ConcurrentParallelGatewaySenderEventProcessor)((AbstractGatewaySender)sender)
+ .getEventProcessor();
+ if (cProc == null) return;
+ cProc.TEST_HOOK = hook;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java
new file mode 100644
index 0000000..e49216b
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOffHeapDUnitTest.java
@@ -0,0 +1,16 @@
+package com.gemstone.gemfire.internal.cache.wan.concurrent;
+
+@SuppressWarnings("serial")
+public class ConcurrentParallelGatewaySenderOffHeapDUnitTest extends
+ ConcurrentParallelGatewaySenderDUnitTest {
+
+ public ConcurrentParallelGatewaySenderOffHeapDUnitTest(String name) {
+ super(name);
+ }
+
+ @Override
+ public boolean isOffHeap() {
+ return true;
+ }
+
+}