You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/08 22:58:31 UTC
[04/50] [abbrv] incubator-geode git commit: GEODE-873: Cleanup
ParallelWANStatsDUnitTest
GEODE-873: Cleanup ParallelWANStatsDUnitTest
Removing some duplicate code and tuning down the number of puts and
buckets in this test to make it run faster.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8af4255b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8af4255b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8af4255b
Branch: refs/heads/feature/GEODE-773-2
Commit: 8af4255b1d8dc0287306c095a8bf1fe987aa38e8
Parents: b11113f
Author: Dan Smith <up...@apache.org>
Authored: Tue Feb 2 13:54:52 2016 -0800
Committer: Dan Smith <up...@apache.org>
Committed: Wed Feb 3 15:50:28 2016 -0800
----------------------------------------------------------------------
.../wan/parallel/ParallelWANStatsDUnitTest.java | 478 ++++++++-----------
1 file changed, 186 insertions(+), 292 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8af4255b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 529e378..cb3c49c 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -21,13 +21,14 @@ import java.util.HashMap;
import java.util.Map;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase.MyGatewayEventFilter;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.VM;
public class ParallelWANStatsDUnitTest extends WANTestBase{
+ private static final int NUM_PUTS = 100;
private static final long serialVersionUID = 1L;
-
+
public ParallelWANStatsDUnitTest(String name) {
super(name);
}
@@ -42,124 +43,60 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
"createFirstRemoteLocator", new Object[] { 2, lnPort });
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
- pause(3000);
+ createReceiver(vm2, nyPort);
+ createReceiver(vm3, nyPort);
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+ createSendersWithConflation(lnPort);
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
- vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
+ createSenderPRs(0);
- pause(2000);
-
- final Map keyValues = new HashMap();
- final Map updateKeyValues = new HashMap();
- for(int i=0; i< 1000; i++) {
- keyValues.put(i, i);
- }
+ startPausedSenders();
- vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+ createReceiverPR(vm2, 1);
+ createReceiverPR(vm3, 1);
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
+ putKeyValues();
ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(
- WANTestBase.class, "getSenderStats", new Object[] { "ln", 1000 });
+ WANTestBase.class, "getSenderStats", new Object[] { "ln", NUM_PUTS });
ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(
- WANTestBase.class, "getSenderStats", new Object[] { "ln", 1000 });
+ WANTestBase.class, "getSenderStats", new Object[] { "ln", NUM_PUTS });
ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(
- WANTestBase.class, "getSenderStats", new Object[] { "ln", 1000 });
+ WANTestBase.class, "getSenderStats", new Object[] { "ln", NUM_PUTS });
ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(
- WANTestBase.class, "getSenderStats", new Object[] { "ln", 1000 });
+ WANTestBase.class, "getSenderStats", new Object[] { "ln", NUM_PUTS });
- assertEquals(1000, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
- assertEquals(1000, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
- assertEquals(1000, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+ assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
+ assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+ assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
assertEquals(0, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)); //batches distributed
assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
}
-
+
public void testPartitionedRegionParallelPropagation_AfterDispatch_NoRedundacny() throws Exception {
Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
"createFirstLocatorWithDSId", new Object[] { 1 });
Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
"createFirstRemoteLocator", new Object[] { 2, lnPort });
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ createReceiver(vm2, nyPort);
- vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
+ createSenders(lnPort);
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 0, 100, isOffHeap() });
+ createReceiverPR(vm2, 0);
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 0, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 0, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 0, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 0, 100, isOffHeap() });
+ createSenderPRs(0);
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ startSenders();
- vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
- 1000 });
+ vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName,
+ NUM_PUTS });
vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 1000 });
+ testName, NUM_PUTS });
ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(
@@ -172,13 +109,13 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
- assertEquals(1000, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
- assertEquals(1000, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
- assertEquals(1000, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
- assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 100); //batches distributed
+ assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+ assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+ assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+ assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
- vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {100, 1000, 1000 });
+ vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {10, NUM_PUTS, NUM_PUTS });
}
public void testPartitionedRegionParallelPropagation_AfterDispatch_Redundancy_3() throws Exception {
@@ -187,44 +124,21 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
"createFirstRemoteLocator", new Object[] { 2, lnPort });
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ createReceiver(vm2, nyPort);
- vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
+ createSenders(lnPort);
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 0, 100, isOffHeap() });
+ createReceiverPR(vm2, 0);
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 3, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 3, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 3, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 3, 100, isOffHeap() });
+ createSenderPRs(3);
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ startSenders();
- vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
- 1000 });
+ vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName,
+ NUM_PUTS });
vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 1000 });
+ testName, NUM_PUTS });
ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(
WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
@@ -236,13 +150,13 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
- assertEquals(4000, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
- assertEquals(4000, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
- assertEquals(1000, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
- assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 100); //batches distributed
+ assertEquals(400, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+ assertEquals(400, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+ assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+ assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
- vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {100, 1000, 1000 });
+ vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {10, NUM_PUTS, NUM_PUTS });
}
public void testWANStatsTwoWanSites_Bug44331() throws Exception {
@@ -252,8 +166,8 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer tkPort = (Integer)vm1.invoke(WANTestBase.class,
"createFirstRemoteLocator", new Object[] { 3, lnPort });
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
- vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { tkPort });
+ createReceiver(vm2, nyPort);
+ createReceiver(vm3, tkPort);
vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
@@ -263,26 +177,24 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln2",
3, true, 100, 10, false, false, null, true });
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 0, 100, isOffHeap() });
+ createReceiverPR(vm2, 0);
vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 0, 100, isOffHeap() });
+ testName, null, 0, 10, isOffHeap() });
vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln1,ln2", 0, 100, isOffHeap() });
+ testName, "ln1,ln2", 0, 10, isOffHeap() });
vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln2" });
- vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
- 1000 });
+ vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName,
+ NUM_PUTS });
- pause(10000);
vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 1000 });
+ testName, NUM_PUTS });
vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 1000 });
+ testName, NUM_PUTS });
ArrayList<Integer> v4Sender1List = (ArrayList<Integer>)vm4.invoke(
WANTestBase.class, "getSenderStats", new Object[] { "ln1", 0 });
@@ -290,21 +202,21 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
WANTestBase.class, "getSenderStats", new Object[] { "ln2", 0 });
assertEquals(0, v4Sender1List.get(0).intValue()); //queue size
- assertEquals(1000, v4Sender1List.get(1).intValue()); //eventsReceived
- assertEquals(1000, v4Sender1List.get(2).intValue()); //events queued
- assertEquals(1000, v4Sender1List.get(3).intValue()); //events distributed
- assertTrue(v4Sender1List.get(4).intValue()>=100); //batches distributed
+ assertEquals(NUM_PUTS, v4Sender1List.get(1).intValue()); //eventsReceived
+ assertEquals(NUM_PUTS, v4Sender1List.get(2).intValue()); //events queued
+ assertEquals(NUM_PUTS, v4Sender1List.get(3).intValue()); //events distributed
+ assertTrue(v4Sender1List.get(4).intValue()>=10); //batches distributed
assertEquals(0, v4Sender1List.get(5).intValue()); //batches redistributed
assertEquals(0, v4Sender2List.get(0).intValue()); //queue size
- assertEquals(1000, v4Sender2List.get(1).intValue()); //eventsReceived
- assertEquals(1000, v4Sender2List.get(2).intValue()); //events queued
- assertEquals(1000, v4Sender2List.get(3).intValue()); //events distributed
- assertTrue(v4Sender2List.get(4).intValue()>=100); //batches distributed
+ assertEquals(NUM_PUTS, v4Sender2List.get(1).intValue()); //eventsReceived
+ assertEquals(NUM_PUTS, v4Sender2List.get(2).intValue()); //events queued
+ assertEquals(NUM_PUTS, v4Sender2List.get(3).intValue()); //events distributed
+ assertTrue(v4Sender2List.get(4).intValue()>=10); //batches distributed
assertEquals(0, v4Sender2List.get(5).intValue()); //batches redistributed
- vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {100, 1000, 1000 });
- vm3.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {100, 1000, 1000 });
+ vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {10, NUM_PUTS, NUM_PUTS });
+ vm3.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {10, NUM_PUTS, NUM_PUTS });
}
public void testParallelPropagationHA() throws Exception {
@@ -313,49 +225,26 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
"createFirstRemoteLocator", new Object[] { 2, lnPort });
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ createReceiver(vm2, nyPort);
- vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
+ createSenders(lnPort);
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 0, 100, isOffHeap() });
+ createReceiverPR(vm2, 0);
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 3, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 3, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 3, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 3, 100, isOffHeap() });
+ createSenderPRs(3);
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ startSenders();
AsyncInvocation inv1 = vm5.invokeAsync(WANTestBase.class, "doPuts",
- new Object[] { testName + "_PR", 10000 });
- pause(2000);
+ new Object[] { testName, 1000 });
+ pause(200);
AsyncInvocation inv2 = vm4.invokeAsync(WANTestBase.class, "killSender");
inv1.join();
inv2.join();
vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 10000 });
+ testName, 1000 });
ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(
WANTestBase.class, "getSenderStats", new Object[] { "ln", 0});
@@ -367,16 +256,16 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
assertEquals(0, v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
//We may see a single retried event on all members due to the kill
- assertTrue("Received " + receivedEvents, 30000 <= receivedEvents && 30003 >= receivedEvents); //eventsReceived
+ assertTrue("Received " + receivedEvents, 3000 <= receivedEvents && 3003 >= receivedEvents); //eventsReceived
int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
- assertTrue("Queued " + queuedEvents, 30000 <= queuedEvents && 30003 >= queuedEvents); //eventsQueued
+ assertTrue("Queued " + queuedEvents, 3000 <= queuedEvents && 3003 >= queuedEvents); //eventsQueued
//assertTrue(10000 <= v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed : its quite possible that vm4 has distributed some of the events
//assertTrue(v5List.get(4) + v6List.get(4) + v7List.get(4) > 1000); //batches distributed : its quite possible that vm4 has distributed some of the batches.
assertEquals(0, v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
- vm2.invoke(WANTestBase.class, "checkGatewayReceiverStatsHA", new Object[] {1000, 10000, 10000 });
+ vm2.invoke(WANTestBase.class, "checkGatewayReceiverStatsHA", new Object[] {NUM_PUTS, 1000, 1000 });
}
-
+
/**
* 1 region and sender configured on local site and 1 region and a
* receiver configured on remote site. Puts to the local region are in progress.
@@ -391,51 +280,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
"createFirstRemoteLocator", new Object[] { 2, lnPort });
- //these are part of remote site
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ createReceiver(vm2, nyPort);
- //these are part of local site
- vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ createSenders(lnPort);
- //senders are created on local site
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, false, false, null, true });
+ createReceiverPR(vm2, 0);
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", null, 0, 100, isOffHeap() });
-
- vm2.invoke(WANTestBase.class, "addCacheListenerAndDestroyRegion", new Object[] {
- testName + "_PR"});
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 0, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 0, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 0, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName + "_PR", "ln", 0, 100, isOffHeap() });
-
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm2.invoke(WANTestBase.class, "addCacheListenerAndDestroyRegion", new Object[] {
+ testName});
+
+ createSenderPRs(0);
+
+ startSenders();
//start puts in RR_1 in another thread
- vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 20000 });
+ vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName, 2000 });
//verify that all is well in local site. All the events should be present in local region
vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
- testName + "_PR", 20000 });
+ testName, 2000 });
ArrayList<Integer> v4List = (ArrayList<Integer>)vm5.invoke(
WANTestBase.class, "getSenderStats", new Object[] { "ln", -1});
@@ -450,7 +313,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1); //batches distributed : its quite possible that vm4 has distributed some of the batches.
assertTrue(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1); //batches redistributed
}
-
+
public void testParallelPropogationWithFilter() throws Exception {
Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, "createFirstLocatorWithDSId",
@@ -458,7 +321,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
"createFirstRemoteLocator", new Object[] {2,lnPort });
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+ createReceiver(vm2, nyPort);
vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
@@ -478,22 +341,11 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
true, 100, 10, false, false,
new MyGatewayEventFilter(), true });
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
+ createSenderPRs(0);
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ startSenders();
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
+ createReceiverPR(vm2, 1);
vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName, 1000 });
@@ -526,58 +378,19 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
"createFirstRemoteLocator", new Object[] { 2, lnPort });
- vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
- vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
- vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
- vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
- vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
- true, 100, 10, true, false, null, true });
-
- vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
- vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, "ln", 0, 100, isOffHeap() });
+ createReceiver(vm2, nyPort);
- vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ createSendersWithConflation(lnPort);
- pause(3000);
+ createSenderPRs(0);
- vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
- vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+ startPausedSenders();
- vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
- testName, null, 1, 100, isOffHeap() });
+ createReceiverPR(vm2, 1);
- pause(2000);
-
- final Map keyValues = new HashMap();
+ Map keyValues = putKeyValues();
final Map updateKeyValues = new HashMap();
- for(int i=0; i< 1000; i++) {
- keyValues.put(i, i);
- }
-
- vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
-
- vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
- for(int i=0;i<500;i++) {
+ for(int i=0;i<50;i++) {
updateKeyValues.put(i, i+"_updated");
}
@@ -607,7 +420,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
testName, keyValues });
- vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {0, 1500, 1000});
+ vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {0, 150, NUM_PUTS});
vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", 0 });
@@ -621,12 +434,93 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
- assertEquals(2000, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
- assertEquals(2000, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
- assertEquals(1500, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
- assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 100); //batches distributed
+ assertEquals(200, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+ assertEquals(200, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+ assertEquals(150, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+ assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
- assertEquals(500, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)); //events conflated
+ assertEquals(50, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)); //events conflated
}
+
+ protected Map putKeyValues() { // Builds a map of NUM_PUTS entries, puts it through vm4, checks the "ln" queue size, and returns the map.
+ final Map keyValues = new HashMap();
+ for(int i=0; i< NUM_PUTS; i++) {
+ keyValues.put(i, i); // each key maps to itself: 0..NUM_PUTS-1
+ }
+
+ vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues }); // run on vm4 against the region named testName
+
+ vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() }); // expected size passed is the number of entries just put; presumably asserts the "ln" sender queue holds them all - confirm in WANTestBase
+
+ return keyValues; // callers reuse this map (e.g. for validateRegionContents)
+ }
+
+ protected void createReceiverPR(VM vm, int redundancy) { // Invokes WANTestBase.createPartitionedRegion on the given vm for the region named testName, with the supplied redundancy.
+ vm.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName, null, redundancy, 10, isOffHeap() }); // second arg is null here where createSenderPRs passes "ln" (presumably the sender id); 10 is presumably the bucket count - confirm against WANTestBase
+ }
+
+ protected void createSenderPRs(int redundancy) { // Invokes WANTestBase.createPartitionedRegion with identical arguments on vm4, vm5, vm6 and vm7.
+ vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName, "ln", redundancy, 10, isOffHeap() }); // "ln" is presumably the gateway sender id attached to the region; 10 is presumably the bucket count - confirm against WANTestBase
+ vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName, "ln", redundancy, 10, isOffHeap() });
+ vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName, "ln", redundancy, 10, isOffHeap() });
+ vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+ testName, "ln", redundancy, 10, isOffHeap() });
+ }
+
+ protected void startPausedSenders() { // Starts the "ln" sender on vm4-vm7 via startSenders(), then pauses it on each of those VMs.
+ startSenders();
+
+ vm4.invoke(() ->pauseSenderAndWaitForDispatcherToPause( "ln" )); // per the helper's name, it waits for the dispatcher to pause before returning - confirm in WANTestBase
+ vm5.invoke(() ->pauseSenderAndWaitForDispatcherToPause( "ln" ));
+ vm6.invoke(() ->pauseSenderAndWaitForDispatcherToPause( "ln" ));
+ vm7.invoke(() ->pauseSenderAndWaitForDispatcherToPause( "ln" ));
+ }
+
+ protected void createReceiver(VM vm, Integer nyPort) { // Invokes WANTestBase.createReceiver on the given vm, passing nyPort as its only argument.
+ vm.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); // nyPort is presumably the remote site's locator port - confirm against the caller
+ }
+
+ protected void startSenders() { // Invokes WANTestBase.startSender for the sender id "ln" on vm4, vm5, vm6 and vm7, in that order.
+ vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ }
+
+ protected void createSendersWithConflation(Integer lnPort) { // On vm4-vm7: creates a cache with lnPort, then creates the "ln" sender; same as createSenders except the sixth createSender argument is true.
+ vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); // caches are created on all four VMs before any sender is created
+ vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+ vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+ vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+
+ vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+ true, 100, 10, true, false, null, true }); // sixth argument (true) is presumably the conflation flag, going by this method's name - confirm against WANTestBase.createSender
+ vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+ true, 100, 10, true, false, null, true });
+ vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+ true, 100, 10, true, false, null, true });
+ vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+ true, 100, 10, true, false, null, true });
+ }
+
+ protected void createSenders(Integer lnPort) { // On vm4-vm7: creates a cache with lnPort, then creates the "ln" sender; same as createSendersWithConflation except the sixth createSender argument is false.
+ vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); // caches are created on all four VMs before any sender is created
+ vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+ vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+ vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+
+ vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true }); // sixth argument (false) is presumably the conflation flag, disabled here - confirm against WANTestBase.createSender
+ vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true });
+ vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true });
+ vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+ true, 100, 10, false, false, null, true });
+ }
}