You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/01 21:55:20 UTC

[03/50] [abbrv] incubator-geode git commit: GEODE-864: Waiting for conflation thread in conflation tests

GEODE-864: Waiting for conflation thread in conflation tests

For parallel queues, there is a actually a separate conflation thread
that removes entries from the queue on conflation. The tests need to
wait for this thread to finish it's work.

As part of this change, I refactored the ParallelWANConflationDUnitTest
to remove a bunch of duplicate code, replaced the VM.invoke(String)
calls with lambdas, and scaled the test down to create fewer buckets and
do fewer puts so that it runs in 1/4 of the time.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a7249514
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a7249514
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a7249514

Branch: refs/heads/feature/GEODE-773-2
Commit: a7249514eb2f09e5ca4920e7dc7aedfce4c2fe12
Parents: 27d965d
Author: Dan Smith <up...@apache.org>
Authored: Tue Jan 26 11:26:21 2016 -0800
Committer: Dan Smith <up...@apache.org>
Committed: Tue Jan 26 16:50:11 2016 -0800

----------------------------------------------------------------------
 .../gemfire/internal/cache/wan/WANTestBase.java |  149 ++-
 ...arallelGatewaySenderOperationsDUnitTest.java |    8 +-
 .../ParallelWANConflationDUnitTest.java         | 1017 ++++++------------
 .../wan/serial/SerialWANStatsDUnitTest.java     |    2 +-
 4 files changed, 424 insertions(+), 752 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a7249514/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
index 230c8d8..5539eb1 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.gemstone.gemfire.cache.AttributesFactory;
@@ -117,6 +118,7 @@ import com.gemstone.gemfire.test.dunit.DistributedTestCase;
 import com.gemstone.gemfire.test.dunit.Host;
 import com.gemstone.gemfire.test.dunit.VM;
 import com.gemstone.gemfire.util.test.TestUtil;
+import com.jayway.awaitility.Awaitility;
 
 import junit.framework.Assert;
 
@@ -2160,6 +2162,8 @@ public class WANTestBase extends DistributedTestCase{
         }
       }
       sender.pause();
+      ((AbstractGatewaySender) sender).getEventProcessor().waitForDispatcherToPause();
+      
     }
     finally {
       exp.remove();
@@ -2167,27 +2171,6 @@ public class WANTestBase extends DistributedTestCase{
     }
   }
   
-  public static void pauseSenderAndWaitForDispatcherToPause(String senderId) {
-    final ExpectedException exln = addExpectedException("Could not connect");
-    ExpectedException exp = addExpectedException(ForceReattemptException.class
-        .getName());
-    try {
-      Set<GatewaySender> senders = cache.getGatewaySenders();
-      GatewaySender sender = null;
-      for (GatewaySender s : senders) {
-        if (s.getId().equals(senderId)) {
-          sender = s;
-          break;
-        }
-      }
-      sender.pause();
-      ((AbstractGatewaySender)sender).getEventProcessor().waitForDispatcherToPause();
-    } finally {
-      exp.remove();
-      exln.remove();
-    }    
-  }
-
   public static void resumeSender(String senderId) {
     final ExpectedException exln = addExpectedException("Could not connect");
     ExpectedException exp = addExpectedException(ForceReattemptException.class
@@ -3189,35 +3172,23 @@ public class WANTestBase extends DistributedTestCase{
   
   
   public static Map putCustomerPartitionedRegion(int numPuts) {
-    assertNotNull(cache);
-    assertNotNull(customerRegion);
-    Map custKeyValues = new HashMap();
-    for (int i = 1; i <= numPuts; i++) {
-      CustId custid = new CustId(i);
-      Customer customer = new Customer("name" + i, "Address" + i);
-      try {
-        customerRegion.put(custid, customer);
-        custKeyValues.put(custid, customer);
-        assertTrue(customerRegion.containsKey(custid));
-        assertEquals(customer,customerRegion.get(custid));
-      }
-      catch (Exception e) {
-        fail(
-            "putCustomerPartitionedRegion : failed while doing put operation in CustomerPartitionedRegion ",
-            e);
-      }
-      getLogWriter().info("Customer :- { " + custid + " : " + customer + " }");
-    }
-    return custKeyValues;
+    String valueSuffix = "";
+    return putCustomerPartitionedRegion(numPuts, valueSuffix);
   }
   
   public static Map updateCustomerPartitionedRegion(int numPuts) {
+    String valueSuffix = "_update";
+    return putCustomerPartitionedRegion(numPuts, valueSuffix);
+  }
+
+  protected static Map putCustomerPartitionedRegion(int numPuts,
+      String valueSuffix) {
     assertNotNull(cache);
     assertNotNull(customerRegion);
-    Map custKeyValues = new HashMap();    
+    Map custKeyValues = new HashMap();
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
-      Customer customer = new Customer("name" + i, "Address" + i + "_update");
+      Customer customer = new Customer("name" + i, "Address" + i + valueSuffix);
       try {
         customerRegion.put(custid, customer);
         assertTrue(customerRegion.containsKey(custid));
@@ -3240,24 +3211,22 @@ public class WANTestBase extends DistributedTestCase{
     Map orderKeyValues = new HashMap();
     for (int i = 1; i <= numPuts; i++) {
       CustId custid = new CustId(i);
-      for (int j = 1; j <= 1; j++) {
-        int oid = (i * 1) + j;
-        OrderId orderId = new OrderId(oid, custid);
-        Order order = new Order("OREDR" + oid);
-        try {
-          orderRegion.put(orderId, order);
-          orderKeyValues.put(orderId, order);
-          assertTrue(orderRegion.containsKey(orderId));
-          assertEquals(order,orderRegion.get(orderId));
+      int oid = i + 1;
+      OrderId orderId = new OrderId(oid, custid);
+      Order order = new Order("OREDR" + oid);
+      try {
+        orderRegion.put(orderId, order);
+        orderKeyValues.put(orderId, order);
+        assertTrue(orderRegion.containsKey(orderId));
+        assertEquals(order,orderRegion.get(orderId));
 
-        }
-        catch (Exception e) {
-          fail(
-              "putOrderPartitionedRegion : failed while doing put operation in OrderPartitionedRegion ",
-              e);
-        }
-        getLogWriter().info("Order :- { " + orderId + " : " + order + " }");
       }
+      catch (Exception e) {
+        fail(
+            "putOrderPartitionedRegion : failed while doing put operation in OrderPartitionedRegion ",
+            e);
+      }
+      getLogWriter().info("Order :- { " + orderId + " : " + order + " }");
     }
     return orderKeyValues;
   }
@@ -3412,36 +3381,7 @@ public class WANTestBase extends DistributedTestCase{
     }
     return shipmentKeyValue;
   }
-
-  public static void doPutsPDXSerializable(String regionName, int numPuts) {
-    Region r = cache.getRegion(Region.SEPARATOR + regionName);
-    assertNotNull(r);
-    for (int i = 0; i < numPuts; i++) {
-      r.put("Key_" + i, new SimpleClass(i, (byte)i));
-    }
-  }
   
-  public static void doPutsPDXSerializable2(String regionName, int numPuts) {
-    Region r = cache.getRegion(Region.SEPARATOR + regionName);
-    assertNotNull(r);
-    for (int i = 0; i < numPuts; i++) {
-      r.put("Key_" + i, new SimpleClass1(false, (short) i, "" + i, i,"" +i ,""+ i,i, i));
-    }
-  }
-  
-  
-  public static void doTxPuts(String regionName, int numPuts) {
-    Region r = cache.getRegion(Region.SEPARATOR + regionName);
-    assertNotNull(r);
-    CacheTransactionManager mgr = cache.getCacheTransactionManager();
-
-    mgr.begin();
-    r.put(0, 0);
-    r.put(100, 100);
-    r.put(200, 200);
-    mgr.commit();
-  }
-    
   public static Map updateShipmentPartitionedRegion(int numPuts) {
     assertNotNull(cache);
     assertNotNull(shipmentRegion);
@@ -3495,7 +3435,36 @@ public class WANTestBase extends DistributedTestCase{
     }
     return shipmentKeyValue;
   }
+
+  public static void doPutsPDXSerializable(String regionName, int numPuts) {
+    Region r = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(r);
+    for (int i = 0; i < numPuts; i++) {
+      r.put("Key_" + i, new SimpleClass(i, (byte)i));
+    }
+  }
+  
+  public static void doPutsPDXSerializable2(String regionName, int numPuts) {
+    Region r = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(r);
+    for (int i = 0; i < numPuts; i++) {
+      r.put("Key_" + i, new SimpleClass1(false, (short) i, "" + i, i,"" +i ,""+ i,i, i));
+    }
+  }
   
+  
+  public static void doTxPuts(String regionName, int numPuts) {
+    Region r = cache.getRegion(Region.SEPARATOR + regionName);
+    assertNotNull(r);
+    CacheTransactionManager mgr = cache.getCacheTransactionManager();
+
+    mgr.begin();
+    r.put(0, 0);
+    r.put(100, 100);
+    r.put(200, 200);
+    mgr.commit();
+  }
+    
   public static void doNextPuts(String regionName, int start, int numPuts) {
     //waitForSitesToUpdate();
     ExpectedException exp = addExpectedException(CacheClosedException.class
@@ -3512,6 +3481,10 @@ public class WANTestBase extends DistributedTestCase{
   }
   
   public static void checkQueueSize(String senderId, int numQueueEntries) {
+    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> testQueueSize(senderId, numQueueEntries));
+  }
+  
+  public static void testQueueSize(String senderId, int numQueueEntries) {
     GatewaySender sender = null;
     for (GatewaySender s : cache.getGatewaySenders()) {
       if (s.getId().equals(senderId)) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a7249514/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 19f6c4b..3d0eb5b 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -210,10 +210,10 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
     vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 100 });
     
     //now, pause all of the senders
-    vm4.invoke(WANTestBase.class, "pauseSenderAndWaitForDispatcherToPause", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "pauseSenderAndWaitForDispatcherToPause", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "pauseSenderAndWaitForDispatcherToPause", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "pauseSenderAndWaitForDispatcherToPause", new Object[] { "ln" });
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
     
     //SECOND RUN: keep one thread doing puts to the region
     vm4.invokeAsync(WANTestBase.class, "doPuts", new Object[] { testName + "_PR", 1000 });

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a7249514/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
index 4acaaf4..763b9c4 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
@@ -40,92 +40,31 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
   }
 
   public void testParallelPropagationConflationDisabled() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
-    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
-    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, false, false, null, true });
-    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, false, false, null, true  });
-    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, false, false, null, true  });
-    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, false, false, null, true  });
-
-    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-
-    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
-    pause(3000);
-
-    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
-    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, null, 1, 100, isOffHeap() });
-    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, null, 1, 100, isOffHeap() });
-
-    pause(2000);
-    
-    final Map keyValues = new HashMap();
-    final Map updateKeyValues = new HashMap();
-    for(int i=0; i< 1000; i++) {
-      keyValues.put(i, i);
-    }
-    
-    
-    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+    initialSetUp();
 
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
-    for(int i=0;i<500;i++) {
-      updateKeyValues.put(i, i+"_updated");
-    }
+    createSendersNoConflation();
+
+    createSenderPRs();
+
+    startPausedSenders();
+
+    createReceiverPrs();
+
+    final Map keyValues = putKeyValues();
+
+    vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() ));
     
-    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+    final Map updateKeyValues = updateKeyValues();
 
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", (keyValues.size() + updateKeyValues.size()) });
+    vm4.invoke(() ->checkQueueSize( "ln", (keyValues.size() + updateKeyValues.size()) ));
 
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName, 0 });
+    vm2.invoke(() ->validateRegionSize(
+        testName, 0 ));
 
-    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    resumeSenders();
 
     keyValues.putAll(updateKeyValues);
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName, keyValues.size() });
-    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-      testName, keyValues.size() });
-    
-    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        testName, keyValues });
-    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        testName, keyValues });
+    validateReceiverRegionSize(keyValues);
     
   }
 
@@ -137,661 +76,421 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
    * @throws Exception
    */
   public void testParallelPropagationBatchConflation() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
-    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    
-    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 50, false, false, null, true });
-    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-      true, 100, 50, false, false, null, true });
-    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-      true, 100, 50, false, false, null, true });
-    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-    true, 100, 50, false, false, null, true });
+    initialSetUp();
+    
+    vm4.invoke(() ->createSender( "ln", 2,
+        true, 100, 50, false, false, null, true ));
+    vm5.invoke(() ->createSender( "ln", 2,
+      true, 100, 50, false, false, null, true ));
+    vm6.invoke(() ->createSender( "ln", 2,
+      true, 100, 50, false, false, null, true ));
+    vm7.invoke(() ->createSender( "ln", 2,
+    true, 100, 50, false, false, null, true ));
   
-    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
+    createSenderPRs();
     
-    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    startSenders();
     
-    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    pauseSenders();
     
-    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, null, 1, 100, isOffHeap() });
-    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, null, 1, 100, isOffHeap() });
+    createReceiverPrs();
 
     final Map keyValues = new HashMap();
     
-    for (int i = 1; i <= 100; i++) {
+    for (int i = 1; i <= 10; i++) {
       for (int j = 1; j <= 10; j++) {
         keyValues.put(j, i) ;
       }
-      vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] {
-        testName, keyValues });
+      vm4.invoke(() ->putGivenKeyValue(
+        testName, keyValues ));
     }
     
-    vm4.invoke(WANTestBase.class, "enableConflation", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "enableConflation", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "enableConflation", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "enableConflation", new Object[] { "ln" });
+    vm4.invoke(() ->enableConflation( "ln" ));
+    vm5.invoke(() ->enableConflation( "ln" ));
+    vm6.invoke(() ->enableConflation( "ln" ));
+    vm7.invoke(() ->enableConflation( "ln" ));
     
-    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    resumeSenders();
     
-    pause(2000);
-    ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(
-        WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
-    ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(
-        WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
-    ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(
-        WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
-    ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(
-        WANTestBase.class, "getSenderStats", new Object[] { "ln", 0 });
+    ArrayList<Integer> v4List = (ArrayList<Integer>)vm4.invoke(() -> 
+        WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v5List = (ArrayList<Integer>)vm5.invoke(() -> 
+        WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v6List = (ArrayList<Integer>)vm6.invoke(() -> 
+        WANTestBase.getSenderStats( "ln", 0 ));
+    ArrayList<Integer> v7List = (ArrayList<Integer>)vm7.invoke(() -> 
+        WANTestBase.getSenderStats( "ln", 0 ));
     
-    getLogWriter().info("KBKBKB: batch conflated events : vm4 : " + v4List.get(8));
-    getLogWriter().info("KBKBKB: batch conflated events : vm5 : " + v5List.get(8));
-    getLogWriter().info("KBKBKB: batch conflated events : vm6 : " + v6List.get(8));
-    getLogWriter().info("KBKBKB: batch conflated events : vm7 : " + v7List.get(8));
-    getLogWriter().info("KBKBKB: batch conflated events : " + (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)));
     assertTrue("No events conflated in batch", (v4List.get(8) + v5List.get(8) + v6List.get(8) + v7List.get(8)) > 0);
     
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-      testName, 10 });
+    vm2.invoke(() ->validateRegionSize(
+      testName, 10 ));
 
   }
   
   public void testParallelPropagationConflation() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
-    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
-    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-
-    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-
-    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
-    pause(3000);
-
-    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
-    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, null, 1, 100, isOffHeap() });
-    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, null, 1, 100, isOffHeap() });
-
-    pause(2000);
-    
-    final Map keyValues = new HashMap();
-    final Map updateKeyValues = new HashMap();
-    for(int i=0; i< 1000; i++) {
-      keyValues.put(i, i);
-    }
-    
-    
-    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+    doTestParallelPropagationConflation(0);
+  }
+  
+  public void testParallelPropagationConflationRedundancy2() throws Exception {
+    doTestParallelPropagationConflation(2);
+  }
+  
+  public void doTestParallelPropagationConflation(int redundancy) throws Exception {
+    initialSetUp();
 
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
-    for(int i=0;i<500;i++) {
-      updateKeyValues.put(i, i+"_updated");
-    }
-    
-    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+    createSendersWithConflation();
 
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() }); // creates aren't conflated
-    
-    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+    createSenderPRs(redundancy);
 
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() }); // creates aren't conflated
+    startPausedSenders();
 
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName, 0 });
+    createReceiverPrs();
 
-    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    final Map keyValues = putKeyValues();
 
-    keyValues.putAll(updateKeyValues);
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName, keyValues.size() });
-    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-      testName, keyValues.size() });
-    
-    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        testName, keyValues });
-    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        testName, keyValues });
-  }
-  
-
-  /**
-   * Reproduce the bug #47213.
-   * The test is same as above test, with the only difference that 
-   * redundancy is set to 1.
-   * @throws Exception
-   */
-  public void testParallelPropagationConflation_Bug47213() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
-    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
-    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-
-    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 2, 100, isOffHeap() });
-    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 2, 100, isOffHeap() });
-    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 2, 100, isOffHeap() });
-    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 2, 100, isOffHeap() });
-
-    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
-    pause(3000);
-
-    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
-    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, null, 1, 100, isOffHeap() });
-    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, null, 1, 100, isOffHeap() });
-
-    pause(2000);//give some time for all the senders to pause
-    
-    final Map keyValues = new HashMap();
-    final Map updateKeyValues = new HashMap();
-    for(int i=0; i< 1000; i++) {
-      keyValues.put(i, i);
-    }
-    
-    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+    vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() ));
+    final Map updateKeyValues = updateKeyValues();
 
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
-    for(int i=0;i<500;i++) {
-      updateKeyValues.put(i, i+"_updated");
-    }
+    vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); // creates aren't conflated
     
-    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+    vm4.invoke(() ->putGivenKeyValue( testName, updateKeyValues ));
 
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() }); // creates aren't conflated
+    vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() )); // creates aren't conflated
 
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName, 0 });
+    vm2.invoke(() ->validateRegionSize(
+        testName, 0 ));
 
-    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    resumeSenders();
 
     keyValues.putAll(updateKeyValues);
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName, keyValues.size() });
-    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-      testName, keyValues.size() });
-    
-    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        testName, keyValues });
-    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        testName, keyValues });
+    validateReceiverRegionSize(keyValues);
   }
   
   public void testParallelPropagationConflationOfRandomKeys() throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
-    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
-    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-
-    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, "ln", 0, 100, isOffHeap() });
-
-    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
-    pause(3000);
-
-    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
-    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, null, 1, 100, isOffHeap() });
-    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
-        testName, null, 1, 100, isOffHeap() });
-
-    pause(2000);
+    initialSetUp();
 
-    final Map keyValues = new HashMap();
-    final Map updateKeyValues = new HashMap();
-    for(int i=0; i< 1000; i++) {
-      keyValues.put(i, i);
-    }
-    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, keyValues });
+    createSendersWithConflation();
+
+    createSenderPRs();
+
+    startPausedSenders();
+
+    createReceiverPrs();
+
+    final Map keyValues = putKeyValues();
 
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() });
+    vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() ));
     
-    while(updateKeyValues.size()!=500) {
+    final Map updateKeyValues = new HashMap();
+    while(updateKeyValues.size()!=10) {
       int key = (new Random()).nextInt(keyValues.size());
       updateKeyValues.put(key, key+"_updated");
     }
-    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+    vm4.invoke(() ->putGivenKeyValue( testName, updateKeyValues ));
 
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() });
+    vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() ));
 
-    vm4.invoke(WANTestBase.class, "putGivenKeyValue", new Object[] { testName, updateKeyValues });
+    vm4.invoke(() ->putGivenKeyValue( testName, updateKeyValues ));
 
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", keyValues.size() + updateKeyValues.size() });
+    vm4.invoke(() ->checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() ));
 
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName, 0 });
+    vm2.invoke(() ->validateRegionSize(
+        testName, 0 ));
 
-    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    resumeSenders();
 
     
     keyValues.putAll(updateKeyValues);
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        testName, keyValues.size() });
-    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-      testName, keyValues.size() });
-    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-      testName, keyValues });
-    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-      testName, keyValues });
+    validateReceiverRegionSize(keyValues);
     
   }
   
   public void testParallelPropagationColocatedRegionConflation()
       throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
-    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
-    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-
-    vm4.invoke(WANTestBase.class,
-        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
-            testName, "ln", 0, 100, isOffHeap() });
-    vm5.invoke(WANTestBase.class,
-        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
-            testName, "ln", 0, 100, isOffHeap() });
-    vm6.invoke(WANTestBase.class,
-        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
-            testName, "ln", 0, 100, isOffHeap() });
-    vm7.invoke(WANTestBase.class,
-        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
-            testName, "ln", 0, 100, isOffHeap() });
-
-    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
-    pause(3000);
-
-    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
-    vm2.invoke(WANTestBase.class,
-        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
-            testName, null, 1, 100, isOffHeap() });
-    vm3.invoke(WANTestBase.class,
-        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
-            testName, null, 1, 100, isOffHeap() });
-
-    pause(2000);
-
-    Map custKeyValues = (Map)vm4.invoke(WANTestBase.class, "putCustomerPartitionedRegion",
-        new Object[] { 1000 });
-    Map orderKeyValues = (Map)vm4.invoke(WANTestBase.class, "putOrderPartitionedRegion",
-        new Object[] { 1000 });
-    Map shipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "putShipmentPartitionedRegion",
-        new Object[] { 1000 });
-
-    vm4.invoke(
-        WANTestBase.class,
-        "checkQueueSize",
-        new Object[] {
+    initialSetUp();
+
+    createSendersWithConflation();
+
+    createOrderShipmentOnSenders();
+
+    startPausedSenders();
+
+    createOrderShipmentOnReceivers();
+
+    Map custKeyValues = (Map)vm4.invoke(() ->putCustomerPartitionedRegion( 20 ));
+    Map orderKeyValues = (Map)vm4.invoke(() ->putOrderPartitionedRegion( 20 ));
+    Map shipmentKeyValues = (Map)vm4.invoke(() ->putShipmentPartitionedRegion( 20 ));
+
+    vm4.invoke(() -> 
+        WANTestBase.checkQueueSize(
             "ln",
             (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
-                .size()) });
-
-    Map updatedCustKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateCustomerPartitionedRegion",
-        new Object[] { 500 });
-    Map updatedOrderKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateOrderPartitionedRegion",
-        new Object[] { 500 });
-    Map updatedShipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateShipmentPartitionedRegion",
-        new Object[] { 500 });
-
-    vm4.invoke(
-        WANTestBase.class,
-        "checkQueueSize",
-        new Object[] {
+                .size()) ));
+
+    Map updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 ));
+    Map updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegion( 10 ));
+    Map updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegion( 10 ));
+    int sum = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+        .size())
+        + updatedCustKeyValues.size()
+        + updatedOrderKeyValues.size()
+        + updatedShipmentKeyValues.size();
+    vm4.invoke(() -> 
+        WANTestBase.checkQueueSize(
             "ln",
-            (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
-                .size())
-                + updatedCustKeyValues.size()
-                + updatedOrderKeyValues.size()
-                + updatedShipmentKeyValues.size() });
-
-    updatedCustKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateCustomerPartitionedRegion",
-        new Object[] { 500 });
-    updatedOrderKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateOrderPartitionedRegion",
-        new Object[] { 500 });
-    updatedShipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateShipmentPartitionedRegion",
-        new Object[] { 500 });
-
-    vm4.invoke(
-        WANTestBase.class,
-        "checkQueueSize",
-        new Object[] {
+            sum));
+
+    
+    updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 ));
+    updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegion( 10 ));
+    updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegion( 10 ));
+    int sum2 = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+        .size())
+        + updatedCustKeyValues.size()
+        + updatedOrderKeyValues.size()
+        + updatedShipmentKeyValues.size();
+    vm4.invoke(() -> 
+        WANTestBase.checkQueueSize(
             "ln",
-            (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
-                .size())
-                + updatedCustKeyValues.size()
-                + updatedOrderKeyValues.size()
-                + updatedShipmentKeyValues.size() });
-
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.customerRegionName, 0 });
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.orderRegionName, 0 });
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.shipmentRegionName, 0 });
-
-    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+            sum2));
+
+    vm2.invoke(() ->validateRegionSize(
+        WANTestBase.customerRegionName, 0 ));
+    vm2.invoke(() ->validateRegionSize(
+        WANTestBase.orderRegionName, 0 ));
+    vm2.invoke(() ->validateRegionSize(
+        WANTestBase.shipmentRegionName, 0 ));
+
+    resumeSenders();
     
     custKeyValues.putAll(updatedCustKeyValues);
     orderKeyValues.putAll(updatedOrderKeyValues);
     shipmentKeyValues.putAll(updatedShipmentKeyValues);
     
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.customerRegionName, custKeyValues.size() });
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.orderRegionName, orderKeyValues.size() });
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.shipmentRegionName, shipmentKeyValues.size() });
-
-    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        WANTestBase.customerRegionName, custKeyValues });
-    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        WANTestBase.orderRegionName, orderKeyValues });
-    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        WANTestBase.shipmentRegionName, shipmentKeyValues });
-    
-    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.customerRegionName, custKeyValues.size() });
-    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.orderRegionName, orderKeyValues.size() });
-    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.shipmentRegionName, shipmentKeyValues.size() });
-
-    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        WANTestBase.customerRegionName, custKeyValues });
-    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        WANTestBase.orderRegionName, orderKeyValues });
-    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        WANTestBase.shipmentRegionName, shipmentKeyValues });
+    validateColocatedRegionContents(custKeyValues, orderKeyValues,
+        shipmentKeyValues);
     
   }
   
+  //
+  //This is the same as the previous test, except for the UsingCustId methods
   public void testParallelPropagationColoatedRegionConflationSameKey()
       throws Exception {
-    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
-        "createFirstLocatorWithDSId", new Object[] { 1 });
-    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
-        "createFirstRemoteLocator", new Object[] { 2, lnPort });
-
-    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
-
-    vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-    vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort });
-
-    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
-        true, 100, 10, true, false, null, true });
-
-    vm4.invoke(WANTestBase.class,
-        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
-            testName, "ln", 0, 100, isOffHeap() });
-    vm5.invoke(WANTestBase.class,
-        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
-            testName, "ln", 0, 100, isOffHeap() });
-    vm6.invoke(WANTestBase.class,
-        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
-            testName, "ln", 0, 100, isOffHeap() });
-    vm7.invoke(WANTestBase.class,
-        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
-            testName, "ln", 0, 100, isOffHeap() });
-
-    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
-
-    pause(3000);
-
-    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
-
-    vm2.invoke(WANTestBase.class,
-        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
-            testName, null, 1, 100, isOffHeap() });
-    vm3.invoke(WANTestBase.class,
-        "createCustomerOrderShipmentPartitionedRegion", new Object[] {
-            testName, null, 1, 100, isOffHeap() });
-
-    pause(2000);
-
-    Map custKeyValues = (Map)vm4.invoke(WANTestBase.class, "putCustomerPartitionedRegion",
-        new Object[] { 1000 });
-    Map orderKeyValues = (Map)vm4.invoke(WANTestBase.class, "putOrderPartitionedRegionUsingCustId",
-        new Object[] { 1000 });
-    Map shipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "putShipmentPartitionedRegionUsingCustId",
-        new Object[] { 1000 });
-
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
-      .size()) });
-
-    Map updatedCustKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateCustomerPartitionedRegion",
-        new Object[] { 500 });
-    Map updatedOrderKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateOrderPartitionedRegionUsingCustId",
-        new Object[] { 500 });
-    Map updatedShipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateShipmentPartitionedRegionUsingCustId",
-        new Object[] { 500 });
-
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
-        .size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size() });
-
-    updatedCustKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateCustomerPartitionedRegion",
-        new Object[] { 500 });
-    updatedOrderKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateOrderPartitionedRegionUsingCustId",
-        new Object[] { 500 });
-    updatedShipmentKeyValues = (Map)vm4.invoke(WANTestBase.class, "updateShipmentPartitionedRegionUsingCustId",
-        new Object[] { 500 });
-
-    vm4.invoke(WANTestBase.class, "checkQueueSize", new Object[] { "ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
-        .size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size() });
-
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.customerRegionName, 0 });
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.orderRegionName, 0 });
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.shipmentRegionName, 0 });
-
-    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
-    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    initialSetUp();
+
+    createSendersWithConflation();
+
+    createOrderShipmentOnSenders();
+
+    startPausedSenders();
+
+    createOrderShipmentOnReceivers();
+
+    Map custKeyValues = (Map)vm4.invoke(() ->putCustomerPartitionedRegion( 20 ));
+    Map orderKeyValues = (Map)vm4.invoke(() ->putOrderPartitionedRegionUsingCustId( 20 ));
+    Map shipmentKeyValues = (Map)vm4.invoke(() ->putShipmentPartitionedRegionUsingCustId( 20 ));
+
+    vm4.invoke(() ->checkQueueSize( "ln", (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+      .size()) ));
+
+    Map updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 ));
+    Map updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegionUsingCustId( 10 ));
+    Map updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegionUsingCustId( 10 ));
+    int sum = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+        .size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size() ;
+
+    vm4.invoke(() ->checkQueueSize( "ln", sum));
+
+    updatedCustKeyValues = (Map)vm4.invoke(() ->updateCustomerPartitionedRegion( 10 ));
+    updatedOrderKeyValues = (Map)vm4.invoke(() ->updateOrderPartitionedRegionUsingCustId( 10 ));
+    updatedShipmentKeyValues = (Map)vm4.invoke(() ->updateShipmentPartitionedRegionUsingCustId( 10 ));
+
+    int sum2 = (custKeyValues.size() + orderKeyValues.size() + shipmentKeyValues
+        .size()) + updatedCustKeyValues.size() + updatedOrderKeyValues.size() + updatedShipmentKeyValues.size();
+    vm4.invoke(() ->checkQueueSize( "ln", sum2));
+
+    vm2.invoke(() ->validateRegionSize(
+        WANTestBase.customerRegionName, 0 ));
+    vm2.invoke(() ->validateRegionSize(
+        WANTestBase.orderRegionName, 0 ));
+    vm2.invoke(() ->validateRegionSize(
+        WANTestBase.shipmentRegionName, 0 ));
+
+    resumeSenders();
 
     custKeyValues.putAll(updatedCustKeyValues);
     orderKeyValues.putAll(updatedOrderKeyValues);
     shipmentKeyValues.putAll(updatedShipmentKeyValues);
     
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.customerRegionName, custKeyValues.size() });
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.orderRegionName, orderKeyValues.size() });
-    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.shipmentRegionName, shipmentKeyValues.size() });
-
-    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        WANTestBase.customerRegionName, custKeyValues });
-    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        WANTestBase.orderRegionName, orderKeyValues });
-    vm2.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        WANTestBase.shipmentRegionName, shipmentKeyValues });
-
-    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.customerRegionName, custKeyValues.size() });
-    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.orderRegionName, orderKeyValues.size() });
-    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
-        WANTestBase.shipmentRegionName, shipmentKeyValues.size() });
-
-    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        WANTestBase.customerRegionName, custKeyValues });
-    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        WANTestBase.orderRegionName, orderKeyValues });
-    vm3.invoke(WANTestBase.class, "validateRegionContents", new Object[] {
-        WANTestBase.shipmentRegionName, shipmentKeyValues });
+    validateColocatedRegionContents(custKeyValues, orderKeyValues,
+        shipmentKeyValues);
+  }
+  
+  protected void validateColocatedRegionContents(Map custKeyValues,
+      Map orderKeyValues, Map shipmentKeyValues) {
+    vm2.invoke(() ->validateRegionSize(
+        WANTestBase.customerRegionName, custKeyValues.size() ));
+    vm2.invoke(() ->validateRegionSize(
+        WANTestBase.orderRegionName, orderKeyValues.size() ));
+    vm2.invoke(() ->validateRegionSize(
+        WANTestBase.shipmentRegionName, shipmentKeyValues.size() ));
+
+    vm2.invoke(() ->validateRegionContents(
+        WANTestBase.customerRegionName, custKeyValues ));
+    vm2.invoke(() ->validateRegionContents(
+        WANTestBase.orderRegionName, orderKeyValues ));
+    vm2.invoke(() ->validateRegionContents(
+        WANTestBase.shipmentRegionName, shipmentKeyValues ));
+    
+    vm3.invoke(() ->validateRegionSize(
+        WANTestBase.customerRegionName, custKeyValues.size() ));
+    vm3.invoke(() ->validateRegionSize(
+        WANTestBase.orderRegionName, orderKeyValues.size() ));
+    vm3.invoke(() ->validateRegionSize(
+        WANTestBase.shipmentRegionName, shipmentKeyValues.size() ));
+
+    vm3.invoke(() ->validateRegionContents(
+        WANTestBase.customerRegionName, custKeyValues ));
+    vm3.invoke(() ->validateRegionContents(
+        WANTestBase.orderRegionName, orderKeyValues ));
+    vm3.invoke(() ->validateRegionContents(
+        WANTestBase.shipmentRegionName, shipmentKeyValues ));
+  }
+
+  protected void createOrderShipmentOnReceivers() {
+    vm2.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
+            testName, null, 1, 8, isOffHeap() ));
+    vm3.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
+            testName, null, 1, 8, isOffHeap() ));
+  }
+
+  protected void createOrderShipmentOnSenders() {
+    vm4.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
+            testName, "ln", 0, 8, isOffHeap() ));
+    vm5.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
+            testName, "ln", 0, 8, isOffHeap() ));
+    vm6.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
+            testName, "ln", 0, 8, isOffHeap() ));
+    vm7.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
+            testName, "ln", 0, 8, isOffHeap() ));
+  }
+  
+  protected Map updateKeyValues() {
+    final Map updateKeyValues = new HashMap();
+    for(int i=0;i<10;i++) {
+      updateKeyValues.put(i, i+"_updated");
+    }
+    
+    vm4.invoke(() ->putGivenKeyValue( testName, updateKeyValues ));
+    return updateKeyValues;
+  }
+
+  protected Map putKeyValues() {
+    final Map keyValues = new HashMap();
+    for(int i=0; i< 20; i++) {
+      keyValues.put(i, i);
+    }
+    
+    
+    vm4.invoke(() ->putGivenKeyValue( testName, keyValues ));
+    return keyValues;
+  }
+
+  protected void validateReceiverRegionSize(final Map keyValues) {
+    vm2.invoke(() ->validateRegionSize(
+        testName, keyValues.size() ));
+    vm3.invoke(() ->validateRegionSize(
+      testName, keyValues.size() ));
+    
+    vm2.invoke(() ->validateRegionContents(
+        testName, keyValues ));
+    vm3.invoke(() ->validateRegionContents(
+        testName, keyValues ));
+  }
+
+  protected void resumeSenders() {
+    vm4.invoke(() ->resumeSender( "ln" ));
+    vm5.invoke(() ->resumeSender( "ln" ));
+    vm6.invoke(() ->resumeSender( "ln" ));
+    vm7.invoke(() ->resumeSender( "ln" ));
+  }
+
+  protected void createReceiverPrs() {
+    vm2.invoke(() ->createPartitionedRegion(
+        testName, null, 1, 8, isOffHeap() ));
+    vm3.invoke(() ->createPartitionedRegion(
+        testName, null, 1, 8, isOffHeap() ));
+  }
+
+  protected void startPausedSenders() {
+    startSenders();
+
+    pauseSenders();
+  }
+
+  protected void pauseSenders() {
+    vm4.invoke(() ->pauseSender( "ln" ));
+    vm5.invoke(() ->pauseSender( "ln" ));
+    vm6.invoke(() ->pauseSender( "ln" ));
+    vm7.invoke(() ->pauseSender( "ln" ));
+  }
+
+  protected void startSenders() {
+    vm4.invoke(() ->startSender( "ln" ));
+    vm5.invoke(() ->startSender( "ln" ));
+    vm6.invoke(() ->startSender( "ln" ));
+    vm7.invoke(() ->startSender( "ln" ));
+  }
+  
+  protected void createSenderPRs() {
+    createSenderPRs(0);
+  }
+
+  protected void createSenderPRs(int redundancy) {
+    vm4.invoke(() ->createPartitionedRegion(
+        testName, "ln", redundancy, 8, isOffHeap() ));
+    vm5.invoke(() ->createPartitionedRegion(
+        testName, "ln", redundancy, 8, isOffHeap() ));
+    vm6.invoke(() ->createPartitionedRegion(
+        testName, "ln", redundancy, 8, isOffHeap() ));
+    vm7.invoke(() ->createPartitionedRegion(
+        testName, "ln", redundancy, 8, isOffHeap() ));
+  }
+
+  protected void initialSetUp() {
+    Integer lnPort = (Integer)vm0.invoke(() ->createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() ->createFirstRemoteLocator( 2, lnPort ));
+
+    vm2.invoke(() ->createReceiver( nyPort ));
+    vm3.invoke(() ->createReceiver( nyPort ));
+
+    vm4.invoke(() ->createCache(lnPort ));
+    vm5.invoke(() ->createCache(lnPort ));
+    vm6.invoke(() ->createCache(lnPort ));
+    vm7.invoke(() ->createCache(lnPort ));
+  }
+  
+  protected void createSendersNoConflation() {
+    vm4.invoke(() ->createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm5.invoke(() ->createSender( "ln", 2,
+        true, 100, 10, false, false, null, true  ));
+    vm6.invoke(() ->createSender( "ln", 2,
+        true, 100, 10, false, false, null, true  ));
+    vm7.invoke(() ->createSender( "ln", 2,
+        true, 100, 10, false, false, null, true  ));
+  }
+  
+  protected void createSendersWithConflation() {
+    vm4.invoke(() ->createSender( "ln", 2,
+        true, 100, 2, true, false, null, true ));
+    vm5.invoke(() ->createSender( "ln", 2,
+        true, 100, 2, true, false, null, true ));
+    vm6.invoke(() ->createSender( "ln", 2,
+        true, 100, 2, true, false, null, true ));
+    vm7.invoke(() ->createSender( "ln", 2,
+        true, 100, 2, true, false, null, true ));
   }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a7249514/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
index 979439b..a142f82 100644
--- a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
@@ -537,7 +537,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
 
     vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
 
-    vm4.invoke(WANTestBase.class, "pauseSenderAndWaitForDispatcherToPause", new Object[] { "ln" });
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
 
     vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
         testName, null,1, 100, isOffHeap()  });