You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/08 22:58:31 UTC

[04/50] [abbrv] incubator-geode git commit: GEODE-873: Cleanup ParallelWANStatsDUnitTest

GEODE-873: Cleanup ParallelWANStatsDUnitTest

Removing some duplicate code and tuning down the number of puts and
buckets in this test to make it run faster.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8af4255b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8af4255b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8af4255b

Branch: refs/heads/feature/GEODE-773-2
Commit: 8af4255b1d8dc0287306c095a8bf1fe987aa38e8
Parents: b11113f
Author: Dan Smith <up...@apache.org>
Authored: Tue Feb 2 13:54:52 2016 -0800
Committer: Dan Smith <up...@apache.org>
Committed: Wed Feb 3 15:50:28 2016 -0800

----------------------------------------------------------------------
 .../wan/parallel/ParallelWANStatsDUnitTest.java | 478 ++++++++-----------
 1 file changed, 186 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8af4255b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 529e378..cb3c49c 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -21,13 +21,14 @@ import java.util.HashMap;
 import java.util.Map;
 
 import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase.MyGatewayEventFilter;
 import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.VM;
 
 public class ParallelWANStatsDUnitTest extends WANTestBase{
   
+  private static final int NUM_PUTS = 100;
   private static final long serialVersionUID = 1L;
-
+  
   public ParallelWANStatsDUnitTest(String name) {
     super(name);
   }
@@ -42,124 +43,60 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
         "createFirstRemoteLocator", new Object[] { 2, lnPort });
 
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
-    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
-    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-
-    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-
-    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
-    pause(3000);
+    createReceiver(vm2, nyPort);
+    createReceiver(vm3, nyPort);
 
-    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    createSendersWithConflation(lnPort);
 
-    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, null, 1, 100, isOffHeap() });
-    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, null, 1, 100, isOffHeap() });
+    createSenderPRs(0);
 
-    pause(2000);
-    
-    final Map keyValues = new HashMap();
-    final Map updateKeyValues = new HashMap();
-    for(int i=0; i< 1000; i++) {
-      keyValues.put(i, i);
-    }
+    startPausedSenders();
     
-    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+    createReceiverPR(vm2, 1);
+    createReceiverPR(vm3, 1);
 
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
+    putKeyValues();
 
     ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(
-        WANTestBase.class, "getSenderStats", new Object[] { "ln", 1000 });
+        WANTestBase.class, "getSenderStats", new Object[] { "ln", NUM_PUTS });
     ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(
-        WANTestBase.class, "getSenderStats", new Object[] { "ln", 1000 });
+        WANTestBase.class, "getSenderStats", new Object[] { "ln", NUM_PUTS });
     ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(
-        WANTestBase.class, "getSenderStats", new Object[] { "ln", 1000 });
+        WANTestBase.class, "getSenderStats", new Object[] { "ln", NUM_PUTS });
     ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(
-        WANTestBase.class, "getSenderStats", new Object[] { "ln", 1000 });
+        WANTestBase.class, "getSenderStats", new Object[] { "ln", NUM_PUTS });
     
-    assertEquals(1000, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
-    assertEquals(1000, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
-    assertEquals(1000, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+    assertEquals(NUM_PUTS, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
+    assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+    assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
     assertEquals(0, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
     assertEquals(0, v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4)); //batches distributed
     assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
     
   }
-  
+
   public void testPartitionedRegionParallelPropagation_AfterDispatch_NoRedundacny() throws Exception {
     Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
         "createFirstLocatorWithDSId", new Object[] { 1 });
     Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
         "createFirstRemoteLocator", new Object[] { 2, lnPort });
 
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    createReceiver(vm2, nyPort);
    
 
-    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-
-    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, false, false, null, true });
-    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, false, false, null, true });
-    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-      true, 100, 10, false, false, null, true });
-    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-      true, 100, 10, false, false, null, true });
+    createSenders(lnPort);
 
-    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", null, 0, 100, isOffHeap() });
+    createReceiverPR(vm2, 0);
    
-    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln", 0, 100, isOffHeap() });
-    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln", 0, 100, isOffHeap() });
-    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln", 0, 100, isOffHeap() });
-    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln", 0, 100, isOffHeap() });
+    createSenderPRs(0);
     
-    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    startSenders();
 
-    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
-        1000 });
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName,
+        NUM_PUTS });
     
     vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName + "_PR", 1000 });
+        testName, NUM_PUTS });
     
     
     ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(
@@ -172,13 +109,13 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
         WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
     
     assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
-    assertEquals(1000, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
-    assertEquals(1000, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
-    assertEquals(1000, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
-    assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 100); //batches distributed
+    assertEquals(NUM_PUTS, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+    assertEquals(NUM_PUTS, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+    assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+    assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
     assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
     
-    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {100, 1000, 1000 });
+    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {10, NUM_PUTS, NUM_PUTS });
   }
   
   public void testPartitionedRegionParallelPropagation_AfterDispatch_Redundancy_3() throws Exception {
@@ -187,44 +124,21 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
         "createFirstRemoteLocator", new Object[] { 2, lnPort });
 
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    createReceiver(vm2, nyPort);
 
-    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-
-    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, false, false, null, true });
-    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, false, false, null, true });
-    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-      true, 100, 10, false, false, null, true });
-    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-      true, 100, 10, false, false, null, true });
+    createSenders(lnPort);
 
-    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", null, 0, 100, isOffHeap() });
+    createReceiverPR(vm2, 0);
 
-    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln", 3, 100, isOffHeap() });
-    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln", 3, 100, isOffHeap() });
-    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln", 3, 100, isOffHeap() });
-    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln", 3, 100, isOffHeap() });
+    createSenderPRs(3);
     
-    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    startSenders();
 
-    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
-        1000 });
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName,
+        NUM_PUTS });
     
     vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName + "_PR", 1000 });
+        testName, NUM_PUTS });
     
     ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(
         WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
@@ -236,13 +150,13 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
         WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
     
     assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
-    assertEquals(4000, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
-    assertEquals(4000, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
-    assertEquals(1000, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
-    assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 100); //batches distributed
+    assertEquals(400, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+    assertEquals(400, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+    assertEquals(NUM_PUTS, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+    assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
     assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
     
-    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {100, 1000, 1000 });
+    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {10, NUM_PUTS, NUM_PUTS });
   }
   
   public void testWANStatsTwoWanSites_Bug44331() throws Exception {
@@ -252,8 +166,8 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer tkPort = (Integer)vm1.invoke(WANTestBase.class,
         "createFirstRemoteLocator", new Object[] { 3, lnPort });
 
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { tkPort });
+    createReceiver(vm2, nyPort);
+    createReceiver(vm3, tkPort);
 
     vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
 
@@ -263,26 +177,24 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln2",
         3, true, 100, 10, false, false, null, true });
   
-    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", null, 0, 100, isOffHeap() });
+    createReceiverPR(vm2, 0);
     vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", null, 0, 100, isOffHeap() });
+      testName, null, 0, 10, isOffHeap() });
 
     vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln1,ln2", 0, 100, isOffHeap() });
+      testName, "ln1,ln2", 0, 10, isOffHeap() });
     
     vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
 
     vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln2" });
 
-    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
-        1000 });
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName,
+        NUM_PUTS });
 
-    pause(10000);
     vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName + "_PR", 1000 });
+        testName, NUM_PUTS });
     vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName + "_PR", 1000 });
+        testName, NUM_PUTS });
     
     ArrayList<Integer> v4Sender1List = (ArrayList<Integer>)vm4.invoke(
         WANTestBase.class, "getSenderStats", new Object[] { "ln1", 0 });
@@ -290,21 +202,21 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
         WANTestBase.class, "getSenderStats", new Object[] { "ln2", 0 });
     
     assertEquals(0, v4Sender1List.get(0).intValue()); //queue size
-    assertEquals(1000, v4Sender1List.get(1).intValue()); //eventsReceived
-    assertEquals(1000, v4Sender1List.get(2).intValue()); //events queued
-    assertEquals(1000, v4Sender1List.get(3).intValue()); //events distributed
-    assertTrue(v4Sender1List.get(4).intValue()>=100); //batches distributed
+    assertEquals(NUM_PUTS, v4Sender1List.get(1).intValue()); //eventsReceived
+    assertEquals(NUM_PUTS, v4Sender1List.get(2).intValue()); //events queued
+    assertEquals(NUM_PUTS, v4Sender1List.get(3).intValue()); //events distributed
+    assertTrue(v4Sender1List.get(4).intValue()>=10); //batches distributed
     assertEquals(0, v4Sender1List.get(5).intValue()); //batches redistributed
     
     assertEquals(0, v4Sender2List.get(0).intValue()); //queue size
-    assertEquals(1000, v4Sender2List.get(1).intValue()); //eventsReceived
-    assertEquals(1000, v4Sender2List.get(2).intValue()); //events queued
-    assertEquals(1000, v4Sender2List.get(3).intValue()); //events distributed
-    assertTrue(v4Sender2List.get(4).intValue()>=100); //batches distributed
+    assertEquals(NUM_PUTS, v4Sender2List.get(1).intValue()); //eventsReceived
+    assertEquals(NUM_PUTS, v4Sender2List.get(2).intValue()); //events queued
+    assertEquals(NUM_PUTS, v4Sender2List.get(3).intValue()); //events distributed
+    assertTrue(v4Sender2List.get(4).intValue()>=10); //batches distributed
     assertEquals(0, v4Sender2List.get(5).intValue()); //batches redistributed
     
-    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {100, 1000, 1000 });
-    vm3.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {100, 1000, 1000 });
+    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {10, NUM_PUTS, NUM_PUTS });
+    vm3.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {10, NUM_PUTS, NUM_PUTS });
   }
   
   public void testParallelPropagationHA() throws Exception {
@@ -313,49 +225,26 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
         "createFirstRemoteLocator", new Object[] { 2, lnPort });
 
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    createReceiver(vm2, nyPort);
     
 
-    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
-    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, false, false, null, true });
-    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, false, false, null, true });
-    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-      true, 100, 10, false, false, null, true });
-    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-      true, 100, 10, false, false, null, true });
+    createSenders(lnPort);
     
-    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", null, 0, 100, isOffHeap() });
+    createReceiverPR(vm2, 0);
     
-    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln", 3, 100, isOffHeap() });
-    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln", 3, 100, isOffHeap() });
-    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln", 3, 100, isOffHeap() });
-    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-      testName + "_PR", "ln", 3, 100, isOffHeap() });
+    createSenderPRs(3);
     
-    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    startSenders();
 
     AsyncInvocation inv1 = vm5.invokeAsync(WANTestBase.class, "doPuts",
-        new Object[] { testName + "_PR", 10000 });
-    pause(2000);
+        new Object[] { testName, 1000 });
+    pause(200);
     AsyncInvocation inv2 = vm4.invokeAsync(WANTestBase.class, "killSender");
     inv1.join();
     inv2.join();
     
     vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName + "_PR", 10000 });
+        testName, 1000 });
     
     ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(
         WANTestBase.class, "getSenderStats", new Object[] { "ln", 0});
@@ -367,16 +256,16 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     assertEquals(0, v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
     int receivedEvents = v5List.get(1) + v6List.get(1) + v7List.get(1);
     //We may see a single retried event on all members due to the kill
-    assertTrue("Received " + receivedEvents, 30000 <= receivedEvents && 30003 >= receivedEvents); //eventsReceived
+    assertTrue("Received " + receivedEvents, 3000 <= receivedEvents && 3003 >= receivedEvents); //eventsReceived
     int queuedEvents = v5List.get(2) + v6List.get(2) + v7List.get(2);
-    assertTrue("Queued " + queuedEvents, 30000 <= queuedEvents && 30003 >= queuedEvents); //eventsQueued
+    assertTrue("Queued " + queuedEvents, 3000 <= queuedEvents && 3003 >= queuedEvents); //eventsQueued
     //assertTrue(10000 <= v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed : its quite possible that vm4 has distributed some of the events
     //assertTrue(v5List.get(4) + v6List.get(4) + v7List.get(4) > 1000); //batches distributed : its quite possible that vm4 has distributed some of the batches.
     assertEquals(0, v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
   
-    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStatsHA", new Object[] {1000, 10000, 10000 });
+    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStatsHA", new Object[] {NUM_PUTS, 1000, 1000 });
   }
-  
+
   /**
    * 1 region and sender configured on local site and 1 region and a 
    * receiver configured on remote site. Puts to the local region are in progress.
@@ -391,51 +280,25 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
         "createFirstRemoteLocator", new Object[] { 2, lnPort });
 
-    //these are part of remote site
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    createReceiver(vm2, nyPort);
 
-    //these are part of local site
-    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
-    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    createSenders(lnPort);
 
-    //senders are created on local site
-    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-      true, 100, 10, false, false, null, true });
-    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-      true, 100, 10, false, false, null, true });
-    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-    true, 100, 10, false, false, null, true });
-    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-    true, 100, 10, false, false, null, true });
+    createReceiverPR(vm2, 0);
 
-  vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-    testName + "_PR", null, 0, 100, isOffHeap() });
-  
-  vm2.invoke(WANTestBase.class, "addCacheListenerAndDestroyRegion", new Object[] {
-    testName + "_PR"});
-  
-  vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-    testName + "_PR", "ln", 0, 100, isOffHeap() });
-  vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-    testName + "_PR", "ln", 0, 100, isOffHeap() });
-  vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-    testName + "_PR", "ln", 0, 100, isOffHeap() });
-  vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-    testName + "_PR", "ln", 0, 100, isOffHeap() });
-  
-  vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-  vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-  vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-  vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm2.invoke(WANTestBase.class, "addCacheListenerAndDestroyRegion", new Object[] {
+        testName});
+
+    createSenderPRs(0);
+
+    startSenders();
 
     //start puts in RR_1 in another thread
-    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 20000 });
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName, 2000 });
     
     //verify that all is well in local site. All the events should be present in local region
     vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName + "_PR", 20000 });
+        testName, 2000 });
     
     ArrayList<Integer> v4List = (ArrayList<Integer>)vm5.invoke(
         WANTestBase.class, "getSenderStats", new Object[] { "ln", -1});
@@ -450,7 +313,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 1); //batches distributed : its quite possible that vm4 has distributed some of the batches.
     assertTrue(v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5) >= 1); //batches redistributed
   }
-  
+
   public void testParallelPropogationWithFilter() throws Exception {
 
     Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, "createFirstLocatorWithDSId",
@@ -458,7 +321,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
         "createFirstRemoteLocator", new Object[] {2,lnPort });
 
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    createReceiver(vm2, nyPort);
 
     vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
     vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
@@ -478,22 +341,11 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
       true, 100, 10, false, false,
       new MyGatewayEventFilter(), true });
   
-    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
+    createSenderPRs(0);
 
-    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    startSenders();
 
-    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, null, 1, 100, isOffHeap() });
+    createReceiverPR(vm2, 1);
     
     vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName, 1000 });
 
@@ -526,58 +378,19 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
         "createFirstRemoteLocator", new Object[] { 2, lnPort });
 
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
-    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
-    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-
-    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
+    createReceiver(vm2, nyPort);
 
-    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    createSendersWithConflation(lnPort);
 
-    pause(3000);
+    createSenderPRs(0);
 
-    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    startPausedSenders();
 
-    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, null, 1, 100, isOffHeap() });
+    createReceiverPR(vm2, 1);
 
-    pause(2000);
-    
-    final Map keyValues = new HashMap();
+    Map keyValues = putKeyValues();
     final Map updateKeyValues = new HashMap();
-    for(int i=0; i< 1000; i++) {
-      keyValues.put(i, i);
-    }
-    
-    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
-
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
-    for(int i=0;i<500;i++) {
+    for(int i=0;i<50;i++) {
       updateKeyValues.put(i, i+"_updated");
     }
     
@@ -607,7 +420,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
     vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
         testName, keyValues });
     
-    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {0, 1500, 1000});
+    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {0, 150, NUM_PUTS});
     
     vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", 0 });
     
@@ -621,12 +434,93 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
         WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
     
     assertEquals(0, v4List.get(0) + v5List.get(0) + v6List.get(0) + v7List.get(0) ); //queue size
-    assertEquals(2000, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
-    assertEquals(2000, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
-    assertEquals(1500, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
-    assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 100); //batches distributed
+    assertEquals(200, v4List.get(1) + v5List.get(1) + v6List.get(1) + v7List.get(1)); //eventsReceived
+    assertEquals(200, v4List.get(2) + v5List.get(2) + v6List.get(2) + v7List.get(2)); //events queued
+    assertEquals(150, v4List.get(3) + v5List.get(3) + v6List.get(3) + v7List.get(3)); //events distributed
+    assertTrue(v4List.get(4) + v5List.get(4) + v6List.get(4) + v7List.get(4) >= 10); //batches distributed
     assertEquals(0, v4List.get(5) + v5List.get(5) + v6List.get(5) + v7List.get(5)); //batches redistributed
-    assertEquals(500, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)); //events conflated 
+    assertEquals(50, v4List.get(7) + v5List.get(7) + v6List.get(7) + v7List.get(7)); //events conflated 
     
   }
+  
+  protected Map putKeyValues() {
+    final Map keyValues = new HashMap();
+    for(int i=0; i< NUM_PUTS; i++) {
+      keyValues.put(i, i);
+    }
+    
+    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+
+    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
+    
+    return keyValues;
+  }
+
+  protected void createReceiverPR(VM vm, int redundancy) {
+    vm.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, null, redundancy, 10, isOffHeap() });
+  }
+  
+  protected void createSenderPRs(int redundancy) {
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", redundancy, 10, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", redundancy, 10, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", redundancy, 10, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName, "ln", redundancy, 10, isOffHeap() });
+  }
+
+  protected void startPausedSenders() {
+    startSenders();
+    
+    vm4.invoke(() ->pauseSenderAndWaitForDispatcherToPause( "ln" ));
+    vm5.invoke(() ->pauseSenderAndWaitForDispatcherToPause( "ln" ));
+    vm6.invoke(() ->pauseSenderAndWaitForDispatcherToPause( "ln" ));
+    vm7.invoke(() ->pauseSenderAndWaitForDispatcherToPause( "ln" ));
+  }
+
+  protected void createReceiver(VM vm, Integer nyPort) {
+    vm.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+  }
+  
+  protected void startSenders() {
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+  }
+
+  protected void createSendersWithConflation(Integer lnPort) {
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, true, false, null, true });
+  }
+
+  protected void createSenders(Integer lnPort) {
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+      true, 100, 10, false, false, null, true });
+    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+      true, 100, 10, false, false, null, true });
+  }
 }