You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:43:58 UTC

[10/61] [abbrv] incubator-geode git commit: GEODE-37 change package name from com.gemstone.gemfire (for ./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to ./geode-wan/src/test/java/org/apache/geode)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
new file mode 100644
index 0000000..359a56d
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -0,0 +1,1593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import java.io.IOException;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.internal.cache.ColocationHelper;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
+
+@Category(DistributedTest.class)
+public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends WANTestBase {
+
+  private static final long serialVersionUID = 1L;
+  
+  public ParallelWANPersistenceEnabledGatewaySenderDUnitTest() { // no-arg constructor; it only delegates to the superclass constructor
+    super();
+  }
+
+  @Override // overrides a parent-class method; presumably invoked by WANTestBase after its own set-up - confirm
+  protected final void postSetUpWANTestBase() throws Exception {
+    //The restart tests log this string, so it is registered as an ignored exception before each test body runs
+    IgnoredException.addIgnoredException("failed accepting client connection");
+  }
+  
+  @Test // smoke test: passes when a parallel, persistence-enabled sender and a partitioned region using it are created and started without an exception
+  public void testPartitionedRegionWithGatewaySenderPersistenceEnabled() throws IOException {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort )); // nyPort itself is not used below
+      createCache(lnPort);
+      GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+      fact.setPersistenceEnabled(true);
+      fact.setParallel(true);
+      final IgnoredException ex = IgnoredException.addIgnoredException("Could not connect"); // tolerated only until ex.remove() in the finally block
+      try {
+        GatewaySender sender1 = fact.create("NYSender", 2);
+
+        AttributesFactory rFact = new AttributesFactory();
+        rFact.addGatewaySenderId(sender1.getId());
+
+        PartitionAttributesFactory pFact = new PartitionAttributesFactory();
+        pFact.setTotalNumBuckets(100);
+        pFact.setRedundantCopies(1);
+        rFact.setPartitionAttributes(pFact.create());
+        Region r = cache.createRegionFactory(rFact.create()).create("MyRegion"); // r is not read afterwards; only the creation matters
+        sender1.start();
+      } finally {
+        ex.remove();
+      }
+      
+    }
+    catch (Exception e) {
+      fail("Unexpected Exception :" + e); // any exception fails the test; only its string form is reported
+    }
+  }
+  
+  /**
+   * Enables persistence for the region as well as for the GatewaySender "ln" (created in vm4-vm7), does 1000 puts
+   * through vm4 and validates that the region on the receiving site (vm2 and vm3) reaches a size of 1000.
+   */
+  @Test
+  public void testPersistentPartitionedRegionWithGatewaySenderPersistenceEnabled() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, true, null, true )); // same arguments in all four VMs; NOTE(review): confirm the positional flags against WANTestBase.createSender
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, true, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, true, null, true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, true, null, true ));
+
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), null, 1, 100, isOffHeap() )); // receiving-site region is created with null in place of the "ln" argument
+    vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 1000 ));
+
+  }
+
+  /**
+   * Enables persistence for the GatewaySender but not the region: the local region comes from createPartitionedRegionRunnable(), and 1000 puts through vm4 must reach vm2 and vm3.
+   */
+  @Test
+  public void testPartitionedRegionWithPersistentGatewaySender() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+    
+    LogWriterUtils.getLogWriter().info("Created remote receivers");
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+    
+    LogWriterUtils.getLogWriter().info("Created local site cache");
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, true, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, true, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, true, null, true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, true, null, true ));
+  
+    LogWriterUtils.getLogWriter().info("Created local site senders");
+
+    vm4.invoke(createPartitionedRegionRunnable());
+    vm5.invoke(createPartitionedRegionRunnable());
+    vm6.invoke(createPartitionedRegionRunnable());
+    vm7.invoke(createPartitionedRegionRunnable());
+
+    LogWriterUtils.getLogWriter().info("Created local site persistent PR"); // NOTE(review): message says "persistent" although the region is created via createPartitionedRegion - confirm wording
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Started the senders");
+
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName(), null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 1000 ));
+  }
+
+  protected SerializableRunnableIF createPartitionedRegionRunnable() { // task handed to vmN.invoke/invokeAsync; protected, presumably so subclasses can create a different region - confirm
+    return () -> WANTestBase.createPartitionedRegion(
+        getTestMethodName(), "ln", 1, 100, isOffHeap() ); // region is named after the test method; "ln" matches the sender id used by the tests
+  }
+
+
+
+  
+  /**
+   * Restart test with persistence enabled for the GatewaySender only (the local region is not persistent).
+   * Pauses the senders, does 3000 puts through vm4 and then kills the senders in vm4-vm7.
+   * Re-creates the caches, the senders (passing the disk store names returned earlier) and the region.
+   * The dispatcher is then expected to dispatch the recovered events (NOTE(review): this text used to say "should not start" - presumably a typo, confirm).
+   * Finally checks that the remote site (vm2, vm3) holds all 3000 entries.
+   */
+  @Test
+  public void testPRWithGatewaySenderPersistenceEnabled_Restart() {
+    //create locator on local site
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    //create locator on remote site
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    //create receiver on remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    //create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    //create senders with disk store; the returned disk store names are re-used after the restart
+    String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+    LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+    
+    //create PR on remote site
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    
+    //create PR on local site
+    vm4.invoke(createPartitionedRegionRunnable());
+    vm5.invoke(createPartitionedRegionRunnable());
+    vm6.invoke(createPartitionedRegionRunnable());
+    vm7.invoke(createPartitionedRegionRunnable());
+
+    //start the senders on local site
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+    
+    //wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    //pause the senders
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
+    
+    //start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 3000 ));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+    
+    //--------------------close and rebuild local site -------------------------------------------------
+    //kill the senders
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("Killed all the senders.");
+    
+    //"restart" the local site: re-create the caches in vm4-vm7
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Created back the cache");
+    
+    //re-create the senders, passing the disk store names returned by the first creation
+    vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+    vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+    vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+    
+    LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+    //create PR on local site, in all four VMs concurrently
+    AsyncInvocation inv1 = vm4.invokeAsync(createPartitionedRegionRunnable());
+    AsyncInvocation inv2 = vm5.invokeAsync(createPartitionedRegionRunnable());
+    AsyncInvocation inv3 = vm6.invokeAsync(createPartitionedRegionRunnable());
+    AsyncInvocation inv4 = vm7.invokeAsync(createPartitionedRegionRunnable());
+    
+    try { // NOTE(review): only interruption is handled; the outcome of the async invocations is not inspected here - confirm whether failures would go unnoticed
+      inv1.join();
+      inv2.join();
+      inv3.join();
+      inv4.join();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail();
+    }
+    
+    LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+    
+    //start the senders in async mode. This will ensure that the 
+    //node of shadow PR that went down last will come up first
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+    
+    LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+    //wait for senders running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("All the senders are now running...");
+    
+    //----------------------------------------------------------------------------------------------------
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 3000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 3000 ));
+  }
+
+  protected SerializableRunnableIF killSenderRunnable() { // task handed to vmN.invoke that calls WANTestBase.killSender()
+    return () -> WANTestBase.killSender();
+  }
+
+  protected SerializableRunnableIF pauseSenderRunnable() { // task handed to vmN.invoke that calls WANTestBase.pauseSender for sender "ln"
+    return () -> WANTestBase.pauseSender( "ln" );
+  }
+
+  protected SerializableRunnableIF waitForSenderRunnable() { // task handed to vmN.invoke that calls WANTestBase.waitForSenderRunningState for sender "ln"
+    return () -> WANTestBase.waitForSenderRunningState( "ln" );
+  }
+  
+  /**
+   * Restart test with persistence enabled for both the PR and the GatewaySender.
+   * Pauses the senders, does 3000 puts through vm4 and then kills the senders in vm4-vm7.
+   * Re-creates the caches, the senders (passing the disk store names returned earlier) and the persistent region.
+   * The dispatcher is then expected to dispatch the recovered events (NOTE(review): this text used to say "should not start" - presumably a typo, confirm).
+   * Finally checks that the remote site (vm2, vm3) holds all 3000 entries.
+   */
+  @Test
+  public void testPersistentPRWithGatewaySenderPersistenceEnabled_Restart() {
+    //create locator on local site
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    //create locator on remote site
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    //create receiver on remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    //create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    //create senders with disk store; the returned disk store names are re-used after the restart
+    String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+    LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+    
+    //create PR on remote site
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    
+    //create PR on local site
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+    //start the senders on local site
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    //wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    //pause the senders
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
+    
+    //start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 3000 ));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+    
+    //--------------------close and rebuild local site -------------------------------------------------
+    //kill the senders
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("Killed all the senders.");
+    
+    //"restart" the local site: re-create the caches in vm4-vm7
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Created back the cache");
+    
+    //re-create the senders, passing the disk store names returned by the first creation
+    vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+    vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+    vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+    
+    LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+    //create PR on local site, in all four VMs concurrently
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    
+    try { // NOTE(review): only interruption is handled; the outcome of the async invocations is not inspected here - confirm whether failures would go unnoticed
+      inv1.join();
+      inv2.join();
+      inv3.join();
+      inv4.join();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail();
+    }
+    
+    LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+    
+    //start the senders in async mode. This will ensure that the 
+    //node of shadow PR that went down last will come up first
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+    //wait for senders running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("All the senders are now running...");
+    
+    //----------------------------------------------------------------------------------------------------
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 3000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 3000 ));
+  }
+  
+  /**
+   * Restart test with persistence enabled for PR and GatewaySender in which the remote site is created late.
+   * Does 300 puts with paused senders and no remote caches or receivers yet, kills the senders and rebuilds
+   * the local site (caches, senders with the earlier disk store names, persistent PR), then waits for the senders to run.
+   * Only then creates the remote caches, regions and receivers, pauses the senders again, calls
+   * doPutsAfter300(.., 1000), resumes the senders and validates a region size of 1000 in vm2 and vm3.
+   */
+  @Test
+  public void testPersistentPRWithGatewaySenderPersistenceEnabled_Restart2() {
+    //create locator on local site
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    //create locator on remote site
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    //create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    //create senders with disk store (last argument is false here but true on re-creation below; NOTE(review): confirm its meaning in WANTestBase)
+    String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, false ));
+    String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, false ));
+    String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, false ));
+    String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, false ));
+
+    LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+    
+    //create PR on local site
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+    //start the senders on local site
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+    
+    //wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    //pause the senders
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
+    
+    //start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 300 ));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+    
+    //--------------------close and rebuild local site -------------------------------------------------
+    //kill the senders
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("Killed all the senders.");
+    
+    //"restart" the local site: re-create the caches in vm4-vm7
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Created back the cache");
+    
+    //re-create the senders, passing the disk store names returned by the first creation
+    vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+    vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+    vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+    
+    LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+    
+    //create PR on local site, in all four VMs concurrently
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    
+    try { // NOTE(review): only interruption is handled; the outcome of the async invocations is not inspected here - confirm whether failures would go unnoticed
+      inv1.join();
+      inv2.join();
+      inv3.join();
+      inv4.join();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail();
+    }
+    
+    LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+    //start the senders in async mode. This will ensure that the 
+    //node of shadow PR that went down last will come up first
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+    
+    LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+    //wait for senders running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+
+
+    LogWriterUtils.getLogWriter().info("Creating the receiver.");
+    //create caches on remote site (the receivers themselves are created after the regions below)
+    createCacheInVMs(nyPort, vm2, vm3);
+    //create PR on remote site
+    
+    LogWriterUtils.getLogWriter().info("Creating the partitioned region at receiver. ");
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    createReceiverInVMs(vm2, vm3);
+
+    vm4.invoke(pauseSenderRunnable()); // senders are paused again before the extra puts and resumed afterwards
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("Doing some extra puts. ");
+    //start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPutsAfter300( getTestMethodName(), 1000 ));
+    //----------------------------------------------------------------------------------------------------
+    vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    vm6.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    vm7.invoke(() -> WANTestBase.resumeSender( "ln" ));
+    
+    LogWriterUtils.getLogWriter().info("Validating the region size at the receiver end. ");
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 1000 ));
+  }
+  
+  
+  /**
+   * Enable persistence for PR and GatewaySender.
+   * Pause the sender and do some puts in local region.  
+   * Close the local site and rebuild the region and sender from disk store.
+   * Dispatcher should not start dispatching events recovered from persistent sender.
+   * --> At the same time, do some more puts on the local region.  
+   * Check if the remote site receives all the events.
+   */
+  @Test
+  public void testPersistentPRWithGatewaySenderPersistenceEnabled_Restart_Scenario2() {
+    //create locator on local site
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    //create locator on remote site
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    //create receiver on remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    //create senders with disk store
+    String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+    LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+    
+    //create PR on remote site
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    
+    //create PR on local site
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+    //start the senders on local site
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    //wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    //pause the senders
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
+    
+    //start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 3000 ));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+    
+    //--------------------close and rebuild local site -------------------------------------------------
+    //kill the senders
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("Killed all the senders.");
+    
+    //restart the vm
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Created back the cache");
+    
+   //create senders with disk store
+    vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+    vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+    vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+    
+    LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+    
+    //create PR on local site
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    
+    try {
+      inv1.join();
+      inv2.join();
+      inv3.join();
+      inv4.join();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail();
+    }
+    
+    LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+    
+    //start the senders in async mode. This will ensure that the 
+    //node of shadow PR that went down last will come up first
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+    
+    LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+    //wait for senders running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("All the senders are now running...");
+    
+    //----------------------------------------------------------------------------------------------------
+    
+    //Dispatcher should be dispatching now. Do some more puts through async thread
+    AsyncInvocation async1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+    try {
+      async1.join();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 3000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 3000 ));
+  }
+
+  /**
+   * Test case for bug# 44275.
+   * Enables persistence for the PR and the GatewaySender, pauses the senders and
+   * does 1000 puts into the region with String keys. The senders are then killed and
+   * the local caches, senders (on their original disk stores) and regions are rebuilt.
+   * Passes when validateRegionSize(..., 1000) succeeds on both remote-site members.
+   */
+  @Test
+  public void testPersistentPRWithPersistentGatewaySender_Restart_Bug44275() {
+    //create locator on local site
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    //create locator on remote site
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    //create receiver on remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    //create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    //create senders with disk store; the returned values are passed back in after the restart
+    String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+    LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+
+    //create PR on remote site
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+    //create PR on local site
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+    //start the senders on local site
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    //wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+
+    //pause the senders before doing the puts
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
+
+    //start puts (String keys) in region on local site
+    vm4.invoke(() -> WANTestBase.doPutsWithKeyAsString( getTestMethodName(), 1000 ));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+    //--------------------close and rebuild local site -------------------------------------------------
+    //kill the senders
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
+
+    LogWriterUtils.getLogWriter().info("Killed all the senders.");
+
+    //create the caches on the local site members again
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Created back the cache");
+
+    //re-create the senders, each one on the disk store it was given before the restart
+    vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+    vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+    vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+
+    LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+
+    //re-create PR on local site; all four invocations are issued before any of them is joined
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+    try {
+      inv1.join();
+      inv2.join();
+      inv3.join();
+      inv4.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); //keep the interrupt status instead of swallowing it
+      fail("Interrupted while waiting for the partitioned regions to be re-created: " + e);
+    }
+
+    LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+    //start the senders in async mode. This will ensure that the
+    //node of shadow PR that went down last will come up first
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+    //wait for senders running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+
+    LogWriterUtils.getLogWriter().info("All the senders are now running...");
+
+    //----------------------------------------------------------------------------------------------------
+
+    //the remote site must hold all 1000 entries put before the restart
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 1000 ));
+  }
+  
+  /**
+   * Variant of the bug# 44275 restart test that keeps doing operations after the restart.
+   * Enables persistence for the PR and the GatewaySender, pauses the senders, does 1000 puts
+   * with String keys, kills the senders and rebuilds the local site on the same disk stores.
+   * After validateRegionSize(..., 1000) on both sites, runs doPutsWithKeyAsString(..., 10000)
+   * and validates a region size of 10000 on the remote-site members.
+   */
+  @Test
+  public void testPersistentPRWithPersistentGatewaySender_Restart_DoOps() {
+    //create locator on local site
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    //create locator on remote site
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    //create receiver on remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    //create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    //create senders with disk store; the returned values are passed back in after the restart
+    String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+    LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+    
+    //create PR on remote site
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    
+    //create PR on local site
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+    //start the senders on local site
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    //wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    //pause the senders before doing the puts
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
+    
+    //start puts (String keys) in region on local site
+    vm4.invoke(() -> WANTestBase.doPutsWithKeyAsString( getTestMethodName(), 1000 ));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+    
+    //--------------------close and rebuild local site -------------------------------------------------
+    //kill the senders
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("Killed all the senders.");
+    
+    //create the caches on the local site members again
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Created back the cache");
+    
+    //re-create the senders, each one on the disk store it was given before the restart
+    vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+    vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+    vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+    
+    LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+    
+    // re-create PR on local site; unlike the Bug44275 test, one member at a time (synchronous invoke)
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    
+    LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+    
+    //start the senders in async mode. This will ensure that the
+    //node of shadow PR that went down last will come up first
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+    //wait for senders running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("All the senders are now running...");
+    
+    //----------------------------------------------------------------------------------------------------
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+    getTestMethodName(), 1000 ));
+    
+    vm4.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 1000 ));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 1000 ));
+    vm6.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 1000 ));
+    vm7.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 1000 ));
+    
+    //do some extra puts in region on local site, now that the senders are running again
+    vm4.invoke(() -> WANTestBase.doPutsWithKeyAsString( getTestMethodName(), 10000 ));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+    
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 10000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+    getTestMethodName(), 10000 ));
+  }
+  
+  @Test
+  public void testPersistentPR_Restart() {
+    // create locator on local site
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+
+    // create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    // create PR on local site
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+    // start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPutsWithKeyAsString(
+        getTestMethodName(), 1000 ));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+    // --------------------close and rebuild local site
+    // -------------------------------------------------
+    // kill the senders
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
+
+    LogWriterUtils.getLogWriter().info("Killed all the senders.");
+
+    // restart the vm
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Created back the cache");
+
+//    // create PR on local site
+//    vm4.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
+//        new Object[] { testName, "ln", 1, 100, isOffHeap() });
+//    vm5.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
+//        new Object[] { testName, "ln", 1, 100, isOffHeap() });
+//    vm6.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
+//        new Object[] { testName, "ln", 1, 100, isOffHeap() });
+//    vm7.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
+//        new Object[] { testName, "ln", 1, 100, isOffHeap() });
+
+    // create PR on local site
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1,
+            100, isOffHeap() ));
+    AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1,
+            100, isOffHeap() ));
+    AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1,
+            100, isOffHeap() ));
+    AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1,
+            100, isOffHeap() ));
+
+    try {
+      inv1.join();
+      inv2.join();
+      inv3.join();
+      inv4.join();
+    }
+    catch (InterruptedException e) {
+      e.printStackTrace();
+      fail();
+    }
+
+    LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+    vm4.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 1000 ));
+    vm5.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 1000 ));
+    vm6.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 1000 ));
+    vm7.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 1000 ));
+  }
+
+  /**
+   * Enables persistence for the PR and the GatewaySender, pauses the senders and does 1000 puts.
+   * Kills the senders to close the local site, then creates the local caches again.
+   * Re-creates only the senders (on their original disk stores), not the partitioned region,
+   * and starts them.
+   * Expects validateRegionSize(..., 1000) to succeed on both remote-site members.
+   * NOTE: This use case is not supported yet, hence the {@code @Ignore}.
+   * For a ParallelGatewaySender to start, it must be associated with a partitioned region.
+   */
+  @Ignore("NotSupported")
+  @Test
+  public void testPersistentPartitionedRegionWithGatewaySenderPersistenceEnabled_Restart2() {
+    //create locator on local site
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    //create locator on remote site
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    //create receiver on remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    //create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    //create senders with disk store; the returned values are passed back in after the restart
+    String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+    LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+    
+    //create PR on remote site
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    
+    //create PR on local site
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+    //start the senders on local site
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    //wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    //pause the senders before doing the puts
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
+    
+    //start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+    
+    //--------------------close and rebuild local site -------------------------------------------------
+    //kill the senders
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("Killed all the senders.");
+    
+    //create the caches on the local site members again
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Created back the cache");
+    
+    
+    //re-create the senders, each one on the disk store it was given before the restart
+    vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+    vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+    vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+    
+    LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+    
+    
+    //start the senders. NOTE that the partitioned region has deliberately not been re-created
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Started the senders.");
+    
+    LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+    //wait for senders running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("All the senders are now running...");
+    
+    //----------------------------------------------------------------------------------------------------
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 1000 ));
+  }
+  
+  /**
+   * Creates a non-persistent PR and enables persistence for the GatewaySender attached to it.
+   * Closes the local site and rebuilds it, then checks that the remote site received all the events.
+   * NOTE: This use case is not supported for now, hence the {@code @Ignore}. For a persistent
+   * parallel gateway sender, the PR to which it is attached should also be persistent.
+   */
+  @Ignore("NotSupported")
+  @Test
+  public void testNonPersistentPartitionedRegionWithGatewaySenderPersistenceEnabled_Restart() {
+    //create locator on local site
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    //create locator on remote site
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    //create receiver on remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    //create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    //create senders with disk store; the returned values are passed back in after the restart
+    String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+    LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+    
+    //create PR on remote site
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    
+    //create non persistent PR on local site
+    vm4.invoke(createPartitionedRegionRunnable());
+    vm5.invoke(createPartitionedRegionRunnable());
+    vm6.invoke(createPartitionedRegionRunnable());
+    vm7.invoke(createPartitionedRegionRunnable());
+
+    //start the senders on local site
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    //wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    //pause the senders before doing the puts
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
+    
+    //start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+    
+    //kill the senders
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("Killed all the senders. The local site has been brought down.");
+    
+    //create the caches on the local site members again
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Created back the cache");
+    
+    //re-create the senders, each one on the disk store it was given before the restart
+    vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+    vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+    vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+    
+    LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+    
+    //re-create the non persistent PR on local site
+    vm4.invoke(createPartitionedRegionRunnable());
+    vm5.invoke(createPartitionedRegionRunnable());
+    vm6.invoke(createPartitionedRegionRunnable());
+    vm7.invoke(createPartitionedRegionRunnable());
+    
+    LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+    
+    //start the senders
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Started the senders.");
+    
+    LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+    //wait for senders running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("All the senders are now running...");
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName(), 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+    getTestMethodName(), 1000 ));
+  }
+
+
+  /**
+   * Create persistent PR and non-persistent GatewaySender.
+   * After doing puts, close the local site. 
+   * Rebuild the PR from persistent disk store and create the sender again. 
+   * Do more puts. Check if the remote site receives newly added events.
+   * 
+   * This test can be used as a DUnit test for defect #50247 reported by Indian Railways.
+   * At present, the customer is using this configuration, which is not recommended
+   * since it can lead to loss of GatewaySender events.
+   */
+  @Ignore("Bug50247")
+  @Test
+  public void testPersistentPartitionedRegionWithGatewaySender_Restart() {
+    //create locator on local site
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    //create locator on remote site
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    //create receiver on remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    //create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, false, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, false, null, true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, false, null, true ));
+
+    //create PR on remote site
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), null, 1, 100, isOffHeap() ));
+    
+    //create PR on local site
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    
+    //start the senders on local site
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    //wait for senders to become running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    //pause the senders
+    vm4.invoke(pauseSenderRunnable());
+    vm5.invoke(pauseSenderRunnable());
+    vm6.invoke(pauseSenderRunnable());
+    vm7.invoke(pauseSenderRunnable());
+    
+    //start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 3000 ));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+    
+    //----------------- Close and rebuild local site -------------------------------------
+    //kill the senders
+    vm4.invoke(killSenderRunnable());
+    vm5.invoke(killSenderRunnable());
+    vm6.invoke(killSenderRunnable());
+    vm7.invoke(killSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("Killed all the senders.");
+    
+    //restart the vm
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Created back the cache");
+    
+   //create back the senders
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, false, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, false, null, true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+      true, 100, 10, false, false, null, true ));
+    
+    LogWriterUtils.getLogWriter().info("Created the senders again");
+    
+    //start the senders
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Started the senders.");
+    
+    LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+
+    //wait for senders running
+    vm4.invoke(waitForSenderRunnable());
+    vm5.invoke(waitForSenderRunnable());
+    vm6.invoke(waitForSenderRunnable());
+    vm7.invoke(waitForSenderRunnable());
+    
+    LogWriterUtils.getLogWriter().info("All the senders are now running...");
+    
+    //create PR on local site
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+      getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+    
+    try {
+      inv1.join();
+      inv2.join();
+      inv3.join();
+      inv4.join();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail();
+    }
+    
+    LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+    //-------------------------------------------------------------------------------------------
+    
+    //start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 3000 ));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 3000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 3000 ));
+  }
+  
+  /**
+   * Regression test for defect # 45747: a parallel gateway sender with persistence enabled
+   * is attached to a user PR that some members host only as accessors (LocalMaxMemory of 0).
+   *
+   * <p>An accessor region hosts no buckets, so the PR backing the parallel sender is not
+   * required to be persistent there and should be created with persistence = false.
+   *
+   * <p>Scenario: sender "ln" is created on vm4-vm7; vm4 and vm5 create the PR through
+   * {@code createPartitionedRegion} while vm6 and vm7 create it as accessors; vm2 and vm3
+   * form the receiving site. After 1000 puts issued from vm4 the region size is validated
+   * as 1000 on vm2 and vm3.
+   */
+  @Test
+  public void testParallelPropagationWithSenderPersistenceEnabledForAccessor() {
+    // locators: vm0 for the local site, vm1 for the remote site
+    final Integer lnPort =
+        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    final Integer nyPort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // remote site: caches plus receivers
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // local site: caches, then the same sender definition on every member (vm4..vm7, in order)
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+    java.util.Arrays.asList(vm4, vm5, vm6, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.createSender(
+            "ln", 2, true, 100, 10, false, true, null, true)));
+
+    // vm4 and vm5 create the user PR normally ...
+    java.util.Arrays.asList(vm4, vm5).forEach(member ->
+        member.invoke(() -> WANTestBase.createPartitionedRegion(
+            getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())));
+    // ... whereas vm6 and vm7 only create it as accessors
+    java.util.Arrays.asList(vm6, vm7).forEach(member ->
+        member.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor(
+            getTestMethodName() + "_PR", "ln", 1, 100)));
+
+    // same-named PR on the remote site, created with null in place of "ln"
+    java.util.Arrays.asList(vm2, vm3).forEach(member ->
+        member.invoke(() -> WANTestBase.createPartitionedRegion(
+            getTestMethodName() + "_PR", null, 1, 100, isOffHeap())));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    // start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+    // every entry is expected on both remote members
+    java.util.Arrays.asList(vm2, vm3).forEach(member ->
+        member.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_PR", 1000)));
+  }
+  
+  /**
+   * Test for bug 50120: see if we can recover after deleting the parallel gateway
+   * sender files and not recovering the sender when we have a persistent PR.
+   *
+   * <p>Flow: sender "ln" and a persistent PR are created on vm4-vm7, 113 entries are put
+   * from vm4 and validated on the remote members vm2 and vm3. The caches on vm4-vm7 are then
+   * closed, {@link #setIgnoreQueue(boolean)} is switched on in each of them, and the caches
+   * and the persistent PR are re-created without re-creating the sender (null is passed
+   * where "ln" was passed before). The region on vm4 must still report 113 entries.
+   *
+   * <p>The flag is always switched back off in the {@code finally} block so that later
+   * tests running in the same VMs are not affected.
+   *
+   * @throws Throwable if a remote invocation fails or one of the asynchronous region
+   *         creations does not complete successfully
+   */
+  @Test
+  public void testRemoveGatewayFromPersistentRegionOnRestart() throws Throwable {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+      createCacheInVMs(nyPort, vm2, vm3);
+      createReceiverInVMs(vm2, vm3);
+
+      createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+      vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, true, null, true ));
+      vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, true, null, true ));
+      vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, true, null, true ));
+      vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, true, null, true ));
+
+      vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+      vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+      vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+      vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+      startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+      vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), null, 1, 100, isOffHeap() ));
+      vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+      vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 113 ));
+
+      vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 113 ));
+      vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 113 ));
+
+      //Bounce vm4, vm5, vm6, vm7 without the parallel queue.
+      //Lambdas are used instead of string-named reflective invocations so that the
+      //target methods are checked by the compiler, like every other invocation here.
+      vm4.invoke(() -> WANTestBase.closeCache());
+      vm5.invoke(() -> WANTestBase.closeCache());
+      vm6.invoke(() -> WANTestBase.closeCache());
+      vm7.invoke(() -> WANTestBase.closeCache());
+
+      vm4.invoke(() -> setIgnoreQueue( true ));
+      vm5.invoke(() -> setIgnoreQueue( true ));
+      vm6.invoke(() -> setIgnoreQueue( true ));
+      vm7.invoke(() -> setIgnoreQueue( true ));
+
+      createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+      //Re-create the persistent PR concurrently on all four members
+      AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), null, 1, 100, isOffHeap() ));
+      AsyncInvocation async5 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), null, 1, 100, isOffHeap() ));
+      AsyncInvocation async6 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), null, 1, 100, isOffHeap() ));
+      AsyncInvocation async7 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+        getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+      //Wait for every creation to finish (presumably a 30 second timeout each - confirm
+      //against AsyncInvocation.getResult)
+      async7.getResult(30 * 1000);
+      async5.getResult(30 * 1000);
+      async6.getResult(30 * 1000);
+      async4.getResult(30 * 1000);
+
+      //This should succeed, because the region recovered even though
+      //the queue was removed.
+      vm4.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName(), 113 ));
+    } finally {
+      //Always restore the flag in every member VM
+      vm4.invoke(() -> setIgnoreQueue( false ));
+      vm5.invoke(() -> setIgnoreQueue( false ));
+      vm6.invoke(() -> setIgnoreQueue( false ));
+      vm7.invoke(() -> setIgnoreQueue( false ));
+    }
+  }
+  
+  /**
+   * Assigns {@code ColocationHelper.IGNORE_UNRECOVERED_QUEUE} in the VM where this method
+   * is executed. It is static so that the restart test can run it inside each member VM.
+   *
+   * @param shouldIgnore the value to store in the flag
+   */
+  public static void setIgnoreQueue(final boolean shouldIgnore) {
+    ColocationHelper.IGNORE_UNRECOVERED_QUEUE = shouldIgnore;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
new file mode 100644
index 0000000..2e84c4e
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+import com.gemstone.gemfire.test.junit.categories.FlakyTest;
+
+@Category(DistributedTest.class)
+public class ParallelWANPropagationClientServerDUnitTest extends WANTestBase {
+
+  /**
+   * Normal happy scenario test case.
+   *
+   * <p>vm2 and vm3 act as receivers/servers of the remote site, vm5 and vm6 as servers of
+   * the local site carrying sender "ln". vm4 is a client created against the remote
+   * locator port and vm7 a client created against the local locator port. After vm7 has
+   * put 10000 entries, the sender queues are validated as drained and the region size is
+   * validated as 10000 on the servers of both sites and on both clients.
+   */
+  @Category(FlakyTest.class) // GEODE-1775: fails intermittently
+  @Test
+  public void testParallelPropagationWithClientServer() throws Exception {
+    final Integer lnPort =
+        (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    final Integer nyPort =
+        (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    // remote site: receiver + server on each member, then the PR (null in place of "ln")
+    java.util.Arrays.asList(vm2, vm3).forEach(member ->
+        member.invoke(() -> WANTestBase.createReceiverAndServer(nyPort)));
+    java.util.Arrays.asList(vm2, vm3).forEach(member ->
+        member.invoke(() -> WANTestBase.createPartitionedRegion(
+            getTestMethodName() + "_PR", null, 1, 100, isOffHeap())));
+
+    // client created against the remote locator port; it performs the first 100 puts
+    vm4.invoke(() -> WANTestBase.createClientWithLocator(
+        nyPort, "localhost", getTestMethodName() + "_PR"));
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100));
+
+    // local site: servers first, then the sender, then the PR attached to "ln"
+    java.util.Arrays.asList(vm5, vm6).forEach(member ->
+        member.invoke(() -> WANTestBase.createServer(lnPort)));
+    java.util.Arrays.asList(vm5, vm6).forEach(member ->
+        member.invoke(() -> WANTestBase.createSender(
+            "ln", 2, true, 100, 10, false, false, null, true)));
+    java.util.Arrays.asList(vm5, vm6).forEach(member ->
+        member.invoke(() -> WANTestBase.createPartitionedRegion(
+            getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())));
+
+    // client created against the local locator port
+    vm7.invoke(() -> WANTestBase.createClientWithLocator(
+        lnPort, "localhost", getTestMethodName() + "_PR"));
+
+    startSenderInVMsAsync("ln", vm5, vm6);
+
+    // before doing any puts, let the senders be running in order to ensure that
+    // not a single event will be lost
+    java.util.Arrays.asList(vm5, vm6).forEach(member ->
+        member.invoke(() -> WANTestBase.waitForSenderRunningState("ln")));
+
+    vm7.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 10000));
+
+    // verify all buckets drained on all sender nodes.
+    java.util.Arrays.asList(vm5, vm6).forEach(member ->
+        member.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")));
+
+    // region size check, in this order: remote servers, local servers, local client (vm7),
+    // remote client (vm4)
+    java.util.Arrays.asList(vm2, vm3, vm5, vm6, vm7, vm4).forEach(member ->
+        member.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_PR", 10000)));
+  }
+}