You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:44:17 UTC
[29/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to
./geode-wan/src/test/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
deleted file mode 100644
index 427054a..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.parallel;
-
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.Wait;
-
-@Category(DistributedTest.class)
-public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
-
- private static final long serialVersionUID = 1L;
-
- public ParallelWANPropagationConcurrentOpsDUnitTest() {
- // No test-specific state to set up; the no-arg superclass constructor is invoked implicitly.
- }
-
- /**
- * Normal propagation scenario for a partitioned region (PR) with only one bucket: puts are queued while the senders are paused, the queue size is asserted, then the senders are resumed and the receiver is validated.
- * This has been added for bug# 44284.
- * @throws Exception propagated from the remote invocations or from joining the async puts
- */
- @Test
- public void testParallelPropagationWithSingleBucketPR() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); // locator of the local ("ln") site, DS id 1
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); // locator of the remote ("ny") site, DS id 2, given the local locator port
-
- createCacheInVMs(nyPort, vm2, vm3); // vm2/vm3 join the remote site and host the receivers
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5); // vm4/vm5 join the local site and host the senders
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true )); // NOTE(review): argument meanings are defined by WANTestBase.createSender - confirm there
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 1, isOffHeap() )); // the trailing numeric argument (1) is presumably the bucket count - confirm in WANTestBase
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 1, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 1, isOffHeap() )); // receiver-side region: no sender id (null)
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 1, isOffHeap() ));
-
- //before doing any puts, let the senders be running in order to ensure that
- //not a single event will be lost
- vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
- //pause the senders so that the puts below accumulate in the queue
- vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
- vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-
- Wait.pause(5000); // fixed 5000 ms wait; presumably gives the pause time to take effect - TODO confirm
-
- AsyncInvocation async1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 700 )); // four concurrent put streams, all issued from vm4
- AsyncInvocation async2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
- AsyncInvocation async3 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 800 ));
- AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
-
- async1.join(); // NOTE(review): join() alone may not surface a failure inside the async puts - confirm against AsyncInvocation
- async2.join();
- async3.join();
- async4.join();
-
- int queueSize = (Integer) vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" ));
- assertEquals("Actual queue size is not matching with the expected", 3500, queueSize); // 3500 = 700 + 1000 + 800 + 1000 puts issued above
-
- //resume the senders now
- vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
- vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 )); // 1000 is the largest doPuts count; presumably the streams write overlapping keys - confirm in WANTestBase.doPuts
- }
-
- /**
- * Normal propagation scenario test case for a PR with a low number of buckets.
- * Buckets have been kept to 10 for this test.
- * This has been added for bug# 44287.
- * @throws Exception propagated from the remote invocations or from joining the async puts
- */
- @Test
- public void testParallelPropagationWithLowNumberOfBuckets() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 )); // locator of the local ("ln") site, DS id 1
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); // locator of the remote ("ny") site, DS id 2
-
- createCacheInVMs(nyPort, vm2, vm3); // vm2/vm3 join the remote site and host the receivers
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5); // vm4/vm5 join the local site and host the senders
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap() )); // 10 matches the bucket count named in the Javadoc above
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 10, isOffHeap() )); // receiver-side region: no sender id (null)
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 10, isOffHeap() ));
-
- //before doing any puts, let the senders be running in order to ensure that
- //not a single event will be lost
- vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
- AsyncInvocation async1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 700 )); // four concurrent put streams from vm4; the senders are not paused in this test
- AsyncInvocation async2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
- AsyncInvocation async3 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 800 ));
- AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
-
- async1.join(); // NOTE(review): join() alone may not surface a failure inside the async puts - confirm against AsyncInvocation
- async2.join();
- async3.join();
- async4.join();
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 )); // 1000 is the largest doPuts count; presumably the streams write overlapping keys - confirm in WANTestBase.doPuts
- }
-
- @Test
- public void testParallelQueueDrainInOrder_PR() throws Exception { // compares, per key % 4 group, the creates recorded on sender vm4 with the creates recorded on the receiver
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2); // single receiver member on the remote site
- vm2.invoke(() -> WANTestBase.createReceiver());
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 3, 4, isOffHeap() )); // the numeric arguments are presumably redundancy 3 and 4 buckets - confirm in WANTestBase
-
- vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR")); // listener whose recorded events are read back via checkPR below (presumably) - confirm
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); // four sender members on the local site
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.addQueueListener( "ln", true));
- vm5.invoke(() -> WANTestBase.addQueueListener( "ln", true));
- vm6.invoke(() -> WANTestBase.addQueueListener( "ln", true));
- vm7.invoke(() -> WANTestBase.addQueueListener( "ln", true));
-
- Wait.pause(2000); // fixed 2000 ms wait before pausing the senders; reason not visible here - TODO confirm
- vm4.invoke(() -> WANTestBase.pauseSender( "ln")); // senders stay paused until the resumeSender calls near the end
- vm5.invoke(() -> WANTestBase.pauseSender( "ln"));
- vm6.invoke(() -> WANTestBase.pauseSender( "ln"));
- vm7.invoke(() -> WANTestBase.pauseSender( "ln"));
-
- Wait.pause(2000);
-
- vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 4 )); // 4 initial puts before the bucket listeners are added; presumably they create the 4 buckets - TODO confirm
- vm4.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4));
- vm5.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4));
- vm6.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4 ));
- vm7.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4));
-
- vm4.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4));
- vm5.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4));
- vm6.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4 ));
- vm7.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4));
-
- vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 1000 )); // main workload, issued while the senders are still paused
-
- vm6.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
-
- HashMap vm4BRUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4));
- HashMap vm5BRUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); // fetched but not referenced again in this method
- HashMap vm6BRUpdates = (HashMap)vm6.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); // fetched but not referenced again in this method
- HashMap vm7BRUpdates = (HashMap)vm7.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4)); // fetched but not referenced again in this method
-
- List b0SenderUpdates = (List)vm4BRUpdates.get("Create0"); // only vm4's recorded creates are compared with the receiver below
- List b1SenderUpdates = (List)vm4BRUpdates.get("Create1");
- List b2SenderUpdates = (List)vm4BRUpdates.get("Create2");
- List b3SenderUpdates = (List)vm4BRUpdates.get("Create3");
-
- HashMap vm4QueueBRUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue_BR("ln", 4));
- HashMap vm5QueueBRUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue_BR("ln", 4));
- HashMap vm6QueueBRUpdates = (HashMap)vm6.invoke(() -> WANTestBase.checkQueue_BR("ln", 4));
- HashMap vm7QueueBRUpdates = (HashMap)vm7.invoke(() -> WANTestBase.checkQueue_BR("ln", 4));
-
- assertEquals(vm4QueueBRUpdates, vm5QueueBRUpdates); // every sender member must report the same queue bucket-region updates as vm4
- assertEquals(vm4QueueBRUpdates, vm6QueueBRUpdates);
- assertEquals(vm4QueueBRUpdates, vm7QueueBRUpdates);
-
- vm4.invoke(() -> WANTestBase.resumeSender( "ln")); // resume so the queued events are dispatched to the receiver
- vm5.invoke(() -> WANTestBase.resumeSender( "ln"));
- vm6.invoke(() -> WANTestBase.resumeSender( "ln"));
- vm7.invoke(() -> WANTestBase.resumeSender( "ln"));
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- HashMap receiverUpdates = (HashMap)vm2.invoke(() -> WANTestBase.checkPR(
- getTestMethodName() + "_PR"));
- List<Long> createList = (List)receiverUpdates.get("Create");
- ArrayList<Long> b0ReceiverUpdates = new ArrayList<Long>();
- ArrayList<Long> b1ReceiverUpdates = new ArrayList<Long>();
- ArrayList<Long> b2ReceiverUpdates = new ArrayList<Long>();
- ArrayList<Long> b3ReceiverUpdates = new ArrayList<Long>();
- for (Long key : createList) {
- long mod = key % 4; // split the receiver's creates into 4 groups by key % 4; presumably this mirrors the bucket id - confirm
- if (mod == 0) {
- b0ReceiverUpdates.add(key);
- }
- else if (mod == 1) {
- b1ReceiverUpdates.add(key);
- }
- else if (mod == 2) {
- b2ReceiverUpdates.add(key);
- }
- else if (mod == 3) {
- b3ReceiverUpdates.add(key);
- }
- }
- b0ReceiverUpdates.remove(0); // drop the first create of each group; presumably the one from the 4 initial puts made before the sender-side listeners existed - confirm
- b1ReceiverUpdates.remove(0);
- b2ReceiverUpdates.remove(0);
- b3ReceiverUpdates.remove(0);
-
- assertEquals(b0SenderUpdates, b0ReceiverUpdates); // List equality: same elements in the same order
- assertEquals(b1SenderUpdates, b1ReceiverUpdates);
- assertEquals(b2SenderUpdates, b2ReceiverUpdates);
- assertEquals(b3SenderUpdates, b3ReceiverUpdates);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
deleted file mode 100644
index 7b0cf3e..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
+++ /dev/null
@@ -1,1234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.parallel;
-
-import org.junit.Ignore;
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import java.util.Set;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.EntryExistsException;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.client.ServerOperationException;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
-import com.gemstone.gemfire.internal.cache.RegionQueue;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.BatchException70;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.IgnoredException;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
-import com.gemstone.gemfire.test.dunit.Wait;
-import com.gemstone.gemfire.test.junit.categories.FlakyTest;
-
-@Category(DistributedTest.class)
-public class ParallelWANPropagationDUnitTest extends WANTestBase {
- private static final long serialVersionUID = 1L;
-
- public ParallelWANPropagationDUnitTest() {
- // No test-specific state to set up; the no-arg superclass constructor is invoked implicitly.
- }
-
- /** Bug 44216: the region backing the parallel sender's queue (the "shadow PR") must not appear among the cache's root regions. */
- @Test
- public void test_ParallelGatewaySenderMetaRegionNotExposedToUser_Bug44216() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); // port is unused below; the call is kept as in the sibling tests
-
- createCache(lnPort); // the cache, sender and region are created in the controller JVM itself, not in a remote VM
- createSender("ln", 2, true, 100, 300, false, false, null, true);
- createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap());
-
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals("ln")) {
- sender = s;
- break;
- }
- }
- assertNotNull("Gateway sender ln was not found in the cache", sender); // otherwise start() below would throw an NPE that hides the real problem
- try {
- sender.start();
- } catch (Exception e) {
- throw new AssertionError("Failed to start gateway sender ln", e); // keep the cause attached to the test failure
- }
-
- GemFireCacheImpl gemCache = (GemFireCacheImpl)cache;
- Set regionSet = gemCache.rootRegions();
- // No user-visible root region may carry the name of the sender's queue region.
- for (Object r : regionSet) {
- if (((Region)r).getName().equals(
- ((AbstractGatewaySender)sender).getQueues().toArray(new RegionQueue[1])[0].getRegion().getName())) {
- fail("The shadowPR is exposed to the user");
- }
- }
- }
-
- @Test
- public void testParallelPropagation_withoutRemoteSite() throws Exception { // puts are done before the remote caches, regions and receivers exist
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- //keep a larger batch to minimize number of exception occurrences in the log
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 300, false, false, null, true )); // 300 here versus 10 in the sibling tests; per the comment above this is the batch size
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 300, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 300, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 300, false, false, null, true ));
-
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- //make sure all the senders are running before doing any puts
- vm4.invoke(waitForSenderRunnable());
- vm5.invoke(waitForSenderRunnable());
- vm6.invoke(waitForSenderRunnable());
- vm7.invoke(waitForSenderRunnable());
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 1000 )); // at this point no remote cache, region or receiver has been created yet
-
- createCacheInVMs(nyPort, vm2, vm3); // the remote site is brought up only now
- vm2.invoke(createReceiverPartitionedRegionRedundancy1());
- vm3.invoke(createReceiverPartitionedRegionRedundancy1());
- createReceiverInVMs(vm2, vm3);
-
- //verify all buckets drained on all sender nodes.
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-
- // Just making sure that though the remote site is started later,
- // remote site is still able to get the data. The receiver regions are
- // created above before the receivers are started, presumably so that no
- // event arrives for a region that does not exist yet - TODO confirm.
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- }
-
- /**
- * Normal happy scenario test case: both sites are up, senders are running, 1000 puts are done on the local site and the remote region size is validated.
- * @throws Exception propagated from the remote invocations
- */
- @Test
- public void testParallelPropagation() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3); // remote site: vm2/vm3 host the receivers
- createReceiverInVMs(vm2, vm3);
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); // local site: vm4..vm7 host the senders
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm2.invoke(createReceiverPartitionedRegionRedundancy1());
- vm3.invoke(createReceiverPartitionedRegionRedundancy1());
-
- //before doing any puts, let the senders be running in order to ensure that
- //not a single event will be lost
- vm4.invoke(waitForSenderRunnable());
- vm5.invoke(waitForSenderRunnable());
- vm6.invoke(waitForSenderRunnable());
- vm7.invoke(waitForSenderRunnable());
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 1000 ));
-
- //verify all buckets drained on all sender nodes.
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 )); // only vm2 is checked on the remote side
- }
-
- protected SerializableRunnableIF createReceiverPartitionedRegionRedundancy1() {
- return () -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() );
- }
-
- protected SerializableRunnableIF createPartitionedRegionRedundancy1Runnable() {
- return () -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() );
- }
-
- protected SerializableRunnableIF waitForSenderRunnable() {
- return () -> { WANTestBase.waitForSenderRunningState("ln"); }; // runnable that waits for sender "ln" to reach the running state
- }
-
- @Test
- public void testParallelPropagation_ManualStart() throws Exception { // same flow as testParallelPropagation, but with a different last createSender argument and no startSenderInVMs call
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, false )); // last argument is false here (true in the sibling tests); presumably the manual-start flag - confirm in WANTestBase.createSender
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, false ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, false ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, false ));
-
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
-
- vm2.invoke(createReceiverPartitionedRegionRedundancy1());
- vm3.invoke(createReceiverPartitionedRegionRedundancy1());
-
- //before doing any puts, let the senders be running in order to ensure that
- //not a single event will be lost (note: this test never calls startSenderInVMs)
- vm4.invoke(waitForSenderRunnable());
- vm5.invoke(waitForSenderRunnable());
- vm6.invoke(waitForSenderRunnable());
- vm7.invoke(waitForSenderRunnable());
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 1000 ));
-
- //verify all buckets drained on all sender nodes.
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- }
-
- /**
- * Happy scenario test case 2: 1000 puts are done before the senders are started, then repeated once the senders are running, and the remote region size is validated.
- * @throws Exception propagated from the remote invocations
- */
- @Test
- public void testParallelPropagationPutBeforeSenderStart() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 1000 )); // first round of puts: the senders have been created but not started yet
-
- startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7); // async start; the running state is awaited below
-
- vm2.invoke(createReceiverPartitionedRegionRedundancy1());
- vm3.invoke(createReceiverPartitionedRegionRedundancy1());
-
- //before the second round of puts, let the senders be running so that
- //none of those events is lost (the first round above ran before sender start)
- vm4.invoke(waitForSenderRunnable());
- vm5.invoke(waitForSenderRunnable());
- vm6.invoke(waitForSenderRunnable());
- vm7.invoke(waitForSenderRunnable());
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 1000 )); // second round with the same count; presumably the same keys - confirm in WANTestBase.doPuts
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- }
-
- /**
- * Local and remote sites are up and running.
- * Local site cache is closed and the site is built again.
- * Puts are done to local site.
- * Expected: Remote site should receive all the events put after the local
- * site was built back.
- *
- * @throws Exception propagated from the remote invocations
- */
- @Test
- public void testParallelPropagationWithLocalCacheClosedAndRebuilt() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm2.invoke(createReceiverPartitionedRegionRedundancy1());
- vm3.invoke(createReceiverPartitionedRegionRedundancy1());
-
- //before doing any puts, let the senders be running in order to ensure that
- //not a single event will be lost
- vm4.invoke(waitForSenderRunnable());
- vm5.invoke(waitForSenderRunnable());
- vm6.invoke(waitForSenderRunnable());
- vm7.invoke(waitForSenderRunnable());
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 1000 ));
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 )); // first round is fully propagated before the local site is torn down
- //-------------------Close and rebuild local site ---------------------------------
-
- vm4.invoke(() -> WANTestBase.killSender()); // NOTE(review): per the Javadoc this closes the local cache - confirm in WANTestBase.killSender
- vm5.invoke(() -> WANTestBase.killSender());
- vm6.invoke(() -> WANTestBase.killSender());
- vm7.invoke(() -> WANTestBase.killSender());
-
- Integer regionSize =
- (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_PR" ));
- LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize); // logged only; the value is not asserted
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); // rebuild the local site with the same steps as the initial setup
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm4.invoke(waitForSenderRunnable());
- vm5.invoke(waitForSenderRunnable());
- vm6.invoke(waitForSenderRunnable());
- vm7.invoke(waitForSenderRunnable());
- //------------------------------------------------------------------------------------
-
- IgnoredException.addIgnoredException(EntryExistsException.class.getName()); // registered before re-doing the puts; presumably the remote site still holds the first round's entries - confirm
- IgnoredException.addIgnoredException(BatchException70.class.getName());
- IgnoredException.addIgnoredException(ServerOperationException.class.getName());
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 )); // second round, on the rebuilt local site
-
- //verify all buckets drained on all sender nodes.
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- }
-
- @Test
- public void testParallelColocatedPropagation() throws Exception { // same flow as testParallelPropagation, but the regions come from WANTestBase.createColocatedPartitionedRegions
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() )); // the region name passed here is the bare test method name (no "_PR" suffix)
- vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() )); // receiver-side regions: no sender id (null)
- vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions( getTestMethodName(), null, 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 )); // unlike the sibling tests, there is no explicit wait for the sender running state before these puts
-
- //verify all buckets drained on all sender nodes.
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 1000 ));
- }
- /**
- * Create colocated partitioned regions.
- * Parent region has PGS attached and child region doesn't.
- *
- * Validate that events for parent region reaches remote site.
- *
- * @throws Exception
- */
-
- @Test
- public void testParallelColocatedPropagation2() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegions2( getTestMethodName(), null, 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"_child1", 1000 ));
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"_child2", 1000 ));
-
- //verify all buckets drained on all sender nodes.
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 1000 ));
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName()+"_child1", 0 ));
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName()+"_child2", 0 ));
- }
-
-
- @Test
- public void testParallelPropagationWithOverflow() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), null, 1, 100, isOffHeap() ));
- createReceiverInVMs(vm2, vm3);
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- //let all the senders start before doing any puts to ensure that none of the events is lost
- vm4.invoke(waitForSenderRunnable());
- vm5.invoke(waitForSenderRunnable());
- vm6.invoke(waitForSenderRunnable());
- vm7.invoke(waitForSenderRunnable());
-
- vm4.invoke(() -> WANTestBase.doHeavyPuts( getTestMethodName(), 150 ));
-
- //verify all buckets drained on all sender nodes.
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 150 ));
- }
-
- @Test
- public void testSerialReplicatedAndParallelPartitionedPropagation()
- throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "lnSerial",
- 2, false, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "lnSerial",
- 2, false, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createSender( "lnParallel",
- 2, true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "lnParallel",
- 2, true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "lnParallel",
- 2, true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "lnParallel",
- 2, true, 100, 10, false, false, null, true ));
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- vm2.invoke(createReceiverPartitionedRegionRedundancy1());
- vm3.invoke(createReceiverPartitionedRegionRedundancy1());
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap() ));
-
- startSenderInVMs("lnSerial", vm4, vm5);
-
- startSenderInVMs("lnParallel", vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 1000 ));
- vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 1000 ));
-
- //verify all buckets drained on all sender nodes.
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel"));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel"));
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel"));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel"));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- }
-
- @Test
- public void testPartitionedParallelPropagationToTwoWanSites()
- throws Exception {
- Integer lnPort = createFirstLocatorWithDSId(1);
- Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- vm2.invoke(createReceiverPartitionedRegionRedundancy1());
- createReceiverInVMs(vm2);
-
- createCacheInVMs(tkPort, vm3);
- vm3.invoke(createReceiverPartitionedRegionRedundancy1());
- createReceiverInVMs(vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "lnParallel1",
- 2, true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "lnParallel1",
- 2, true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "lnParallel1",
- 2, true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "lnParallel1",
- 2, true, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createSender( "lnParallel2",
- 3, true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "lnParallel2",
- 3, true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "lnParallel2",
- 3, true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "lnParallel2",
- 3, true, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap() ));
-
- startSenderInVMs("lnParallel1", vm4, vm5, vm6, vm7);
-
- startSenderInVMs("lnParallel2", vm4, vm5, vm6, vm7);
-
-
- //before doing puts, make sure that the senders are started.
- //this will ensure that not a single events is lost
- vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" ));
- vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" ));
- vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" ));
- vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel1" ));
-
- vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" ));
- vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" ));
- vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" ));
- vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "lnParallel2" ));
-
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 1000 ));
-
- //verify all buckets drained on all sender nodes.
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1"));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1"));
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1"));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel1"));
-
- //verify all buckets drained on all sender nodes.
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2"));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2"));
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2"));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel2"));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- }
-
- @Category(FlakyTest.class) // GEODE-1008 and GEODE-1180: random ports, async actions, thread sleeps, time sensitive, waitForCriterion
- @Test
- public void testPartitionedParallelPropagationHA() throws Exception {
- IgnoredException.addIgnoredException("Broken pipe");
- IgnoredException.addIgnoredException("Connection reset");
- IgnoredException.addIgnoredException("Unexpected IOException");
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm2.invoke(createReceiverPartitionedRegionRedundancy1());
- vm3.invoke(createReceiverPartitionedRegionRedundancy1());
-
- AsyncInvocation inv1 = vm7.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 ));
- Wait.pause(500);
- AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
- AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 10000 ));
- Wait.pause(1500);
- AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
- inv1.join();
- inv2.join();
- inv3.join();
- inv4.join();
-
- vm6.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 10000 ));
- vm7.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 10000 ));
-
- //verify all buckets drained on the sender nodes that up and running.
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 10000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 10000 ));
- }
-
- @Test
- public void testParallelPropagationWithFilter() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false,
- new MyGatewayEventFilter(), true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false,
- new MyGatewayEventFilter(), true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false,
- new MyGatewayEventFilter(), true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false,
- new MyGatewayEventFilter(), true ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), null, 1, 100, isOffHeap() ));
-
- //wait for senders to be running before doing any puts. This will ensure that
- //not a single events is lost
- vm4.invoke(waitForSenderRunnable());
- vm5.invoke(waitForSenderRunnable());
- vm6.invoke(waitForSenderRunnable());
- vm7.invoke(waitForSenderRunnable());
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 800 ));
- }
-
-
- @Test
- public void testParallelPropagationWithPutAll() throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm2.invoke(createReceiverPartitionedRegionRedundancy1());
- vm3.invoke(createReceiverPartitionedRegionRedundancy1());
-
- //before doing any puts, let the senders be running in order to ensure that
- //not a single event will be lost
- vm4.invoke(waitForSenderRunnable());
- vm5.invoke(waitForSenderRunnable());
- vm6.invoke(waitForSenderRunnable());
- vm7.invoke(waitForSenderRunnable());
-
- vm4.invoke(() -> WANTestBase.doPutAll( getTestMethodName() + "_PR",
- 100 , 50 ));
-
- //verify all buckets drained on all sender nodes.
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 5000 ));
-
- }
-
- /**
- * There was a bug that all destroy events were being put into different buckets of sender queue
- * against the key 0. Bug# 44304
- *
- * @throws Exception
- */
- @Test
- public void testParallelPropagationWithDestroy() throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 100, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 100, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 100, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 100, false, false, null, true ));
-
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
- vm6.invoke(createPartitionedRegionRedundancy1Runnable());
- vm7.invoke(createPartitionedRegionRedundancy1Runnable());
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm2.invoke(createReceiverPartitionedRegionRedundancy1());
- vm3.invoke(createReceiverPartitionedRegionRedundancy1());
-
- //before doing any puts, let the senders be running in order to ensure that
- //not a single event will be lost
- vm4.invoke(waitForSenderRunnable());
- vm5.invoke(waitForSenderRunnable());
- vm6.invoke(waitForSenderRunnable());
- vm7.invoke(waitForSenderRunnable());
-
- vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
- vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
- vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
- vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
-
- Wait.pause(2000);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
- vm4.invoke(() -> WANTestBase.doDestroys( getTestMethodName() + "_PR", 500 ));
-
-
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 ));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 ));
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 ));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueBucketSize( "ln", 15 ));
-
- vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
- vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
- vm6.invoke(() -> WANTestBase.resumeSender( "ln" ));
- vm7.invoke(() -> WANTestBase.resumeSender( "ln" ));
-
- //give some time for the queue to drain
- Wait.pause(5000);
-
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 500 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 500 ));
-
- }
-
- /**
- * Normal happy scenario test case. But with Tx operations
- * @throws Exception
- */
- @Test
- public void testParallelPropagationTxOperations() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
- createCacheInVMs(lnPort, vm4, vm5);
- //vm6.invoke(() -> WANTestBase.createCache( lnPort ));
- //vm7.invoke(() -> WANTestBase.createCache( lnPort ));
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- //vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- // true, 100, 10, false, false, null, true ));
- //vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- // true, 100, 10, false, false, null, true ));
-
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
-// vm6.invoke(() -> WANTestBase.createPartitionedRegion(
-// testName + "_PR", "ln", true, 1, 100, isOffHeap() ));
-// vm7.invoke(() -> WANTestBase.createPartitionedRegion(
-// testName + "_PR", "ln", true, 1, 100, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-// vm6.invoke(() -> WANTestBase.startSender( "ln" ));
-// vm7.invoke(() -> WANTestBase.startSender( "ln" ));
-
- vm2.invoke(createReceiverPartitionedRegionRedundancy1());
- vm3.invoke(createReceiverPartitionedRegionRedundancy1());
-
- //before doing any puts, let the senders be running in order to ensure that
- //not a single event will be lost
- vm4.invoke(waitForSenderRunnable());
- vm5.invoke(waitForSenderRunnable());
-// vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-// vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm4.invoke(() -> WANTestBase.doTxPuts( getTestMethodName() + "_PR"));
-
- //verify all buckets drained on all sender nodes.
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-// vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-// vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 3 ));
- }
-
- @Ignore
- @Test
- public void testParallelGatewaySenderQueueLocalSize() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(vm2);
- createCacheInVMs(lnPort, vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(waitForSenderRunnable());
- vm5.invoke(waitForSenderRunnable());
-
- vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
- vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-
- /*
- * Remember pausing sender does not guarantee that peek will be paused
- * immediately as its quite possible event processor is already in peeking
- * events and send them after peeking without a check for pause. hence below
- * pause of 1 sec to allow dispatching to be paused
- */
-// vm4.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" ));
-// vm5.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" ));
- Wait.pause(1000);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 10 ));
-
- vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 ));
- vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 ));
-
- // instead of checking size as 5 and 5. check that combined size is 10
- Integer localSize1 = (Integer)vm4.invoke(() -> WANTestBase.getPRQLocalSize( "ln"));
- Integer localSize2 = (Integer)vm5.invoke(() -> WANTestBase.getPRQLocalSize( "ln"));
- assertEquals(10, localSize1 + localSize2);
- }
-
-
-
- public void tParallelGatewaySenderQueueLocalSizeWithHA() {
- IgnoredException.addIgnoredException("Broken pipe");
- IgnoredException.addIgnoredException("Connection reset");
- IgnoredException.addIgnoredException("Unexpected IOException");
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(vm2);
- createCacheInVMs(lnPort, vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(createPartitionedRegionRedundancy1Runnable());
- vm5.invoke(createPartitionedRegionRedundancy1Runnable());
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(waitForSenderRunnable());
- vm5.invoke(waitForSenderRunnable());
-
- vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
- vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-
- /*
- * Remember pausing sender does not guarantee that peek will be paused
- * immediately as its quite possible event processor is already in peeking
- * events and send them after peeking without a check for pause. hence below
- * pause of 1 sec to allow dispatching to be paused
- */
-// vm4.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" ));
-// vm5.invoke(() -> WANTestBase.waitForSenderPausedState( "ln" ));
- Wait.pause(1000);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 10 ));
-
- vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 ));
- vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 ));
-
- Integer localSize1 = (Integer)vm4.invoke(() -> WANTestBase.getPRQLocalSize( "ln"));
- Integer localSize2 = (Integer)vm5.invoke(() -> WANTestBase.getPRQLocalSize( "ln"));
- assertEquals(10, localSize1 + localSize2);
-
- vm5.invoke(() -> WANTestBase.killSender( ));
-
- vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 ));
- vm4.invoke(() -> WANTestBase.checkPRQLocalSize( "ln", 10 ));
-
- }
-
- /**
- * Added for defect #50364 Can't colocate region that has AEQ with a region that does not have that same AEQ
- */
- @Test
- public void testParallelSenderAttachedToChildRegionButNotToParentRegion() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- //create cache and receiver on site2
- createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(vm2);
- //create cache on site1
- createCacheInVMs(lnPort, vm3);
-
- //create sender on site1
- vm3.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- //start sender on site1
- startSenderInVMs("ln", vm3);
-
- //create leader (parent) PR on site1
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "PARENT_PR", null, 0, 100, isOffHeap() ));
- String parentRegionFullPath =
- (String) vm3.invoke(() -> WANTestBase.getRegionFullPath( getTestMethodName() + "PARENT_PR"));
-
- //create colocated (child) PR on site1
- vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegion(
- getTestMethodName() + "CHILD_PR", "ln", 0, 100, parentRegionFullPath ));
-
- //create leader and colocated PR on site2
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "PARENT_PR", null, 0, 100, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegion(
- getTestMethodName() + "CHILD_PR", null, 0, 100, parentRegionFullPath ));
-
- //do puts in colocated (child) PR on site1
- vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "CHILD_PR", 1000 ));
-
- //verify the puts reach site2
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "CHILD_PR", 1000 ));
- }
-
- @Test
- public void testParallelPropagationWithFilter_AfterAck() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm6, vm7);
- createReceiverInVMs(vm6, vm7);
-
- createCacheInVMs(lnPort, vm2, vm3, vm4, vm5);
-
- vm2.invoke(() -> WANTestBase.createSender( "ln", 2, true,
- 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true ));
- vm3.invoke(() -> WANTestBase.createSender( "ln", 2, true,
- 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true ));
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true,
- 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true,
- 100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true ));
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-
- startSenderInVMs("ln", vm2, vm3, vm4, vm5);
-
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), null, 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), null, 1, 100, isOffHeap() ));
-
- // wait for senders to be running before doing any puts. This will ensure
- // that
- // not a single events is lost
- vm2.invoke(waitForSenderRunnable());
- vm3.invoke(waitForSenderRunnable());
- vm4.invoke(waitForSenderRunnable());
- vm5.invoke(waitForSenderRunnable());
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateQueueContents( "ln",
- 0 ));
- vm3.invoke(() -> WANTestBase.validateQueueContents( "ln",
- 0 ));
- vm4.invoke(() -> WANTestBase.validateQueueContents( "ln",
- 0 ));
- vm5.invoke(() -> WANTestBase.validateQueueContents( "ln",
- 0 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 1000 ));
- vm4.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 1000 ));
- vm5.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 1000 ));
-
- vm6.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 1000 ));
- vm7.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 1000 ));
-
- Integer vm2Acks = (Integer)vm2.invoke(() -> WANTestBase.validateAfterAck( "ln"));
- Integer vm3Acks = (Integer)vm3.invoke(() -> WANTestBase.validateAfterAck( "ln"));
- Integer vm4Acks = (Integer)vm4.invoke(() -> WANTestBase.validateAfterAck( "ln"));
- Integer vm5Acks = (Integer)vm5.invoke(() -> WANTestBase.validateAfterAck( "ln"));
-
- assertEquals(2000, (vm2Acks + vm3Acks + vm4Acks + vm5Acks));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
deleted file mode 100644
index 07a6223..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
+++ /dev/null
@@ -1,415 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.parallel;
-
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.Wait;
-
-@Category(DistributedTest.class)
-public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
-
- private static final long serialVersionUID = 1L;
-
- public ParallelWANPropagationLoopBackDUnitTest() {
- super();
- }
-
- /**
- * Test loop back issue between 2 WAN sites (LN & NY). LN -> NY -> LN.
- * Site1 (LN): vm2, vm4, vm5
- * Site2 (NY): vm3, vm6, vm7
- */
- @Test
- public void testParallelPropagationLoopBack() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- //create receiver on site1 and site2
- createCacheInVMs(lnPort, vm2, vm4, vm5);
- vm2.invoke(() -> WANTestBase.createReceiver());
- createCacheInVMs(nyPort, vm3, vm6, vm7);
- vm3.invoke(() -> WANTestBase.createReceiver());
-
- //create senders on site1
- vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- //create senders on site2
- vm3.invoke(() -> WANTestBase.createSender( "ny", 1,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ny", 1,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ny", 1,
- true, 100, 10, false, false, null, true ));
-
- //create PR on site1
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() ));
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() ));
-
- //create PR on site2
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() ));
-
- //start sender on site1
- startSenderInVMs("ln", vm2, vm4, vm5);
-
-
- //start sender on site2
- startSenderInVMs("ny", vm3, vm6, vm7);
-
-
- //pause senders on site1
- vm2.invoke(() -> WANTestBase.pauseSender( "ln" ));
- vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
- vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
-
- //pause senders on site2
- vm3.invoke(() -> WANTestBase.pauseSender( "ny" ));
- vm6.invoke(() -> WANTestBase.pauseSender( "ny" ));
- vm7.invoke(() -> WANTestBase.pauseSender( "ny" ));
-
- //this is required since sender pause doesn't take effect immediately
- Wait.pause(1000);
-
- //Do 100 puts on site1
- vm2.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 100 ));
- //do next 100 puts on site2
- vm3.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR",
- 100, 200 ));
- //verify queue size on both sites
- vm2.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 ));
- vm4.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 ));
- vm5.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 ));
-
- vm3.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 ));
- vm6.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 ));
- vm7.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 ));
-
- //resume sender on site1
- vm2.invoke(() -> WANTestBase.resumeSender( "ln" ));
- vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
- vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
-
- //validate events reached site2 from site1
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 200 ));
-
- //on site2, verify queue size again
- //this ensures that loopback is not happening since the queue size is same as before
- //the event coming from site1 are not enqueued again
- vm3.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 ));
- vm6.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 ));
- vm7.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 ));
-
- //resume sender on site2
- vm3.invoke(() -> WANTestBase.resumeSender( "ny" ));
- vm6.invoke(() -> WANTestBase.resumeSender( "ny" ));
- vm7.invoke(() -> WANTestBase.resumeSender( "ny" ));
-
- //validate region size on both the sites
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 200 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 200 ));
- }
-
- /**
- * Test loop back issue among 3 WAN sites with Ring topology i.e. LN -> NY -> TK -> LN
- * Site1 (LN): vm3, vm6
- * Site2 (NY): vm4, vm7
- * Site3 (TK): vm5
- */
- @Test
- public void testParallelPropagationLoopBack3Sites() {
- //Create locators
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
-
- //create cache and receivers on all the 3 sites
- createCacheInVMs(lnPort, vm3, vm6);
- createReceiverInVMs(vm3, vm6);
- createCacheInVMs(nyPort, vm4, vm7);
- createReceiverInVMs(vm4, vm7);
- createCacheInVMs(tkPort, vm5);
- createReceiverInVMs(vm5);
-
-
- //create senders on all the 3 sites
- vm3.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createSender( "ny", 3,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ny", 3,
- true, 100, 10, false, false, null, true ));
-
- vm5.invoke(() -> WANTestBase.createSender( "tk", 1,
- true, 100, 10, false, false, null, true ));
-
- //create PR on the 3 sites
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() ));
-
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "tk", 0, 100, isOffHeap() ));
-
- //start senders on all the sites
- startSenderInVMs("ln", vm3, vm6);
-
- startSenderInVMs("ny", vm4, vm7);
-
- vm5.invoke(() -> WANTestBase.startSender( "tk" ));
-
- //pause senders on site1 and site3. Site2 has the sender running to pass along events
- vm3.invoke(() -> WANTestBase.pauseSender( "ln" ));
- vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
-
- vm5.invoke(() -> WANTestBase.pauseSender( "tk" ));
-
- //need to have this pause since pauseSender doesn't take effect immediately
- Wait.pause(1000);
-
- //do puts on site1
- vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 100 ));
-
- //do more puts on site3
- vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR",
- 100, 200 ));
-
- //verify queue size on site1 and site3
- vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 ));
- vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk", 100 ));
-
- //resume sender on site1 so that events reach site2 and from there to site3
- vm3.invoke(() -> WANTestBase.resumeSender( "ln" ));
- vm6.invoke(() -> WANTestBase.resumeSender( "ln" ));
-
- //validate region size on site2 (should have 100) and site3 (should have 200)
- vm4.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 100 ));
- vm5.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 200 ));
-
- //verify queue size remains same on site3 which means event loopback did not happen
- //this means events coming from site1 are not enqueued back into the sender
- vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk", 100 ));
-
- //resume sender on site3
- vm5.invoke(() -> WANTestBase.resumeSender( "tk" ));
-
- //validate region size
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 200 ));
- vm4.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 200 ));
- vm5.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 200 ));
- }
-
- /**
- * Test loop back issue among 3 WAN sites with N to N topology
- * i.e. each site connected to all other sites.
- * Puts are done to only one DS.
- * LN site: vm3, vm6
- * NY site: vm4, vm7
- * TK site: vm5
- */
- @Test
- public void testParallelPropagationLoopBack3SitesNtoNTopologyPutFromOneDS() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- Integer tkPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
-
- createCacheInVMs(lnPort, vm3, vm6);
- createCacheInVMs(nyPort, vm4, vm7);
- createCacheInVMs(tkPort, vm5);
- vm3.invoke(() -> WANTestBase.createReceiver());
- vm4.invoke(() -> WANTestBase.createReceiver());
- vm5.invoke(() -> WANTestBase.createReceiver());
-
- //site1
- vm3.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 100, 10, false, false, null, true ));
-
- vm3.invoke(() -> WANTestBase.createSender( "ln2", 3,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln2", 3,
- true, 100, 10, false, false, null, true ));
-
- //site2
- vm4.invoke(() -> WANTestBase.createSender( "ny1", 1,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ny1", 1,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createSender( "ny2", 3,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ny2", 3,
- true, 100, 10, false, false, null, true ));
-
- //site3
- vm5.invoke(() -> WANTestBase.createSender( "tk1", 1,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "tk2", 2,
- true, 100, 10, false, false, null, true ));
-
- //create PR
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln1,ln2", 0, 1, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln1,ln2", 0, 1, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ny1,ny2", 0, 1, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ny1,ny2", 0, 1, isOffHeap() ));
-
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "tk1,tk2", 0, 1, isOffHeap() ));
-
- //start all the senders
- vm3.invoke(() -> WANTestBase.startSender( "ln1" ));
- vm3.invoke(() -> WANTestBase.startSender( "ln2" ));
- vm6.invoke(() -> WANTestBase.startSender( "ln1" ));
- vm6.invoke(() -> WANTestBase.startSender( "ln2" ));
-
- vm4.invoke(() -> WANTestBase.startSender( "ny1" ));
- vm4.invoke(() -> WANTestBase.startSender( "ny2" ));
- vm7.invoke(() -> WANTestBase.startSender( "ny1" ));
- vm7.invoke(() -> WANTestBase.startSender( "ny2" ));
-
- vm5.invoke(() -> WANTestBase.startSender( "tk1" ));
- vm5.invoke(() -> WANTestBase.startSender( "tk2" ));
-
- //pause senders on all the sites
- vm3.invoke(() -> WANTestBase.pauseSender( "ln1" ));
- vm3.invoke(() -> WANTestBase.pauseSender( "ln2" ));
- vm6.invoke(() -> WANTestBase.pauseSender( "ln1" ));
- vm6.invoke(() -> WANTestBase.pauseSender( "ln2" ));
-
- vm4.invoke(() -> WANTestBase.pauseSender( "ny1" ));
- vm4.invoke(() -> WANTestBase.pauseSender( "ny2" ));
- vm7.invoke(() -> WANTestBase.pauseSender( "ny1" ));
- vm7.invoke(() -> WANTestBase.pauseSender( "ny2" ));
-
- vm5.invoke(() -> WANTestBase.pauseSender( "tk1" ));
- vm5.invoke(() -> WANTestBase.pauseSender( "tk2" ));
-
- //this is required since sender pause doesn't take effect immediately
- Wait.pause(1000);
-
- //do puts on site1
- vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 100 ));
-
- //verify queue size on site1 and site3
- vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln1", 100 ));
- vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln2", 100 ));
-
- //resume sender (from site1 to site2) on site1
- vm3.invoke(() -> WANTestBase.resumeSender( "ln1" ));
- vm6.invoke(() -> WANTestBase.resumeSender( "ln1" ));
-
- //validate region size on site2
- vm4.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 100 ));
-
- //verify queue size on site2 (sender 2 to 1)
- //should remain at 0 as the events from site1 should not go back to site1
- vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny1", 0 ));
-
- //verify queue size on site2 (sender 2 to 3)
- //should remain at 0 as events from site1 will reach site3 directly..site2 need not send to site3 again
- vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny2", 0 ));
-
- //do more puts on site3
- vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR",
- 100, 200 ));
-
- //resume sender (from site3 to site2) on site3
- vm5.invoke(() -> WANTestBase.resumeSender( "tk2" ));
-
- //validate region size on site2
- vm4.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 200 ));
-
- //verify queue size on site2 (sender 2 to 3)
- //should remain at 0 as the events from site3 should not go back to site3
- vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny2", 0 ));
-
- //verify queue size on site2 (sender 2 to 1)
- //should remain at 0 as events from site3 will reach site1 directly..site2 need not send to site1 again
- vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny1", 0 ));
-
- //resume all senders
- vm3.invoke(() -> WANTestBase.resumeSender( "ln2" ));
- vm6.invoke(() -> WANTestBase.resumeSender( "ln2" ));
-
- vm4.invoke(() -> WANTestBase.resumeSender( "ny1" ));
- vm4.invoke(() -> WANTestBase.resumeSender( "ny2" ));
- vm7.invoke(() -> WANTestBase.resumeSender( "ny1" ));
- vm7.invoke(() -> WANTestBase.resumeSender( "ny2" ));
-
- vm5.invoke(() -> WANTestBase.resumeSender( "tk1" ));
-
- //validate region size on all sites
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 200 ));
- vm4.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 200 ));
- vm5.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 200 ));
- }
-
-
-
-}