You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:43:57 UTC
[09/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (in
./geode-wan/src/test/java/com/gemstone/gemfire) to org.apache.geode (in
./geode-wan/src/test/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
new file mode 100644
index 0000000..427054a
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+@Category(DistributedTest.class)
+// Distributed tests for parallel WAN (gateway sender) event propagation while
+// concurrent put operations are in flight on the sending site.
+// NOTE(review): the package declared in this file (com.gemstone.gemfire.*)
+// does not match the path in the diff header (org/apache/geode/...) --
+// presumably reconciled by a later rename commit in this series; verify.
+public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
+
+ private static final long serialVersionUID = 1L;
+
+ public ParallelWANPropagationConcurrentOpsDUnitTest() {
+ super();
+ }
+
+ /**
+ * Normal propagation scenario test case for a PR with only one bucket.
+ * This has been added for bug# 44284.
+ *
+ * Four concurrent put batches (700/1000/800/1000 entries) run against a
+ * single-bucket PR while the senders are paused; all 3500 events must be
+ * queued, and after resuming, the remote site must converge to 1000 entries
+ * (the batches overwrite overlapping keys -- the final region size of 1000
+ * confirms the overlap).
+ * @throws Exception
+ */
+ @Test
+ public void testParallelPropagationWithSingleBucketPR() throws Exception {
+ // Site 1 ("ln") locator, and site 2 ("ny") locator linked back to site 1.
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ // Single-bucket PR: every event lands in the same bucket and queue.
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 1, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 1, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ // Matching receiver-side PR (no sender attached).
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", null, 1, 1, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", null, 1, 1, isOffHeap() ));
+
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+ //pause the senders
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+ // Fixed sleep to let the pause settle on the dispatchers -- timing based;
+ // NOTE(review): no positive condition is awaited here.
+ Wait.pause(5000);
+
+ // Four concurrent put batches on vm4 over (presumably) overlapping keys.
+ AsyncInvocation async1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 700 ));
+ AsyncInvocation async2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+ AsyncInvocation async3 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 800 ));
+ AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+
+ async1.join();
+ async2.join();
+ async3.join();
+ async4.join();
+
+ // 700 + 1000 + 800 + 1000 = 3500 events must sit in the paused queue.
+ int queueSize = (Integer) vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" ));
+ assertEquals("Actual queue size is not matching with the expected", 3500, queueSize);
+
+ //resume the senders now
+ vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
+
+ // Remote site converges to the 1000 distinct keys.
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ }
+
+ /**
+ * Normal propagation scenario test case for a PR with less number of buckets.
+ * Buckets have been kept to 10 for this test.
+ * This has been added for bug# 44287.
+ *
+ * Same concurrent-batch shape as the single-bucket test, but without pausing
+ * the senders: events drain while puts continue, and the remote site must
+ * still end with all 1000 distinct keys.
+ * @throws Exception
+ */
+ @Test
+ public void testParallelPropagationWithLowNumberOfBuckets() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ // PR constrained to 10 buckets on the sending site.
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", null, 1, 10, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", null, 1, 10, isOffHeap() ));
+
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+ // Concurrent put batches while the senders are actively draining.
+ AsyncInvocation async1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 700 ));
+ AsyncInvocation async2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+ AsyncInvocation async3 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 800 ));
+ AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+
+ async1.join();
+ async2.join();
+ async3.join();
+ async4.join();
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ }
+
+ /**
+ * Verifies that events drain from the parallel sender queue in per-bucket
+ * order: the per-bucket create sequences recorded at the sender side must
+ * match the per-bucket arrival order observed at the remote receiver.
+ */
+ @Test
+ public void testParallelQueueDrainInOrder_PR() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(() -> WANTestBase.createReceiver());
+
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", null, 3, 4, isOffHeap() ));
+
+ vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR"));
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ // 4-bucket PR so redundant bucket copies exist on all four members.
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.addQueueListener( "ln", true));
+ vm5.invoke(() -> WANTestBase.addQueueListener( "ln", true));
+ vm6.invoke(() -> WANTestBase.addQueueListener( "ln", true));
+ vm7.invoke(() -> WANTestBase.addQueueListener( "ln", true));
+
+ // Fixed sleeps around the pause -- timing based, no condition awaited.
+ Wait.pause(2000);
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln"));
+ vm5.invoke(() -> WANTestBase.pauseSender( "ln"));
+ vm6.invoke(() -> WANTestBase.pauseSender( "ln"));
+ vm7.invoke(() -> WANTestBase.pauseSender( "ln"));
+
+ Wait.pause(2000);
+
+ // Seed each of the 4 buckets with one entry BEFORE the sender-side bucket
+ // listeners are attached (these 4 seed events are excluded later).
+ vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 4 ));
+ vm4.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4));
+ vm5.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4));
+ vm6.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4 ));
+ vm7.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4));
+
+ vm4.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4));
+ vm5.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4));
+ vm6.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4 ));
+ vm7.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4));
+
+ vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 1000 ));
+
+ vm6.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+
+ // Per-bucket create sequences recorded on each sender member.
+ // NOTE(review): vm5/vm6/vm7 results are collected but only vm4's are
+ // compared against the receiver below -- confirm this is intentional.
+ HashMap vm4BRUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4));
+ HashMap vm5BRUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4));
+ HashMap vm6BRUpdates = (HashMap)vm6.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4));
+ HashMap vm7BRUpdates = (HashMap)vm7.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4));
+
+ List b0SenderUpdates = (List)vm4BRUpdates.get("Create0");
+ List b1SenderUpdates = (List)vm4BRUpdates.get("Create1");
+ List b2SenderUpdates = (List)vm4BRUpdates.get("Create2");
+ List b3SenderUpdates = (List)vm4BRUpdates.get("Create3");
+
+ HashMap vm4QueueBRUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue_BR("ln", 4));
+ HashMap vm5QueueBRUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue_BR("ln", 4));
+ HashMap vm6QueueBRUpdates = (HashMap)vm6.invoke(() -> WANTestBase.checkQueue_BR("ln", 4));
+ HashMap vm7QueueBRUpdates = (HashMap)vm7.invoke(() -> WANTestBase.checkQueue_BR("ln", 4));
+
+ // Redundant queue bucket copies on all members must have recorded
+ // identical event sequences.
+ assertEquals(vm4QueueBRUpdates, vm5QueueBRUpdates);
+ assertEquals(vm4QueueBRUpdates, vm6QueueBRUpdates);
+ assertEquals(vm4QueueBRUpdates, vm7QueueBRUpdates);
+
+ vm4.invoke(() -> WANTestBase.resumeSender( "ln"));
+ vm5.invoke(() -> WANTestBase.resumeSender( "ln"));
+ vm6.invoke(() -> WANTestBase.resumeSender( "ln"));
+ vm7.invoke(() -> WANTestBase.resumeSender( "ln"));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ HashMap receiverUpdates = (HashMap)vm2.invoke(() -> WANTestBase.checkPR(
+ getTestMethodName() + "_PR"));
+ List<Long> createList = (List)receiverUpdates.get("Create");
+ // Partition the receiver's create sequence by bucket (key mod 4).
+ ArrayList<Long> b0ReceiverUpdates = new ArrayList<Long>();
+ ArrayList<Long> b1ReceiverUpdates = new ArrayList<Long>();
+ ArrayList<Long> b2ReceiverUpdates = new ArrayList<Long>();
+ ArrayList<Long> b3ReceiverUpdates = new ArrayList<Long>();
+ for (Long key : createList) {
+ long mod = key % 4;
+ if (mod == 0) {
+ b0ReceiverUpdates.add(key);
+ }
+ else if (mod == 1) {
+ b1ReceiverUpdates.add(key);
+ }
+ else if (mod == 2) {
+ b2ReceiverUpdates.add(key);
+ }
+ else if (mod == 3) {
+ b3ReceiverUpdates.add(key);
+ }
+ }
+ // Drop the first create per bucket: those were the 4 seed puts done
+ // before the sender-side bucket listeners were registered.
+ b0ReceiverUpdates.remove(0);
+ b1ReceiverUpdates.remove(0);
+ b2ReceiverUpdates.remove(0);
+ b3ReceiverUpdates.remove(0);
+
+ // Sender-observed order must equal receiver-observed order per bucket.
+ assertEquals(b0SenderUpdates, b0ReceiverUpdates);
+ assertEquals(b1SenderUpdates, b1ReceiverUpdates);
+ assertEquals(b2SenderUpdates, b2ReceiverUpdates);
+ assertEquals(b3SenderUpdates, b3ReceiverUpdates);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
new file mode 100644
index 0000000..7b0cf3e
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
@@ -0,0 +1,1234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import java.util.Set;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.EntryExistsException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.BatchException70;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.junit.categories.FlakyTest;
+
+@Category(DistributedTest.class)
+public class ParallelWANPropagationDUnitTest extends WANTestBase {
+ private static final long serialVersionUID = 1L;
+
+ // Default constructor required by the DUnit framework; delegates to
+ // WANTestBase setup.
+ public ParallelWANPropagationDUnitTest() {
+ super();
+ }
+
+ /**
+ * Verifies the gateway sender's internal shadow (queue) region is NOT
+ * returned by {@code Cache.rootRegions()}, i.e. not exposed to users
+ * (bug 44216).
+ */
+ @Test
+ public void test_ParallelGatewaySenderMetaRegionNotExposedToUser_Bug44216() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCache(lnPort);
+ createSender("ln", 2, true, 100, 300, false, false,
+ null, true);
+ createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap());
+
+ // Look up the sender just created, by id.
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals("ln")) {
+ sender = s;
+ break;
+ }
+ }
+ try {
+ sender.start();
+ } catch (Exception e) {
+ // NOTE(review): catches any Exception but the failure message claims an
+ // IOException -- the message is misleading and the cause is only printed.
+ e.printStackTrace();
+ fail("Failed with IOException");
+ }
+
+ // The queue's backing region name must not appear among user-visible roots.
+ GemFireCacheImpl gemCache = (GemFireCacheImpl)cache;
+ Set regionSet = gemCache.rootRegions();
+
+ for (Object r : regionSet) {
+ if (((Region)r).getName().equals(
+ ((AbstractGatewaySender)sender).getQueues().toArray(new RegionQueue[1])[0].getRegion().getName())) {
+ fail("The shadowPR is exposed to the user");
+ }
+ }
+ }
+
+ /**
+ * Puts are done while NO remote site exists; the remote site (receivers and
+ * regions) is created afterwards. All queued events must still be delivered
+ * once the remote site comes up.
+ */
+ @Test
+ public void testParallelPropagation_withoutRemoteSite() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ //keep a larger batch to minimize number of exception occurrences in the log
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 300, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 300, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 300, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 300, false, false, null, true ));
+
+ vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm6.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ //make sure all the senders are running before doing any puts
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ // Puts happen with the remote site still absent; events queue locally.
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 1000 ));
+
+ // Only now bring up the remote site.
+ createCacheInVMs(nyPort, vm2, vm3);
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+ createReceiverInVMs(vm2, vm3);
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+ // Just making sure that though the remote site is started later,
+ // remote site is still able to get the data. Since the receivers are
+ // started before creating partition region it is quite possible that the
+ // region may loose some of the events. This needs to be handled by the code
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ }
+
+ /**
+ * Normal happy scenario test case.
+ * @throws Exception
+ */
+ /**
+ * Normal happy scenario test case.
+ *
+ * Both sites fully up before puts; 1000 events must drain from every
+ * sender's queue and appear on the remote site.
+ * @throws Exception
+ */
+ @Test
+ public void testParallelPropagation() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm6.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 1000 ));
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ }
+
+ // Runnable factory: creates the receiver-side PR (no sender attached),
+ // redundancy 1, 100 buckets, named after the current test method.
+ protected SerializableRunnableIF createReceiverPartitionedRegionRedundancy1() {
+ return () -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", null, 1, 100, isOffHeap() );
+ }
+
+ // Runnable factory: creates the sender-side PR attached to sender "ln",
+ // redundancy 1, 100 buckets, named after the current test method.
+ protected SerializableRunnableIF createPartitionedRegionRedundancy1Runnable() {
+ return () -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() );
+ }
+
+ // Runnable factory: blocks until gateway sender "ln" reports running state.
+ protected SerializableRunnableIF waitForSenderRunnable() {
+ return () -> WANTestBase.waitForSenderRunningState( "ln" );
+ }
+
+ /**
+ * Same happy path but the senders are created with the final createSender
+ * flag set to false (vs true elsewhere) and startSenderInVMs is never
+ * called -- presumably that flag is manualStart, so false means the senders
+ * auto-start; TODO confirm against WANTestBase.createSender.
+ */
+ @Test
+ public void testParallelPropagation_ManualStart() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, false ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, false ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, false ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, false ));
+
+ vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm6.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 1000 ));
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ }
+
+ /**
+ * Normal happy scenario test case2.
+ * @throws Exception
+ */
+ /**
+ * Normal happy scenario test case2.
+ *
+ * First batch of 1000 puts happens BEFORE the senders are started; the
+ * senders are then started asynchronously, a second batch of 1000 puts (to
+ * the same region) follows, and the remote site must end with 1000 entries.
+ * @throws Exception
+ */
+ @Test
+ public void testParallelPropagationPutBeforeSenderStart() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm6.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+
+ // Puts done while senders are not yet started.
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 1000 ));
+
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ // Second batch over the same key range after the senders are running.
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 1000 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ }
+
+ /**
+ * Local and remote sites are up and running.
+ * Local site cache is closed and the site is built again.
+ * Puts are done to local site.
+ * Expected: Remote site should receive all the events put after the local
+ * site was built back.
+ *
+ * @throws Exception
+ */
+ /**
+ * Local and remote sites are up and running.
+ * Local site cache is closed and the site is built again.
+ * Puts are done to local site.
+ * Expected: Remote site should receive all the events put after the local
+ * site was built back.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testParallelPropagationWithLocalCacheClosedAndRebuilt() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm6.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 1000 ));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ //-------------------Close and rebuild local site ---------------------------------
+
+ // Kill every local-site member, then rebuild caches/senders/regions below.
+ vm4.invoke(() -> WANTestBase.killSender());
+ vm5.invoke(() -> WANTestBase.killSender());
+ vm6.invoke(() -> WANTestBase.killSender());
+ vm7.invoke(() -> WANTestBase.killSender());
+
+ Integer regionSize =
+ (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_PR" ));
+ LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm6.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+ //------------------------------------------------------------------------------------
+
+ // Re-putting the same keys can race with entries already delivered to the
+ // remote site, so these exception types are expected and ignored.
+ IgnoredException.addIgnoredException(EntryExistsException.class.getName());
+ IgnoredException.addIgnoredException(BatchException70.class.getName());
+ IgnoredException.addIgnoredException(ServerOperationException.class.getName());
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+ // Both remote members must see the full data set after the rebuild.
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ }
+
+ /**
+ * Propagation for colocated partitioned regions, both attached to sender
+ * "ln"; 1000 puts on the parent region must drain and reach the remote site.
+ */
+ @Test
+ public void testParallelColocatedPropagation() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ }
+ /**
+ * Create colocated partitioned regions.
+ * Parent region has PGS attached and child region doesn't.
+ *
+ * Validate that events for parent region reaches remote site.
+ *
+ * @throws Exception
+ */
+
+ @Test
+ public void testParallelColocatedPropagation2() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"_child1", 1000 ));
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"_child2", 1000 ));
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName()+"_child1", 0 ));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName()+"_child2", 0 ));
+ }
+
+
+ @Test
+ public void testParallelPropagationWithOverflow() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ //let all the senders start before doing any puts to ensure that none of the events is lost
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.doHeavyPuts( getTestMethodName(), 150 ));
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 150 ));
+ }
+
+ @Test
+ public void testSerialReplicatedAndParallelPartitionedPropagation()
+ throws Exception {
+
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "lnSerial",
+ 2, false, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "lnSerial",
+ 2, false, 100, 10, false, false, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createSender( "lnParallel",
+ 2, true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "lnParallel",
+ 2, true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "lnParallel",
+ 2, true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "lnParallel",
+ 2, true, 100, 10, false, false, null, true ));
+
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() ));
+
+ startSenderInVMs("lnSerial", vm4, vm5);
+
+ startSenderInVMs("lnParallel", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+ vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 1000 ));
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel"));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel"));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel"));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel"));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ }
+
+ @Test
+ public void testPartitionedParallelPropagationToTwoWanSites()
+ throws Exception {
+ Integer lnPort = createFirstLocatorWithDSId(1);
+ Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
+
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ createReceiverInVMs(vm2);
+
+ createCacheInVMs(tkPort, vm3);
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+ createReceiverInVMs(vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "lnParallel1",
+ 2, true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "lnParallel1",
+ 2, true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "lnParallel1",
+ 2, true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "lnParallel1",
+ 2, true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createSender( "lnParallel2",
+ 3, true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "lnParallel2",
+ 3, true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "lnParallel2",
+ 3, true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "lnParallel2",
+ 3, true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() ));
+
+ startSenderInVMs("lnParallel1", vm4, vm5, vm6, vm7);
+
+ startSenderInVMs("lnParallel2", vm4, vm5, vm6, vm7);
+
+
+ //before doing puts, make sure that the senders are started.
+ //this will ensure that not a single events is lost
+ vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" ));
+ vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" ));
+ vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" ));
+ vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" ));
+
+ vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" ));
+ vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" ));
+ vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" ));
+ vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" ));
+
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 1000 ));
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1"));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1"));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1"));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1"));
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2"));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2"));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2"));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2"));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ }
+
+ @Category(FlakyTest.class) // GEODE-1008 and GEODE-1180: random ports, async actions, thread sleeps, time sensitive, waitForCriterion
+ @Test
+ public void testPartitionedParallelPropagationHA() throws Exception {
+ IgnoredException.addIgnoredException("Broken pipe");
+ IgnoredException.addIgnoredException("Connection reset");
+ IgnoredException.addIgnoredException("Unexpected IOException");
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+ AsyncInvocation inv1 = vm7.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 ));
+ Wait.pause(500);
+ AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
+ AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 10000 ));
+ Wait.pause(1500);
+ AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
+ inv1.join();
+ inv2.join();
+ inv3.join();
+ inv4.join();
+
+ vm6.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 10000 ));
+ vm7.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 10000 ));
+
+ //verify all buckets drained on the sender nodes that up and running.
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 10000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 10000 ));
+ }
+
+ @Test
+ public void testParallelPropagationWithFilter() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false,
+ new MyGatewayEventFilter(), true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false,
+ new MyGatewayEventFilter(), true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false,
+ new MyGatewayEventFilter(), true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false,
+ new MyGatewayEventFilter(), true ));
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ //wait for senders to be running before doing any puts. This will ensure that
+ //not a single events is lost
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 800 ));
+ }
+
+
+ @Test
+ public void testParallelPropagationWithPutAll() throws Exception {
+
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm6.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.doPutAll( getTestMethodName() + "_PR",
+ 100 , 50 ));
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 5000 ));
+
+ }
+
+ /**
+ * There was a bug that all destroy events were being put into different buckets of sender queue
+ * against the key 0. Bug# 44304
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testParallelPropagationWithDestroy() throws Exception {
+
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 100, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 100, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 100, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 100, false, false, null, true ));
+
+ vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm6.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+ Wait.pause(2000);
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+ vm4.invoke(() -> WANTestBase.doDestroys( getTestMethodName() + "_PR", 500 ));
+
+
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 ));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 ));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 ));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 ));
+
+ vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm6.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm7.invoke(() -> WANTestBase.resumeSender( "ln" ));
+
+ //give some time for the queue to drain
+ Wait.pause(5000);
+
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 500 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 500 ));
+
+ }
+
+ /**
+ * Normal happy scenario test case. But with Tx operations
+ * @throws Exception
+ */
+ @Test
+ public void testParallelPropagationTxOperations() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ createCacheInVMs(lnPort, vm4, vm5);
+ //vm6.invoke(() -> WANTestBase.createCache( lnPort ));
+ //vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ //vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ // true, 100, 10, false, false, null, true ));
+ //vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ // true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+// vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+// testName + "_PR", "ln", true, 1, 100, isOffHeap() ));
+// vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+// testName + "_PR", "ln", true, 1, 100, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5);
+// vm6.invoke(() -> WANTestBase.startSender( "ln" ));
+// vm7.invoke(() -> WANTestBase.startSender( "ln" ));
+
+ vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+ vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+// vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+// vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm4.invoke(() -> WANTestBase.doTxPuts( getTestMethodName() + "_PR"));
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+// vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+// vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 3 ));
+ }
+
+ @Ignore
+ @Test
+ public void testParallelGatewaySenderQueueLocalSize() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2);
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+ createCacheInVMs(lnPort, vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+ /*
+ * Remember pausing sender does not guarantee that peek will be paused
+ * immediately as its quite possible event processor is already in peeking
+ * events and send them after peeking without a check for pause. hence below
+ * pause of 1 sec to allow dispatching to be paused
+ */
+// vm4.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" ));
+// vm5.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" ));
+ Wait.pause(1000);
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 10 ));
+
+ vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 ));
+ vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 ));
+
+ // instead of checking size as 5 and 5. check that combined size is 10
+ Integer localSize1 = (Integer)vm4.invoke(() -> WANTestBase.getPRQLocalSize( "ln"));
+ Integer localSize2 = (Integer)vm5.invoke(() -> WANTestBase.getPRQLocalSize( "ln"));
+ assertEquals(10, localSize1 + localSize2);
+ }
+
+
+
+ public void tParallelGatewaySenderQueueLocalSizeWithHA() {
+ IgnoredException.addIgnoredException("Broken pipe");
+ IgnoredException.addIgnoredException("Connection reset");
+ IgnoredException.addIgnoredException("Unexpected IOException");
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+ createCacheInVMs(lnPort, vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+ vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+ /*
+ * Remember pausing sender does not guarantee that peek will be paused
+ * immediately as its quite possible event processor is already in peeking
+ * events and send them after peeking without a check for pause. hence below
+ * pause of 1 sec to allow dispatching to be paused
+ */
+// vm4.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" ));
+// vm5.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" ));
+ Wait.pause(1000);
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 10 ));
+
+ vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 ));
+ vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 ));
+
+ Integer localSize1 = (Integer)vm4.invoke(() -> WANTestBase.getPRQLocalSize( "ln"));
+ Integer localSize2 = (Integer)vm5.invoke(() -> WANTestBase.getPRQLocalSize( "ln"));
+ assertEquals(10, localSize1 + localSize2);
+
+ vm5.invoke(() -> WANTestBase.killSender( ));
+
+ vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 ));
+ vm4.invoke(() -> WANTestBase.checkPRQLocalSize( "ln", 10 ));
+
+ }
+
+ /**
+ * Added for defect #50364 Can't colocate region that has AEQ with a region that does not have that same AEQ
+ */
+ @Test
+ public void testParallelSenderAttachedToChildRegionButNotToParentRegion() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ //create cache and receiver on site2
+ createCacheInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
+ //create cache on site1
+ createCacheInVMs(lnPort, vm3);
+
+ //create sender on site1
+ vm3.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ //start sender on site1
+ startSenderInVMs("ln", vm3);
+
+ //create leader (parent) PR on site1
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "PARENT_PR", null, 0, 100, isOffHeap() ));
+ String parentRegionFullPath =
+ (String) vm3.invoke(() -> WANTestBase.getRegionFullPath( getTestMethodName() + "PARENT_PR"));
+
+ //create colocated (child) PR on site1
+ vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegion(
+ getTestMethodName() + "CHILD_PR", "ln", 0, 100, parentRegionFullPath ));
+
+ //create leader and colocated PR on site2
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "PARENT_PR", null, 0, 100, isOffHeap() ));
+ vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegion(
+ getTestMethodName() + "CHILD_PR", null, 0, 100, parentRegionFullPath ));
+
+ //do puts in colocated (child) PR on site1
+ vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "CHILD_PR", 1000 ));
+
+ //verify the puts reach site2
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "CHILD_PR", 1000 ));
+ }
+
+ @Test
+ public void testParallelPropagationWithFilter_AfterAck() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm6, vm7);
+ createReceiverInVMs(vm6, vm7);
+
+ createCacheInVMs(lnPort, vm2, vm3, vm4, vm5);
+
+ vm2.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+ 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true ));
+ vm3.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+ 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true ));
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+ 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+ 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true ));
+
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ startSenderInVMs("ln", vm2, vm3, vm4, vm5);
+
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ // wait for senders to be running before doing any puts. This will ensure
+ // that
+ // not a single events is lost
+ vm2.invoke(waitForSenderRunnable());
+ vm3.invoke(waitForSenderRunnable());
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+
+ vm2.invoke(() -> WANTestBase.validateQueueContents( "ln",
+ 0 ));
+ vm3.invoke(() -> WANTestBase.validateQueueContents( "ln",
+ 0 ));
+ vm4.invoke(() -> WANTestBase.validateQueueContents( "ln",
+ 0 ));
+ vm5.invoke(() -> WANTestBase.validateQueueContents( "ln",
+ 0 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm5.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+
+ vm6.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm7.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+
+ Integer vm2Acks = (Integer)vm2.invoke(() -> WANTestBase.validateAfterAck( "ln"));
+ Integer vm3Acks = (Integer)vm3.invoke(() -> WANTestBase.validateAfterAck( "ln"));
+ Integer vm4Acks = (Integer)vm4.invoke(() -> WANTestBase.validateAfterAck( "ln"));
+ Integer vm5Acks = (Integer)vm5.invoke(() -> WANTestBase.validateAfterAck( "ln"));
+
+ assertEquals(2000, (vm2Acks + vm3Acks + vm4Acks + vm5Acks));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
new file mode 100644
index 0000000..07a6223
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+/**
+ * Distributed (DUnit) tests verifying that WAN events delivered to a remote
+ * site by a parallel gateway sender are NOT re-enqueued by that site's own
+ * senders ("loop back"). Covered topologies: two sites (LN &lt;-&gt; NY), a
+ * three-site ring (LN -&gt; NY -&gt; TK -&gt; LN), and three fully connected sites
+ * (N-to-N) with puts issued from one site only. Each test pauses senders,
+ * performs puts, and asserts queue sizes stay flat for events that arrived
+ * over the WAN rather than from local operations.
+ */
+@Category(DistributedTest.class)
+public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
+
+ // DUnit test instances are serialized to remote VMs; keep a stable version id
+ private static final long serialVersionUID = 1L;
+
+ public ParallelWANPropagationLoopBackDUnitTest() {
+ super();
+ }
+
+ /**
+ * Test loop back issue between 2 WAN sites (LN & NY). LN -> NY -> LN.
+ * Site1 (LN): vm2, vm4, vm5
+ * Site2 (NY): vm3, vm6, vm7
+ */
+ @Test
+ public void testParallelPropagationLoopBack() {
+ // vm0 hosts site1's locator (dsid=1); vm1 hosts site2's locator (dsid=2) linked back to site1
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ //create receiver on site1 and site2
+ createCacheInVMs(lnPort, vm2, vm4, vm5);
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ createCacheInVMs(nyPort, vm3, vm6, vm7);
+ vm3.invoke(() -> WANTestBase.createReceiver());
+
+ //create senders on site1
+ // createSender args: senderId, remoteDsId, parallel=true, then queue/batch and
+ // conflation/persistence flags — NOTE(review): confirm exact parameter meanings
+ // against the WANTestBase.createSender signature
+ vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ //create senders on site2
+ vm3.invoke(() -> WANTestBase.createSender( "ny", 1,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ny", 1,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ny", 1,
+ true, 100, 10, false, false, null, true ));
+
+ //create PR on site1
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() ));
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() ));
+
+ //create PR on site2
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() ));
+
+ //start sender on site1
+ startSenderInVMs("ln", vm2, vm4, vm5);
+
+
+ //start sender on site2
+ startSenderInVMs("ny", vm3, vm6, vm7);
+
+
+ //pause senders on site1
+ vm2.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+ //pause senders on site2
+ vm3.invoke(() -> WANTestBase.pauseSender( "ny" ));
+ vm6.invoke(() -> WANTestBase.pauseSender( "ny" ));
+ vm7.invoke(() -> WANTestBase.pauseSender( "ny" ));
+
+ //this is required since sender pause doesn't take effect immediately
+ Wait.pause(1000);
+
+ //Do 100 puts on site1
+ vm2.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 100 ));
+ //do next 100 puts on site2
+ vm3.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR",
+ 100, 200 ));
+ //verify queue size on both sites
+ vm2.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 ));
+ vm4.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 ));
+ vm5.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 ));
+
+ vm3.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 ));
+ vm6.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 ));
+ vm7.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 ));
+
+ //resume sender on site1
+ vm2.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
+
+ //validate events reached site2 from site1
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 200 ));
+
+ //on site2, verify queue size again
+ //this ensures that loopback is not happening since the queue size is same as before
+ //the event coming from site1 are not enqueued again
+ vm3.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 ));
+ vm6.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 ));
+ vm7.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 ));
+
+ //resume sender on site2
+ vm3.invoke(() -> WANTestBase.resumeSender( "ny" ));
+ vm6.invoke(() -> WANTestBase.resumeSender( "ny" ));
+ vm7.invoke(() -> WANTestBase.resumeSender( "ny" ));
+
+ //validate region size on both the sites
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 200 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 200 ));
+ }
+
+ /**
+ * Test loop back issue among 3 WAN sites with Ring topology i.e. LN -> NY -> TK -> LN
+ * Site1 (LN): vm3, vm6
+ * Site2 (NY): vm4, vm7
+ * Site3 (TK): vm5
+ */
+ @Test
+ public void testParallelPropagationLoopBack3Sites() {
+ //Create locators
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
+
+ //create cache and receivers on all the 3 sites
+ createCacheInVMs(lnPort, vm3, vm6);
+ createReceiverInVMs(vm3, vm6);
+ createCacheInVMs(nyPort, vm4, vm7);
+ createReceiverInVMs(vm4, vm7);
+ createCacheInVMs(tkPort, vm5);
+ createReceiverInVMs(vm5);
+
+
+ //create senders on all the 3 sites
+ // ring wiring: ln -> site2, ny -> site3, tk -> site1
+ vm3.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createSender( "ny", 3,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ny", 3,
+ true, 100, 10, false, false, null, true ));
+
+ vm5.invoke(() -> WANTestBase.createSender( "tk", 1,
+ true, 100, 10, false, false, null, true ));
+
+ //create PR on the 3 sites
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() ));
+
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "tk", 0, 100, isOffHeap() ));
+
+ //start senders on all the sites
+ startSenderInVMs("ln", vm3, vm6);
+
+ startSenderInVMs("ny", vm4, vm7);
+
+ vm5.invoke(() -> WANTestBase.startSender( "tk" ));
+
+ //pause senders on site1 and site3. Site2 has the sender running to pass along events
+ vm3.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+ vm5.invoke(() -> WANTestBase.pauseSender( "tk" ));
+
+ //need to have this pause since pauseSender doesn't take effect immediately
+ Wait.pause(1000);
+
+ //do puts on site1
+ vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 100 ));
+
+ //do more puts on site3
+ vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR",
+ 100, 200 ));
+
+ //verify queue size on site1 and site3
+ vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 ));
+ vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk", 100 ));
+
+ //resume sender on site1 so that events reach site2 and from there to site3
+ vm3.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm6.invoke(() -> WANTestBase.resumeSender( "ln" ));
+
+ //validate region size on site2 (should have 100) and site3 (should have 200)
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 100 ));
+ vm5.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 200 ));
+
+ //verify queue size remains same on site3 which means event loopback did not happen
+ //this means events coming from site1 are not enqueued back into the sender
+ vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk", 100 ));
+
+ //resume sender on site3
+ vm5.invoke(() -> WANTestBase.resumeSender( "tk" ));
+
+ //validate region size
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 200 ));
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 200 ));
+ vm5.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 200 ));
+ }
+
+ /**
+ * Test loop back issue among 3 WAN sites with N to N topology
+ * i.e. each site connected to all other sites.
+ * Puts are done to only one DS.
+ * LN site: vm3, vm6
+ * NY site: vm4, vm7
+ * TK site: vm5
+ */
+ @Test
+ public void testParallelPropagationLoopBack3SitesNtoNTopologyPutFromOneDS() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
+
+ createCacheInVMs(lnPort, vm3, vm6);
+ createCacheInVMs(nyPort, vm4, vm7);
+ createCacheInVMs(tkPort, vm5);
+ vm3.invoke(() -> WANTestBase.createReceiver());
+ vm4.invoke(() -> WANTestBase.createReceiver());
+ vm5.invoke(() -> WANTestBase.createReceiver());
+
+ //site1
+ // each site hosts two senders, one per remote site ("<site>1" and "<site>2")
+ vm3.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 100, 10, false, false, null, true ));
+
+ vm3.invoke(() -> WANTestBase.createSender( "ln2", 3,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln2", 3,
+ true, 100, 10, false, false, null, true ));
+
+ //site2
+ vm4.invoke(() -> WANTestBase.createSender( "ny1", 1,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ny1", 1,
+ true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createSender( "ny2", 3,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ny2", 3,
+ true, 100, 10, false, false, null, true ));
+
+ //site3
+ vm5.invoke(() -> WANTestBase.createSender( "tk1", 1,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "tk2", 2,
+ true, 100, 10, false, false, null, true ));
+
+ //create PR
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln1,ln2", 0, 1, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln1,ln2", 0, 1, isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ny1,ny2", 0, 1, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ny1,ny2", 0, 1, isOffHeap() ));
+
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "tk1,tk2", 0, 1, isOffHeap() ));
+
+ //start all the senders
+ vm3.invoke(() -> WANTestBase.startSender( "ln1" ));
+ vm3.invoke(() -> WANTestBase.startSender( "ln2" ));
+ vm6.invoke(() -> WANTestBase.startSender( "ln1" ));
+ vm6.invoke(() -> WANTestBase.startSender( "ln2" ));
+
+ vm4.invoke(() -> WANTestBase.startSender( "ny1" ));
+ vm4.invoke(() -> WANTestBase.startSender( "ny2" ));
+ vm7.invoke(() -> WANTestBase.startSender( "ny1" ));
+ vm7.invoke(() -> WANTestBase.startSender( "ny2" ));
+
+ vm5.invoke(() -> WANTestBase.startSender( "tk1" ));
+ vm5.invoke(() -> WANTestBase.startSender( "tk2" ));
+
+ //pause senders on all the sites
+ vm3.invoke(() -> WANTestBase.pauseSender( "ln1" ));
+ vm3.invoke(() -> WANTestBase.pauseSender( "ln2" ));
+ vm6.invoke(() -> WANTestBase.pauseSender( "ln1" ));
+ vm6.invoke(() -> WANTestBase.pauseSender( "ln2" ));
+
+ vm4.invoke(() -> WANTestBase.pauseSender( "ny1" ));
+ vm4.invoke(() -> WANTestBase.pauseSender( "ny2" ));
+ vm7.invoke(() -> WANTestBase.pauseSender( "ny1" ));
+ vm7.invoke(() -> WANTestBase.pauseSender( "ny2" ));
+
+ vm5.invoke(() -> WANTestBase.pauseSender( "tk1" ));
+ vm5.invoke(() -> WANTestBase.pauseSender( "tk2" ));
+
+ //this is required since sender pause doesn't take effect immediately
+ Wait.pause(1000);
+
+ //do puts on site1
+ vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 100 ));
+
+ //verify queue size on site1 and site3
+ vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln1", 100 ));
+ vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln2", 100 ));
+
+ //resume sender (from site1 to site2) on site1
+ vm3.invoke(() -> WANTestBase.resumeSender( "ln1" ));
+ vm6.invoke(() -> WANTestBase.resumeSender( "ln1" ));
+
+ //validate region size on site2
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 100 ));
+
+ //verify queue size on site2 (sender 2 to 1)
+ //should remain at 0 as the events from site1 should not go back to site1
+ vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny1", 0 ));
+
+ //verify queue size on site2 (sender 2 to 3)
+ //should remain at 0 as events from site1 will reach site3 directly..site2 need not send to site3 again
+ vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny2", 0 ));
+
+ //do more puts on site3
+ vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR",
+ 100, 200 ));
+
+ //resume sender (from site3 to site2) on site3
+ vm5.invoke(() -> WANTestBase.resumeSender( "tk2" ));
+
+ //validate region size on site2
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 200 ));
+
+ //verify queue size on site2 (sender 2 to 3)
+ //should remain at 0 as the events from site3 should not go back to site3
+ vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny2", 0 ));
+
+ //verify queue size on site2 (sender 2 to 1)
+ //should remain at 0 as events from site3 will reach site1 directly..site2 need not send to site1 again
+ vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny1", 0 ));
+
+ //resume all senders
+ vm3.invoke(() -> WANTestBase.resumeSender( "ln2" ));
+ vm6.invoke(() -> WANTestBase.resumeSender( "ln2" ));
+
+ vm4.invoke(() -> WANTestBase.resumeSender( "ny1" ));
+ vm4.invoke(() -> WANTestBase.resumeSender( "ny2" ));
+ vm7.invoke(() -> WANTestBase.resumeSender( "ny1" ));
+ vm7.invoke(() -> WANTestBase.resumeSender( "ny2" ));
+
+ vm5.invoke(() -> WANTestBase.resumeSender( "tk1" ));
+
+ //validate region size on all sites
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 200 ));
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 200 ));
+ vm5.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 200 ));
+ }
+
+
+
+}