You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:43:57 UTC

[09/61] [abbrv] incubator-geode git commit: GEODE-37 change package name from com.gemstone.gemfire (for ./geode-wan/src/test/java/com/gemstone/gemfire) to org.apache.geode (for ./geode-wan/src/test/java/org/apache/geode)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
new file mode 100644
index 0000000..427054a
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+@Category(DistributedTest.class)
+public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
+
+  private static final long serialVersionUID = 1L;
+  
+  public ParallelWANPropagationConcurrentOpsDUnitTest() {
+    super(); // nothing to set up here beyond the WANTestBase constructor
+  }
+  
+  /**
+   * Propagation test for a PR with only one bucket (added for bug# 44284).
+   * Four concurrent doPuts calls run on vm4 while both senders are paused, the queue content
+   * size on vm4 is asserted to be 3500, and after the senders resume vm2 must hold 1000 entries.
+   */
+  @Test
+  public void testParallelPropagationWithSingleBucketPR() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+ 
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+ 
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 1, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 1, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 1, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 1, isOffHeap() ));
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    
+    //pause the senders
+    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    
+    Wait.pause(5000);
+    // four put streams on vm4: 700 + 1000 + 800 + 1000 = 3500 put operations in total
+    AsyncInvocation async1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 700 ));
+    AsyncInvocation async2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+    AsyncInvocation async3 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 800 ));
+    AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+    
+    async1.join();
+    async2.join();
+    async3.join();
+    async4.join();
+    // the senders are still paused here, so every put is expected to still be queued
+    int queueSize = (Integer) vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" ));
+    assertEquals("Actual queue size is not matching with the expected", 3500, queueSize);
+    
+    //resume the senders now
+    vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+
+  /**
+   * Normal propagation scenario test case for a PR with a small number of buckets.
+   * The bucket count is kept at 10 for this test, which was added for bug# 44287.
+   * Four concurrent doPuts calls run on vm4 with the senders running, and vm2 is
+   * then expected to reach 1000 entries.
+   */
+  @Test
+  public void testParallelPropagationWithLowNumberOfBuckets() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+ 
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+ 
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 10, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 10, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 10, isOffHeap() ));
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    // unlike the single-bucket test, the senders are not paused before the concurrent puts
+    AsyncInvocation async1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 700 ));
+    AsyncInvocation async2 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+    AsyncInvocation async3 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 800 ));
+    AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+    
+    async1.join();
+    async2.join();
+    async3.join();
+    async4.join();
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+
+  @Test
+  public void testParallelQueueDrainInOrder_PR() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2);
+    vm2.invoke(() -> WANTestBase.createReceiver());
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", null, 3, 4, isOffHeap() ));
+    // presumably this listener records the events that checkPR reads back at the end - TODO confirm
+    vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR"));
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, false, null, true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, false, null, true ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 3, 4, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.addQueueListener( "ln", true));
+    vm5.invoke(() -> WANTestBase.addQueueListener( "ln", true));
+    vm6.invoke(() -> WANTestBase.addQueueListener( "ln", true));
+    vm7.invoke(() -> WANTestBase.addQueueListener( "ln", true));
+
+    Wait.pause(2000);
+    vm4.invoke(() -> WANTestBase.pauseSender( "ln"));
+    vm5.invoke(() -> WANTestBase.pauseSender( "ln"));
+    vm6.invoke(() -> WANTestBase.pauseSender( "ln"));
+    vm7.invoke(() -> WANTestBase.pauseSender( "ln"));
+    
+    Wait.pause(2000);
+    // 4 seed puts issued before any bucket listener is attached (the region has 4 buckets)
+    vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+      4 ));
+    vm4.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4));
+    vm5.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4));
+    vm6.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4 ));
+    vm7.invoke(() -> WANTestBase.addListenerOnBucketRegion(getTestMethodName() + "_PR", 4));
+    
+    vm4.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4));
+    vm5.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4));
+    vm6.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4 ));
+    vm7.invoke(() -> WANTestBase.addListenerOnQueueBucketRegion( "ln" , 4));
+    // main workload, issued while all four senders are still paused
+    vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+      1000 ));
+    
+    vm6.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_PR", 1000 ));
+    
+    HashMap vm4BRUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4));
+    HashMap vm5BRUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4));
+    HashMap vm6BRUpdates = (HashMap)vm6.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4));
+    HashMap vm7BRUpdates = (HashMap)vm7.invoke(() -> WANTestBase.checkBR(getTestMethodName() + "_PR", 4));
+    // only the vm4 map is read below; the vm5, vm6 and vm7 maps are fetched but never compared
+    List b0SenderUpdates = (List)vm4BRUpdates.get("Create0");
+    List b1SenderUpdates = (List)vm4BRUpdates.get("Create1");
+    List b2SenderUpdates = (List)vm4BRUpdates.get("Create2");
+    List b3SenderUpdates = (List)vm4BRUpdates.get("Create3");
+    
+    HashMap vm4QueueBRUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue_BR("ln", 4));
+    HashMap vm5QueueBRUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue_BR("ln", 4));
+    HashMap vm6QueueBRUpdates = (HashMap)vm6.invoke(() -> WANTestBase.checkQueue_BR("ln", 4));
+    HashMap vm7QueueBRUpdates = (HashMap)vm7.invoke(() -> WANTestBase.checkQueue_BR("ln", 4));
+    // every sender member must report the same queue bucket-region updates as vm4
+    assertEquals(vm4QueueBRUpdates, vm5QueueBRUpdates);
+    assertEquals(vm4QueueBRUpdates, vm6QueueBRUpdates);
+    assertEquals(vm4QueueBRUpdates, vm7QueueBRUpdates);
+    
+    vm4.invoke(() -> WANTestBase.resumeSender( "ln"));
+    vm5.invoke(() -> WANTestBase.resumeSender( "ln"));
+    vm6.invoke(() -> WANTestBase.resumeSender( "ln"));
+    vm7.invoke(() -> WANTestBase.resumeSender( "ln"));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_PR", 1000 ));
+    HashMap receiverUpdates = (HashMap)vm2.invoke(() -> WANTestBase.checkPR(
+      getTestMethodName() + "_PR"));
+    List<Long> createList = (List)receiverUpdates.get("Create");
+    ArrayList<Long> b0ReceiverUpdates = new ArrayList<Long>();
+    ArrayList<Long> b1ReceiverUpdates = new ArrayList<Long>();
+    ArrayList<Long> b2ReceiverUpdates = new ArrayList<Long>();
+    ArrayList<Long> b3ReceiverUpdates = new ArrayList<Long>();
+    for (Long key : createList) {
+      long mod = key % 4; // assumes a key lands in bucket (key % 4) of the 4-bucket region - TODO confirm
+      if (mod == 0) {
+        b0ReceiverUpdates.add(key);
+      }
+      else if (mod == 1) {
+        b1ReceiverUpdates.add(key);
+      }
+      else if (mod == 2) {
+        b2ReceiverUpdates.add(key);
+      }
+      else if (mod == 3) {
+        b3ReceiverUpdates.add(key);
+      }
+    }
+    b0ReceiverUpdates.remove(0); // drops the first create per bucket; presumably the seed put made before the sender listeners existed
+    b1ReceiverUpdates.remove(0);
+    b2ReceiverUpdates.remove(0);
+    b3ReceiverUpdates.remove(0);
+    // per bucket, the receiver must have seen creates in the order recorded on the sender side
+    assertEquals(b0SenderUpdates, b0ReceiverUpdates);
+    assertEquals(b1SenderUpdates, b1ReceiverUpdates);
+    assertEquals(b2SenderUpdates, b2ReceiverUpdates);
+    assertEquals(b3SenderUpdates, b3ReceiverUpdates);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
new file mode 100644
index 0000000..7b0cf3e
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
@@ -0,0 +1,1234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import java.util.Set;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.EntryExistsException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.BatchException70;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.junit.categories.FlakyTest;
+
+@Category(DistributedTest.class)
+public class ParallelWANPropagationDUnitTest extends WANTestBase {
+  private static final long serialVersionUID = 1L;
+
+  public ParallelWANPropagationDUnitTest() {
+    super(); // nothing to set up here beyond the WANTestBase constructor
+  }
+
+  /** Checks that the shadow PR backing the parallel sender ln is not exposed as a root region (bug 44216). */
+  @Test
+  public void test_ParallelGatewaySenderMetaRegionNotExposedToUser_Bug44216() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCache(lnPort);
+    createSender("ln", 2, true, 100, 300, false, false,
+        null, true);
+    createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap());
+
+    GatewaySender sender = null;
+    for (GatewaySender s : cache.getGatewaySenders()) {
+      if (s.getId().equals("ln")) {
+        sender = s;
+        break;
+      }
+    }
+    // fail with a clear message instead of a NullPointerException when the sender is missing
+    assertNotNull("Gateway sender ln was not found in the cache", sender);
+    try {
+      sender.start();
+    } catch (Exception e) {
+      throw new AssertionError("Failed to start gateway sender ln", e); // keep the real cause
+    }
+
+    // resolve the name of the queue region once instead of once per root region
+    String shadowRegionName = ((AbstractGatewaySender)sender).getQueues()
+        .toArray(new RegionQueue[1])[0].getRegion().getName();
+    for (Object r : ((GemFireCacheImpl)cache).rootRegions()) {
+      if (((Region)r).getName().equals(shadowRegionName)) {
+        fail("The shadowPR is exposed to the user");
+      }
+    }
+  }
+  
+  @Test
+  public void testParallelPropagation_withoutRemoteSite() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+    // sender site first: the remote caches and receivers are only created after the puts below
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    //keep a larger batch to minimize number of exception occurrences in the log
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 300, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 300, false, false, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 300, false, false, null, true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 300, false, false, null, true ));
+
+    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm6.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+    
+    //make sure all the senders are running before doing any puts
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+      1000 ));
+    // only now bring up the remote site: caches, then regions, then receivers
+    createCacheInVMs(nyPort, vm2, vm3);
+    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+    createReceiverInVMs(vm2, vm3);
+    
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    
+    // Just making sure that even though the remote site is started later, it is
+    // still able to get all the data. NOTE(review): this comment used to say the
+    // receivers start before the partitioned region is created and may lose events,
+    // but in this method the regions are created before createReceiverInVMs is called.
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+  
+  /**
+   * Normal happy scenario test case: receivers and senders are all running before
+   * vm4 does 1000 puts; the sender queues must drain and vm2 must hold 1000 entries.
+   */
+  @Test
+  public void testParallelPropagation() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+
+    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm6.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+        1000 ));
+    
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+
+  protected SerializableRunnableIF createReceiverPartitionedRegionRedundancy1() {
+    return () -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()  ); // null where the sender-side variant passes ln
+  }
+
+  protected SerializableRunnableIF createPartitionedRegionRedundancy1Runnable() {
+    return () -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()  ); // same region arguments as the receiver variant, plus ln
+  }
+
+  protected SerializableRunnableIF waitForSenderRunnable() {
+    return () -> WANTestBase.waitForSenderRunningState( "ln" ); // runnable form, so the tests can pass it straight to vm.invoke
+  }
+
+  @Test
+  public void testParallelPropagation_ManualStart() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, false ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, false ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, false ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, false ));
+    // NOTE(review): last createSender argument is false here (true in sibling tests) and startSenderInVMs is never called - confirm intent
+    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm6.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+
+    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+        1000 ));
+    
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+  
+  /**
+   * Happy scenario test case 2: vm4 does 1000 puts before the senders are started,
+   * repeats the puts once they are running, and vm2 must end up with 1000 entries.
+   */
+  @Test
+  public void testParallelPropagationPutBeforeSenderStart() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+
+    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm6.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+      1000 ));
+
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+    //before repeating the puts, let the senders be running in order to ensure that
+    //not a single event from the second round will be lost
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+        1000 ));
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+  }
+  
+  /**
+   * Local and remote sites are up and running.
+   * Local site cache is closed and the site is built again.
+   * Puts are done to local site.
+   * Expected: Remote site should receive all the events put after the local
+   * site was built back.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testParallelPropagationWithLocalCacheClosedAndRebuilt() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+
+    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm6.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+      1000 ));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+              getTestMethodName() + "_PR", 1000 ));
+    //-------------------Close and rebuild local site ---------------------------------
+
+    vm4.invoke(() -> WANTestBase.killSender());
+    vm5.invoke(() -> WANTestBase.killSender());
+    vm6.invoke(() -> WANTestBase.killSender());
+    vm7.invoke(() -> WANTestBase.killSender());
+    // the remote region size is only logged here for diagnostics; it is not asserted on
+    Integer regionSize = 
+      (Integer) vm2.invoke(() -> WANTestBase.getRegionSize(getTestMethodName() + "_PR" ));
+    LogWriterUtils.getLogWriter().info("Region size on remote is: " + regionSize);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+    
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+
+    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm6.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm7.invoke(createPartitionedRegionRedundancy1Runnable());
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    //------------------------------------------------------------------------------------
+    // the rebuilt site puts again into a remote region validated above at 1000 entries; presumably that is why these are ignored
+    IgnoredException.addIgnoredException(EntryExistsException.class.getName());
+    IgnoredException.addIgnoredException(BatchException70.class.getName());
+    IgnoredException.addIgnoredException(ServerOperationException.class.getName());
+    
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+    
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_PR", 1000 ));
+  }
+  
+  /**
+   * Verifies propagation through sender "ln" for regions created with
+   * {@code WANTestBase.createColocatedPartitionedRegions}: after 1000 puts issued from vm4,
+   * the sender queues on vm4..vm7 are validated as drained and the region size on vm2 is
+   * validated against 1000.
+   *
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testParallelColocatedPropagation() throws Exception {
+    final Integer localLocatorPort =
+        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    final Integer remoteLocatorPort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    // Receiving members (vm2, vm3) first, then the sending members (vm4..vm7).
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)));
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createColocatedPartitionedRegions(
+            getTestMethodName(), "ln", 1, 100, isOffHeap())));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    // Same regions on the receiving members, without a sender id.
+    java.util.Arrays.asList(vm2, vm3).forEach(member -> member.invoke(
+        () -> WANTestBase.createColocatedPartitionedRegions(
+            getTestMethodName(), null, 1, 100, isOffHeap())));
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
+
+    // Verify all buckets drained on all sender nodes.
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
+  }
+  /**
+   * Creates colocated partitioned regions in which the parent region has a parallel gateway
+   * sender (PGS) attached and the child regions do not.
+   *
+   * Validates that events for the parent region reach the remote site while the child regions
+   * on the remote site stay empty.
+   *
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testParallelColocatedPropagation2() throws Exception {
+    final Integer localLocatorPort =
+        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    final Integer remoteLocatorPort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)));
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createColocatedPartitionedRegions2(
+            getTestMethodName(), "ln", 1, 100, isOffHeap())));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    java.util.Arrays.asList(vm2, vm3).forEach(member -> member.invoke(
+        () -> WANTestBase.createColocatedPartitionedRegions2(
+            getTestMethodName(), null, 1, 100, isOffHeap())));
+
+    // Load the parent region and both child regions on the sending site.
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_child1", 1000));
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_child2", 1000));
+
+    // Verify all buckets drained on all sender nodes.
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")));
+
+    // Only the parent region is expected to have received events.
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 1000));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_child1", 0));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_child2", 0));
+  }
+
+  
+  /**
+   * Issues 150 {@code doHeavyPuts} operations from vm4 through sender "ln", then validates
+   * that the sender queues on vm4..vm7 are drained and that the region size on vm2 is 150.
+   *
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testParallelPropagationWithOverflow() throws Exception {
+    final Integer localLocatorPort =
+        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    final Integer remoteLocatorPort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    // Receiving site: the region is created before the receivers.
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    java.util.Arrays.asList(vm2, vm3).forEach(member -> member.invoke(
+        () -> WANTestBase.createPartitionedRegion(
+            getTestMethodName(), null, 1, 100, isOffHeap())));
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)));
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createPartitionedRegion(
+            getTestMethodName(), "ln", 1, 100, isOffHeap())));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    // Let all the senders start before doing any puts so that no event is lost.
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7)
+        .forEach(member -> member.invoke(waitForSenderRunnable()));
+
+    vm4.invoke(() -> WANTestBase.doHeavyPuts(getTestMethodName(), 150));
+
+    // Verify all buckets drained on all sender nodes.
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 150));
+  }
+
+  /**
+   * Runs sender "lnSerial" (on vm4, vm5) for a replicated region and sender "lnParallel"
+   * (on vm4..vm7) for a partitioned region side by side. After 1000 puts into each region the
+   * "lnParallel" queues are validated as drained, the replicated region size is validated on
+   * vm2 and the partitioned region size on vm3.
+   *
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testSerialReplicatedAndParallelPartitionedPropagation()
+      throws Exception {
+    final Integer localLocatorPort =
+        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    final Integer remoteLocatorPort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
+
+    // "lnSerial" exists on two members only; "lnParallel" exists on all four.
+    java.util.Arrays.asList(vm4, vm5).forEach(member -> member.invoke(
+        () -> WANTestBase.createSender(
+            "lnSerial", 2, false, 100, 10, false, false, null, true)));
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createSender(
+            "lnParallel", 2, true, 100, 10, false, false, null, true)));
+
+    // Receiving-site regions.
+    java.util.Arrays.asList(vm2, vm3).forEach(member -> member.invoke(
+        () -> WANTestBase.createReplicatedRegion(
+            getTestMethodName() + "_RR", null, isOffHeap())));
+    java.util.Arrays.asList(vm2, vm3)
+        .forEach(member -> member.invoke(createReceiverPartitionedRegionRedundancy1()));
+
+    // Sending-site regions.
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createReplicatedRegion(
+            getTestMethodName() + "_RR", "lnSerial", isOffHeap())));
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createPartitionedRegion(
+            getTestMethodName() + "_PR", "lnParallel", 1, 100, isOffHeap())));
+
+    startSenderInVMs("lnSerial", vm4, vm5);
+    startSenderInVMs("lnParallel", vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+    vm5.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
+
+    // Verify all buckets drained on all sender nodes.
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("lnParallel")));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
+  }
+
+  /**
+   * Attaches two senders, "lnParallel1" (remote id 2, received by vm2) and "lnParallel2"
+   * (remote id 3, received by vm3), to one partitioned region on vm4..vm7. After 1000 puts
+   * both sender queues are validated as drained and the region size is validated against 1000
+   * on vm2 and on vm3.
+   *
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testPartitionedParallelPropagationToTwoWanSites()
+      throws Exception {
+    // The first locator is created directly by this method rather than through a VM invoke.
+    final Integer localLocatorPort = createFirstLocatorWithDSId(1);
+    final Integer nyLocatorPort =
+        (Integer) vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+    final Integer tkLocatorPort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3, localLocatorPort));
+
+    // First receiving site: vm2.
+    createCacheInVMs(nyLocatorPort, vm2);
+    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+    createReceiverInVMs(vm2);
+
+    // Second receiving site: vm3.
+    createCacheInVMs(tkLocatorPort, vm3);
+    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+    createReceiverInVMs(vm3);
+
+    createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createSender(
+            "lnParallel1", 2, true, 100, 10, false, false, null, true)));
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createSender(
+            "lnParallel2", 3, true, 100, 10, false, false, null, true)));
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createPartitionedRegion(
+            getTestMethodName() + "_PR", "lnParallel1,lnParallel2", 1, 100, isOffHeap())));
+
+    startSenderInVMs("lnParallel1", vm4, vm5, vm6, vm7);
+    startSenderInVMs("lnParallel2", vm4, vm5, vm6, vm7);
+
+    // Before doing puts, make sure that the senders are started so that no event is lost.
+    for (String senderId : new String[] {"lnParallel1", "lnParallel2"}) {
+      java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+          () -> WANTestBase.waitForSenderRunningState(senderId)));
+    }
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
+
+    // Verify all buckets drained on all sender nodes, one sender at a time.
+    for (String senderId : new String[] {"lnParallel1", "lnParallel2"}) {
+      java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+          () -> WANTestBase.validateParallelSenderQueueAllBucketsDrained(senderId)));
+    }
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000));
+  }
+
+  /**
+   * High-availability scenario: while puts are running asynchronously on vm7 and vm6,
+   * {@code WANTestBase.killSender()} is invoked asynchronously on vm4 and then on vm5. The
+   * test then validates a region size of 10000 on the surviving sending members (vm6, vm7),
+   * drained sender queues on those members, and a region size of 10000 on the receiving
+   * members (vm2, vm3).
+   *
+   * The interleaving relies on fixed pauses (500 ms and 1500 ms), which is presumably part of
+   * why the test is categorized as flaky.
+   *
+   * @throws Exception on unexpected failure
+   */
+  @Category(FlakyTest.class) // GEODE-1008 and GEODE-1180: random ports, async actions, thread sleeps, time sensitive, waitForCriterion
+  @Test
+  public void testPartitionedParallelPropagationHA() throws Exception {
+    // Connection-level errors are expected once sender members are killed.
+    IgnoredException.addIgnoredException("Broken pipe");
+    IgnoredException.addIgnoredException("Connection reset");
+    IgnoredException.addIgnoredException("Unexpected IOException");
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+
+    
+    // NOTE(review): the third argument is 2 here while the other tests in this file pass 1;
+    // presumably it is the redundancy level needed to survive two member kills - confirm.
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap()  ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap()  ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap()  ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 2, 100, isOffHeap()  ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(createReceiverPartitionedRegionRedundancy1());
+    vm3.invoke(createReceiverPartitionedRegionRedundancy1());
+
+    // Start puts on vm7, kill vm4 shortly after, start more puts on vm6, then kill vm5.
+    AsyncInvocation inv1 = vm7.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 5000 ));
+    Wait.pause(500);
+    AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
+    AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 10000 ));
+    Wait.pause(1500);
+    AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
+    // NOTE(review): the results of the async invocations are not inspected after join();
+    // confirm whether a failure inside one of them would be surfaced here.
+    inv1.join();
+    inv2.join();
+    inv3.join();
+    inv4.join();
+    
+    vm6.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_PR", 10000 ));
+    vm7.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_PR", 10000 ));
+    
+    //verify all buckets drained on the sender nodes that are up and running.
+    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 10000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 10000 ));
+  }
+
+  /**
+   * Creates sender "ln" with a {@code MyGatewayEventFilter} on vm4..vm7, performs 1000 puts
+   * from vm4 and validates a region size of 800 on the receiving member vm2.
+   *
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testParallelPropagationWithFilter() throws Exception {
+    final Integer localLocatorPort =
+        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    final Integer remoteLocatorPort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
+
+    // The filter instance is constructed inside the invoked lambda, once per member.
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createSender(
+            "ln", 2, true, 100, 10, false, false, new MyGatewayEventFilter(), true)));
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createPartitionedRegion(
+            getTestMethodName(), "ln", 1, 100, isOffHeap())));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    java.util.Arrays.asList(vm2, vm3).forEach(member -> member.invoke(
+        () -> WANTestBase.createPartitionedRegion(
+            getTestMethodName(), null, 1, 100, isOffHeap())));
+
+    // Wait for the senders to be running before doing any puts so that no event is lost.
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7)
+        .forEach(member -> member.invoke(waitForSenderRunnable()));
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 1000));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 800));
+  }
+  
+  
+  /**
+   * Calls {@code WANTestBase.doPutAll(region, 100, 50)} from vm4 through sender "ln", then
+   * validates that the sender queues on vm4..vm7 are drained and that the region size on vm2
+   * is 5000.
+   *
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testParallelPropagationWithPutAll() throws Exception {
+    final Integer localLocatorPort =
+        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    final Integer remoteLocatorPort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)));
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7)
+        .forEach(member -> member.invoke(createPartitionedRegionRedundancy1Runnable()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    java.util.Arrays.asList(vm2, vm3)
+        .forEach(member -> member.invoke(createReceiverPartitionedRegionRedundancy1()));
+
+    // Let the senders be running before doing any puts so that no event is lost.
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7)
+        .forEach(member -> member.invoke(waitForSenderRunnable()));
+
+    vm4.invoke(() -> WANTestBase.doPutAll(getTestMethodName() + "_PR", 100, 50));
+
+    // Verify all buckets drained on all sender nodes.
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 5000));
+  }
+  
+  /**
+   * Regression test for bug #44304: all destroy events were being put into different buckets
+   * of the sender queue against the key 0.
+   *
+   * With the senders paused, 1000 puts and 500 destroys are performed; the per-bucket queue
+   * size is validated, the senders are resumed, and the queues are validated as drained with
+   * a region size of 500 on both receiving members.
+   *
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testParallelPropagationWithDestroy() throws Exception {
+    final Integer localLocatorPort =
+        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    final Integer remoteLocatorPort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 100, false, false, null, true)));
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7)
+        .forEach(member -> member.invoke(createPartitionedRegionRedundancy1Runnable()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    java.util.Arrays.asList(vm2, vm3)
+        .forEach(member -> member.invoke(createReceiverPartitionedRegionRedundancy1()));
+
+    // Let the senders be running before doing any puts so that no event is lost.
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7)
+        .forEach(member -> member.invoke(waitForSenderRunnable()));
+
+    // Pause dispatching so that the events accumulate in the queue.
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7)
+        .forEach(member -> member.invoke(() -> WANTestBase.pauseSender("ln")));
+
+    Wait.pause(2000);
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
+    vm4.invoke(() -> WANTestBase.doDestroys(getTestMethodName() + "_PR", 500));
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.validateParallelSenderQueueBucketSize("ln", 15)));
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7)
+        .forEach(member -> member.invoke(() -> WANTestBase.resumeSender("ln")));
+
+    // Give some time for the queue to drain.
+    Wait.pause(5000);
+
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member -> member.invoke(
+        () -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 500));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 500));
+  }
+  
+  /**
+   * Normal happy-path scenario, but with transactional operations: {@code doTxPuts} is run on
+   * vm4, the sender queues on vm4 and vm5 are validated as drained, and a region size of 3 is
+   * validated on vm2.
+   *
+   * Only vm4 and vm5 take part as sending members; vm6 and vm7 are not used by this test.
+   *
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testParallelPropagationTxOperations() throws Exception {
+    final Integer localLocatorPort =
+        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    final Integer remoteLocatorPort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    createCacheInVMs(localLocatorPort, vm4, vm5);
+
+    java.util.Arrays.asList(vm4, vm5).forEach(member -> member.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)));
+
+    java.util.Arrays.asList(vm4, vm5)
+        .forEach(member -> member.invoke(createPartitionedRegionRedundancy1Runnable()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    java.util.Arrays.asList(vm2, vm3)
+        .forEach(member -> member.invoke(createReceiverPartitionedRegionRedundancy1()));
+
+    // Let the senders be running before doing any puts so that no event is lost.
+    java.util.Arrays.asList(vm4, vm5)
+        .forEach(member -> member.invoke(waitForSenderRunnable()));
+
+    vm4.invoke(() -> WANTestBase.doTxPuts(getTestMethodName() + "_PR"));
+
+    // Verify all buckets drained on all sender nodes.
+    java.util.Arrays.asList(vm4, vm5).forEach(member -> member.invoke(
+        () -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 3));
+  }
+
+  /**
+   * Pauses sender "ln" on vm4 and vm5, puts 10 entries from vm4, validates the queue contents
+   * on both members and asserts that the local queue sizes reported by vm4 and vm5 add up
+   * to 10.
+   *
+   * The test is currently disabled via {@code @Ignore}.
+   */
+  @Ignore
+  @Test
+  public void testParallelGatewaySenderQueueLocalSize() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    // Create the receiving-site cache in vm2 exactly once, as the HA variant of this test does.
+    createCacheInVMs(nyPort, vm2);
+    createReceiverInVMs(vm2);
+    createCacheInVMs(lnPort, vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+
+    vm4.invoke(createPartitionedRegionRedundancy1Runnable());
+    vm5.invoke(createPartitionedRegionRedundancy1Runnable());
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    
+    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    
+    /*
+     * Pausing a sender does not guarantee that peeking stops immediately: the
+     * event processor may already be peeking events and will send them
+     * afterwards without checking for the pause. Hence the pause of 1 second
+     * below, to allow dispatching to actually stop.
+     */
+    Wait.pause(1000);
+    
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+        10 ));
+    
+    vm4.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 ));
+    vm5.invoke(() -> WANTestBase.validateQueueContents( "ln", 10 ));
+    
+    // Instead of checking for sizes of 5 and 5, check that the combined size is 10.
+    Integer localSize1 = (Integer)vm4.invoke(() -> WANTestBase.getPRQLocalSize( "ln"));
+    Integer localSize2 = (Integer)vm5.invoke(() -> WANTestBase.getPRQLocalSize( "ln"));
+    assertEquals(10,  localSize1 + localSize2);
+  }
+  
+
+
+  /**
+   * HA variant of the local-queue-size check: with sender "ln" paused on vm4 and vm5, 10
+   * entries are put from vm4 and the combined local queue size is asserted to be 10. After
+   * {@code killSender} on vm5, the queue contents and local queue size on vm4 are checked
+   * against 10.
+   *
+   * NOTE: this method carries no {@code @Test} annotation, so JUnit 4 does not execute it;
+   * presumably it was disabled on purpose.
+   */
+  public void tParallelGatewaySenderQueueLocalSizeWithHA() {
+    // Connection-level errors are expected once a sender member is killed.
+    IgnoredException.addIgnoredException("Broken pipe");
+    IgnoredException.addIgnoredException("Connection reset");
+    IgnoredException.addIgnoredException("Unexpected IOException");
+
+    final Integer localLocatorPort =
+        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    final Integer remoteLocatorPort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    createCacheInVMs(remoteLocatorPort, vm2);
+    createReceiverInVMs(vm2);
+    createCacheInVMs(localLocatorPort, vm4, vm5);
+
+    java.util.Arrays.asList(vm4, vm5).forEach(member -> member.invoke(
+        () -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)));
+
+    java.util.Arrays.asList(vm4, vm5)
+        .forEach(member -> member.invoke(createPartitionedRegionRedundancy1Runnable()));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    java.util.Arrays.asList(vm4, vm5)
+        .forEach(member -> member.invoke(waitForSenderRunnable()));
+
+    java.util.Arrays.asList(vm4, vm5)
+        .forEach(member -> member.invoke(() -> WANTestBase.pauseSender("ln")));
+
+    /*
+     * Pausing a sender does not guarantee that peeking stops immediately: the
+     * event processor may already be peeking events and will send them
+     * afterwards without checking for the pause. Hence the pause of 1 second
+     * below, to allow dispatching to actually stop.
+     */
+    Wait.pause(1000);
+
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 10));
+
+    java.util.Arrays.asList(vm4, vm5)
+        .forEach(member -> member.invoke(() -> WANTestBase.validateQueueContents("ln", 10)));
+
+    // The two members together must account for all 10 queued events.
+    final Integer sizeOnVm4 = (Integer) vm4.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
+    final Integer sizeOnVm5 = (Integer) vm5.invoke(() -> WANTestBase.getPRQLocalSize("ln"));
+    assertEquals(10, sizeOnVm4 + sizeOnVm5);
+
+    vm5.invoke(() -> WANTestBase.killSender());
+
+    // After vm5 is gone, vm4 is checked against the full queue of 10.
+    vm4.invoke(() -> WANTestBase.validateQueueContents("ln", 10));
+    vm4.invoke(() -> WANTestBase.checkPRQLocalSize("ln", 10));
+  }
+  
+  /**
+   * Regression test for defect #50364 ("Can't colocate region that has AEQ
+   * with a region that does not have that same AEQ").
+   * <p>
+   * On site 1 (vm3) the parent PR is created with {@code null} where the
+   * colocated child PR is given "ln". 1000 puts into the child on site 1 must
+   * then be visible in the child region on site 2 (vm2).
+   */
+  @Test
+  public void testParallelSenderAttachedToChildRegionButNotToParentRegion() {
+    final Integer site1LocatorPort =
+        (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    final Integer site2LocatorPort =
+        (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, site1LocatorPort ));
+
+    // site2: cache and receiver
+    createCacheInVMs(site2LocatorPort, vm2);
+    createReceiverInVMs(vm2);
+
+    // site1: cache, then create and start the sender before any region exists
+    createCacheInVMs(site1LocatorPort, vm3);
+    vm3.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    startSenderInVMs("ln", vm3);
+
+    // site1: leader (parent) PR, created without "ln"
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "PARENT_PR", null, 0, 100, isOffHeap() ));
+    final String parentPath = (String) vm3.invoke(
+        () -> WANTestBase.getRegionFullPath( getTestMethodName() + "PARENT_PR"));
+
+    // site1: colocated (child) PR, created with "ln"
+    vm3.invoke(() -> WANTestBase.createColocatedPartitionedRegion(
+        getTestMethodName() + "CHILD_PR", "ln", 0, 100, parentPath ));
+
+    // site2: same parent/child pair, neither given a sender
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "PARENT_PR", null, 0, 100, isOffHeap() ));
+    vm2.invoke(() -> WANTestBase.createColocatedPartitionedRegion(
+        getTestMethodName() + "CHILD_PR", null, 0, 100, parentPath ));
+
+    // put into the child on site1 ...
+    vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "CHILD_PR", 1000 ));
+
+    // ... and expect every entry to arrive in the child on site2
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "CHILD_PR", 1000 ));
+  }
+  
+  /**
+   * Propagates 1000 puts from a four-member site (vm2-vm5) to a two-member
+   * site (vm6, vm7) through sender "ln", with a fresh
+   * {@code MyGatewayEventFilter_AfterAck} passed to every sender.
+   * <p>
+   * Once the queues have drained and every member on both sites reports 1000
+   * entries, the {@code validateAfterAck("ln")} results of the four sending
+   * members must add up to 2000.
+   */
+  @Test
+  public void testParallelPropagationWithFilter_AfterAck() throws Exception {
+    final Integer lnLocatorPort =
+        (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    final Integer nyLocatorPort =
+        (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnLocatorPort ));
+
+    // receiving site
+    createCacheInVMs(nyLocatorPort, vm6, vm7);
+    createReceiverInVMs(vm6, vm7);
+
+    // sending site
+    createCacheInVMs(lnLocatorPort, vm2, vm3, vm4, vm5);
+
+    // each sender is created with its own filter instance
+    java.util.Arrays.asList(vm2, vm3, vm4, vm5).forEach(member ->
+        member.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+            100, 10, false, false, new MyGatewayEventFilter_AfterAck(), true )));
+
+    java.util.Arrays.asList(vm2, vm3, vm4, vm5).forEach(member ->
+        member.invoke(() -> WANTestBase.createPartitionedRegion(
+            getTestMethodName(), "ln", 1, 100, isOffHeap() )));
+
+    startSenderInVMs("ln", vm2, vm3, vm4, vm5);
+
+    java.util.Arrays.asList(vm6, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.createPartitionedRegion(
+            getTestMethodName(), null, 1, 100, isOffHeap() )));
+
+    // Do not put anything until every sender is running, so that not a
+    // single event is lost.
+    java.util.Arrays.asList(vm2, vm3, vm4, vm5).forEach(member ->
+        member.invoke(waitForSenderRunnable()));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+
+    // every sender queue must end up empty
+    java.util.Arrays.asList(vm2, vm3, vm4, vm5).forEach(member ->
+        member.invoke(() -> WANTestBase.validateQueueContents( "ln", 0 )));
+
+    // all 1000 entries present on the sending members, then on the receivers
+    java.util.Arrays.asList(vm2, vm3, vm4, vm5, vm6, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName(), 1000 )));
+
+    final Integer acksOnVm2 = (Integer)vm2.invoke(() -> WANTestBase.validateAfterAck( "ln"));
+    final Integer acksOnVm3 = (Integer)vm3.invoke(() -> WANTestBase.validateAfterAck( "ln"));
+    final Integer acksOnVm4 = (Integer)vm4.invoke(() -> WANTestBase.validateAfterAck( "ln"));
+    final Integer acksOnVm5 = (Integer)vm5.invoke(() -> WANTestBase.validateAfterAck( "ln"));
+
+    // presumably 1000 events x 2 copies (the regions are created with 1 as
+    // their third argument) - TODO confirm
+    assertEquals(2000, (acksOnVm2 + acksOnVm3 + acksOnVm4 + acksOnVm5));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
new file mode 100644
index 0000000..07a6223
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+/**
+ * Distributed tests checking that events received over WAN by a parallel
+ * gateway sender site are not enqueued again for sending ("loop back"), in
+ * two-site, three-site ring and three-site N-to-N layouts.
+ */
+@Category(DistributedTest.class)
+public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
+
+  private static final long serialVersionUID = 1L;
+
+  public ParallelWANPropagationLoopBackDUnitTest() {
+    super();
+  }
+
+  /**
+   * Loop back check between 2 WAN sites that send to each other
+   * (LN -> NY -> LN).
+   * <ul>
+   * <li>Site1 (LN): vm2, vm4, vm5 - sender "ln" targets DS 2</li>
+   * <li>Site2 (NY): vm3, vm6, vm7 - sender "ny" targets DS 1</li>
+   * </ul>
+   * With all senders paused, each site gets 100 puts. Only site1's senders
+   * are then resumed; site2 must reach 200 entries while its own queue stays
+   * at 100, showing the events received from site1 were not re-enqueued.
+   */
+  @Test
+  public void testParallelPropagationLoopBack() {
+    final Integer lnLocatorPort =
+        (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    final Integer nyLocatorPort =
+        (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnLocatorPort ));
+
+    // caches on both sites; one receiver per site (vm2 and vm3)
+    createCacheInVMs(lnLocatorPort, vm2, vm4, vm5);
+    vm2.invoke(() -> WANTestBase.createReceiver());
+    createCacheInVMs(nyLocatorPort, vm3, vm6, vm7);
+    vm3.invoke(() -> WANTestBase.createReceiver());
+
+    // senders: site1 first, then site2
+    java.util.Arrays.asList(vm2, vm4, vm5).forEach(member ->
+        member.invoke(() -> WANTestBase.createSender( "ln", 2,
+            true, 100, 10, false, false, null, true )));
+    java.util.Arrays.asList(vm3, vm6, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.createSender( "ny", 1,
+            true, 100, 10, false, false, null, true )));
+
+    // partitioned region: site1 first, then site2
+    java.util.Arrays.asList(vm2, vm4, vm5).forEach(member ->
+        member.invoke(() -> WANTestBase.createPartitionedRegion(
+            getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap()  )));
+    java.util.Arrays.asList(vm3, vm6, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.createPartitionedRegion(
+            getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap()  )));
+
+    // start, then pause, the senders of both sites
+    startSenderInVMs("ln", vm2, vm4, vm5);
+    startSenderInVMs("ny", vm3, vm6, vm7);
+
+    java.util.Arrays.asList(vm2, vm4, vm5).forEach(member ->
+        member.invoke(() -> WANTestBase.pauseSender( "ln" )));
+    java.util.Arrays.asList(vm3, vm6, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.pauseSender( "ny" )));
+
+    // a sender pause does not take effect immediately
+    Wait.pause(1000);
+
+    // 100 puts on site1, the next 100 on site2
+    vm2.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 ));
+    vm3.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", 100, 200 ));
+
+    // each site's queue must report its own 100 events
+    java.util.Arrays.asList(vm2, vm4, vm5).forEach(member ->
+        member.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 )));
+    java.util.Arrays.asList(vm3, vm6, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 )));
+
+    // let site1 send
+    java.util.Arrays.asList(vm2, vm4, vm5).forEach(member ->
+        member.invoke(() -> WANTestBase.resumeSender( "ln" )));
+
+    // site2 now holds its own 100 entries plus the 100 from site1
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 200 ));
+
+    // Loop back check: site2's queue size is unchanged, i.e. the events that
+    // arrived from site1 were not enqueued again.
+    java.util.Arrays.asList(vm3, vm6, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.verifyQueueSize( "ny", 100 )));
+
+    // let site2 send
+    java.util.Arrays.asList(vm3, vm6, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.resumeSender( "ny" )));
+
+    // both sites converge on 200 entries
+    java.util.Arrays.asList(vm2, vm3).forEach(member ->
+        member.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_PR", 200 )));
+  }
+
+  /**
+   * Loop back check among 3 WAN sites arranged as a ring
+   * (LN -> NY -> TK -> LN).
+   * <ul>
+   * <li>Site1 (LN): vm3, vm6 - sender "ln" targets DS 2</li>
+   * <li>Site2 (NY): vm4, vm7 - sender "ny" targets DS 3</li>
+   * <li>Site3 (TK): vm5 - sender "tk" targets DS 1</li>
+   * </ul>
+   * Site2's sender is never paused, so it forwards what it receives. After
+   * site1's events have travelled through site2 to site3, site3's queue must
+   * still hold only its own 100 events.
+   */
+  @Test
+  public void testParallelPropagationLoopBack3Sites() {
+    // locators for the 3 distributed systems
+    final Integer lnLocatorPort =
+        (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    final Integer nyLocatorPort =
+        (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnLocatorPort ));
+    final Integer tkLocatorPort =
+        (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnLocatorPort ));
+
+    // cache and receivers, site by site
+    createCacheInVMs(lnLocatorPort, vm3, vm6);
+    createReceiverInVMs(vm3, vm6);
+    createCacheInVMs(nyLocatorPort, vm4, vm7);
+    createReceiverInVMs(vm4, vm7);
+    createCacheInVMs(tkLocatorPort, vm5);
+    createReceiverInVMs(vm5);
+
+    // senders, site by site
+    java.util.Arrays.asList(vm3, vm6).forEach(member ->
+        member.invoke(() -> WANTestBase.createSender( "ln", 2,
+            true, 100, 10, false, false, null, true )));
+    java.util.Arrays.asList(vm4, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.createSender( "ny", 3,
+            true, 100, 10, false, false, null, true )));
+    vm5.invoke(() -> WANTestBase.createSender( "tk", 1,
+        true, 100, 10, false, false, null, true ));
+
+    // partitioned region, site by site
+    java.util.Arrays.asList(vm3, vm6).forEach(member ->
+        member.invoke(() -> WANTestBase.createPartitionedRegion(
+            getTestMethodName() + "_PR", "ln", 0, 100, isOffHeap() )));
+    java.util.Arrays.asList(vm4, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.createPartitionedRegion(
+            getTestMethodName() + "_PR", "ny", 0, 100, isOffHeap() )));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "tk", 0, 100, isOffHeap() ));
+
+    // start every sender
+    startSenderInVMs("ln", vm3, vm6);
+    startSenderInVMs("ny", vm4, vm7);
+    vm5.invoke(() -> WANTestBase.startSender( "tk" ));
+
+    // pause site1 and site3 only; site2 keeps running to pass events along
+    java.util.Arrays.asList(vm3, vm6).forEach(member ->
+        member.invoke(() -> WANTestBase.pauseSender( "ln" )));
+    vm5.invoke(() -> WANTestBase.pauseSender( "tk" ));
+
+    // pauseSender does not take effect immediately
+    Wait.pause(1000);
+
+    // 100 puts on site1, the next 100 on site3
+    vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 ));
+    vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", 100, 200 ));
+
+    // queues on site1 and site3 each hold their own 100 events
+    vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln", 100 ));
+    vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk", 100 ));
+
+    // resume site1 so its events reach site2 and, from there, site3
+    java.util.Arrays.asList(vm3, vm6).forEach(member ->
+        member.invoke(() -> WANTestBase.resumeSender( "ln" )));
+
+    // site2 should have 100 entries, site3 should have 200
+    vm4.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 100 ));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 200 ));
+
+    // Loop back check: site3's queue is unchanged, i.e. the events that
+    // originated on site1 were not enqueued back into the sender.
+    vm5.invoke(() -> WANTestBase.verifyQueueSize( "tk", 100 ));
+
+    // let site3 send
+    vm5.invoke(() -> WANTestBase.resumeSender( "tk" ));
+
+    // all three sites converge on 200 entries
+    java.util.Arrays.asList(vm3, vm4, vm5).forEach(member ->
+        member.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_PR", 200 )));
+  }
+
+  /**
+   * Loop back check among 3 WAN sites in an N-to-N layout, i.e. every site
+   * has one sender per other site.
+   * <ul>
+   * <li>LN site: vm3, vm6 - "ln1" targets DS 2, "ln2" targets DS 3</li>
+   * <li>NY site: vm4, vm7 - "ny1" targets DS 1, "ny2" targets DS 3</li>
+   * <li>TK site: vm5 - "tk1" targets DS 1, "tk2" targets DS 2</li>
+   * </ul>
+   * Events that arrive at the NY site from either other site must leave both
+   * NY queues at 0: they are neither sent back to their origin nor relayed to
+   * the third site.
+   */
+  @Test
+  public void testParallelPropagationLoopBack3SitesNtoNTopologyPutFromOneDS() {
+    final Integer lnLocatorPort =
+        (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    final Integer nyLocatorPort =
+        (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnLocatorPort ));
+    final Integer tkLocatorPort =
+        (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnLocatorPort ));
+
+    createCacheInVMs(lnLocatorPort, vm3, vm6);
+    createCacheInVMs(nyLocatorPort, vm4, vm7);
+    createCacheInVMs(tkLocatorPort, vm5);
+
+    // one receiver per site
+    java.util.Arrays.asList(vm3, vm4, vm5).forEach(member ->
+        member.invoke(() -> WANTestBase.createReceiver()));
+
+    // site1 senders
+    java.util.Arrays.asList(vm3, vm6).forEach(member ->
+        member.invoke(() -> WANTestBase.createSender( "ln1", 2,
+            true, 100, 10, false, false, null, true )));
+    java.util.Arrays.asList(vm3, vm6).forEach(member ->
+        member.invoke(() -> WANTestBase.createSender( "ln2", 3,
+            true, 100, 10, false, false, null, true )));
+
+    // site2 senders
+    java.util.Arrays.asList(vm4, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.createSender( "ny1", 1,
+            true, 100, 10, false, false, null, true )));
+    java.util.Arrays.asList(vm4, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.createSender( "ny2", 3,
+            true, 100, 10, false, false, null, true )));
+
+    // site3 senders
+    vm5.invoke(() -> WANTestBase.createSender( "tk1", 1,
+        true, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "tk2", 2,
+        true, 100, 10, false, false, null, true ));
+
+    // partitioned region on every member, attached to both senders of its site
+    java.util.Arrays.asList(vm3, vm6).forEach(member ->
+        member.invoke(() -> WANTestBase.createPartitionedRegion(
+            getTestMethodName() + "_PR", "ln1,ln2", 0, 1, isOffHeap() )));
+    java.util.Arrays.asList(vm4, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.createPartitionedRegion(
+            getTestMethodName() + "_PR", "ny1,ny2", 0, 1, isOffHeap() )));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "tk1,tk2", 0, 1, isOffHeap() ));
+
+    // start all the senders, member by member
+    java.util.Arrays.asList(vm3, vm6).forEach(member -> {
+      member.invoke(() -> WANTestBase.startSender( "ln1" ));
+      member.invoke(() -> WANTestBase.startSender( "ln2" ));
+    });
+    java.util.Arrays.asList(vm4, vm7).forEach(member -> {
+      member.invoke(() -> WANTestBase.startSender( "ny1" ));
+      member.invoke(() -> WANTestBase.startSender( "ny2" ));
+    });
+    vm5.invoke(() -> WANTestBase.startSender( "tk1" ));
+    vm5.invoke(() -> WANTestBase.startSender( "tk2" ));
+
+    // pause all the senders, member by member
+    java.util.Arrays.asList(vm3, vm6).forEach(member -> {
+      member.invoke(() -> WANTestBase.pauseSender( "ln1" ));
+      member.invoke(() -> WANTestBase.pauseSender( "ln2" ));
+    });
+    java.util.Arrays.asList(vm4, vm7).forEach(member -> {
+      member.invoke(() -> WANTestBase.pauseSender( "ny1" ));
+      member.invoke(() -> WANTestBase.pauseSender( "ny2" ));
+    });
+    vm5.invoke(() -> WANTestBase.pauseSender( "tk1" ));
+    vm5.invoke(() -> WANTestBase.pauseSender( "tk2" ));
+
+    // a sender pause does not take effect immediately
+    Wait.pause(1000);
+
+    // 100 puts on site1
+    vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 ));
+
+    // both site1 queues hold the 100 events
+    vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln1", 100 ));
+    vm3.invoke(() -> WANTestBase.verifyQueueSize( "ln2", 100 ));
+
+    // resume only the site1 -> site2 sender
+    java.util.Arrays.asList(vm3, vm6).forEach(member ->
+        member.invoke(() -> WANTestBase.resumeSender( "ln1" )));
+
+    // site2 receives the 100 entries
+    vm4.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 100 ));
+
+    // site2 -> site1 queue stays at 0: events from site1 must not go back
+    vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny1", 0 ));
+
+    // site2 -> site3 queue stays at 0: site1 delivers to site3 directly, so
+    // site2 need not send these events to site3 again
+    vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny2", 0 ));
+
+    // the next 100 puts, on site3
+    vm5.invoke(() -> WANTestBase.doPutsFrom( getTestMethodName() + "_PR", 100, 200 ));
+
+    // resume only the site3 -> site2 sender
+    vm5.invoke(() -> WANTestBase.resumeSender( "tk2" ));
+
+    // site2 now holds 200 entries
+    vm4.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 200 ));
+
+    // site2 -> site3 queue stays at 0: events from site3 must not go back
+    vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny2", 0 ));
+
+    // site2 -> site1 queue stays at 0: site3 delivers to site1 directly, so
+    // site2 need not send these events to site1 again
+    vm4.invoke(() -> WANTestBase.verifyQueueSize( "ny1", 0 ));
+
+    // resume every sender that is still paused
+    java.util.Arrays.asList(vm3, vm6).forEach(member ->
+        member.invoke(() -> WANTestBase.resumeSender( "ln2" )));
+    java.util.Arrays.asList(vm4, vm7).forEach(member -> {
+      member.invoke(() -> WANTestBase.resumeSender( "ny1" ));
+      member.invoke(() -> WANTestBase.resumeSender( "ny2" ));
+    });
+    vm5.invoke(() -> WANTestBase.resumeSender( "tk1" ));
+
+    // all three sites converge on 200 entries
+    java.util.Arrays.asList(vm3, vm4, vm5).forEach(member ->
+        member.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_PR", 200 )));
+  }
+
+}