You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:43:55 UTC

[07/61] [abbrv] incubator-geode git commit: GEODE-37 change package name from com.gemstone.gemfire (for ./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to ./geode-wan/src/test/java/org/apache/geode)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
new file mode 100644
index 0000000..411396b
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.cache30.MyGatewayEventFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
+  /** Replicated-region ("_RR") case: compares the checkQueue() results of vm4, vm5 (sender "ln") and receiver vm2. */
+  @Test
+  public void testPrimarySecondaryQueueDrainInOrder_RR() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+    // vm2 and vm3: cache created with nyPort, "_RR" region created with null where vm4-vm7 pass "ln", then a receiver.
+    vm2.invoke(() -> WANTestBase.createCache(nyPort ));
+    vm3.invoke(() -> WANTestBase.createCache(nyPort ));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+
+    vm2.invoke(() -> WANTestBase.createReceiver());
+    vm3.invoke(() -> WANTestBase.createReceiver());
+    // vm4-vm7: cache created with lnPort; only vm4 and vm5 create and start sender "ln".
+    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+
+    vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2,
+        false, 100, 10, false, false, null, true, 1, OrderPolicy.KEY ));
+    vm5.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2,
+        false, 100, 10, false, false, null, true, 1, OrderPolicy.KEY ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+      getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+      getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.addQueueListener( "ln", false));
+    vm5.invoke(() -> WANTestBase.addQueueListener( "ln", false));
+
+    vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_RR"));
+    vm3.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_RR"));
+    // Pause the sender on vm4 only, do the puts from vm6, then compare the checkQueue() maps of vm4 and vm5.
+    vm4.invoke(() -> WANTestBase.pauseSender( "ln"));
+
+    vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+      1000 ));
+    Wait.pause(5000); // fixed-length pause; nothing here polls for a condition
+    HashMap primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue());
+    HashMap secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue());
+    assertEquals(primarySenderUpdates, secondarySenderUpdates);
+
+    vm4.invoke(() -> WANTestBase.resumeSender( "ln"));
+    Wait.pause(2000);
+    vm4.invoke(() -> WANTestBase.pauseSender( "ln"));
+    Wait.pause(2000);
+    // NOTE(review): the stated intent was to wait (up to 300000ms) until primarySenderUpdates and
+    // secondarySenderUpdates match; the code only pauses twice for 2000 and then checks vm5 once.
+    primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue());
+    secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); // not read before it is reassigned below
+
+    checkPrimarySenderUpdatesOnVM5(primarySenderUpdates);
+//    assertIndexDetailsEquals(primarySenderUpdates, secondarySenderUpdates);
+
+    vm4.invoke(() -> WANTestBase.resumeSender( "ln"));
+    Wait.pause(5000);
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_RR", 1000 ));
+    primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue());
+    HashMap receiverUpdates = (HashMap)vm2.invoke(() -> WANTestBase.checkQueue());
+    // Compare the sender's "Destroy" list with the receiver's "Create" list, element by element and then as a whole.
+    List destroyList = (List)primarySenderUpdates.get("Destroy");
+    List createList = (List)receiverUpdates.get("Create");
+    for(int i = 0; i< 1000; i++){
+      assertEquals(destroyList.get(i), createList.get(i));
+    }
+    assertEquals(primarySenderUpdates.get("Destroy"), receiverUpdates.get("Create"));
+
+    Wait.pause(5000);
+    // By now the secondary is expected to have received the batch-removal message
+    // that removes all the keys.
+    secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue());
+    assertEquals(secondarySenderUpdates.get("Destroy"), receiverUpdates.get("Create"));
+  }
+  /** Runs WANTestBase.checkQueueOnSecondary(primarySenderUpdates) in vm5; protected, so presumably overridable by subclasses (confirm). */
+  protected void checkPrimarySenderUpdatesOnVM5(HashMap primarySenderUpdates) {
+    vm5.invoke(() -> WANTestBase.checkQueueOnSecondary( primarySenderUpdates ));
+  }
+  /** Partitioned-region ("_PR") case: compares the checkQueue() maps of vm4 and vm5, then validates a region size of 1000 on vm2. */
+  @Test
+  public void testPrimarySecondaryQueueDrainInOrder_PR() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // vm2 and vm3: "_PR" region created with null where vm4-vm7 pass "ln", plus a listener on that region.
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+    vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR"));
+    vm3.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR"));
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2,
+        false, 100, 10, false, false, null, true, 1, OrderPolicy.KEY ));
+    vm5.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2,
+        false, 100, 10, false, false, null, true,1, OrderPolicy.KEY ));
+
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    // The senders are started only after the regions exist on vm4-vm7.
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.addQueueListener( "ln", false));
+    vm5.invoke(() -> WANTestBase.addQueueListener( "ln", false));
+
+    vm4.invoke(() -> WANTestBase.pauseSender( "ln"));
+
+    vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+      1000 ));
+    Wait.pause(5000); // fixed-length pause; nothing here polls for a condition
+    HashMap primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue());
+    HashMap secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue()); // not read before it is reassigned below
+    checkPrimarySenderUpdatesOnVM5(primarySenderUpdates);
+
+    vm4.invoke(() -> WANTestBase.resumeSender( "ln"));
+    Wait.pause(4000);
+    vm4.invoke(() -> WANTestBase.pauseSender( "ln"));
+    Wait.pause(15000);
+    primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue());
+    secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue());
+    assertEquals(primarySenderUpdates, secondarySenderUpdates);
+
+    vm4.invoke(() -> WANTestBase.resumeSender( "ln"));
+    Wait.pause(5000);
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName() + "_PR", 1000 ));
+  }
+  
+  /**
+   * Validates that the serial gateway sender queue region reports diskSynchronous=true when the
+   * sender is created with persistence enabled and diskSynchronous set to true.
+   */
+  @Test
+  public void test_ValidateSerialGatewaySenderQueueAttributes_1() {
+    Integer localLocPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+
+    Integer remoteLocPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, localLocPort ));
+    // The cache is created in the JVM running this method, using a throwaway WANTestBase's properties plus the locator.
+    WANTestBase test = new WANTestBase(getTestMethodName());
+    Properties props = test.getDistributedSystemProperties();
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(LOCATORS, "localhost["
+        + localLocPort + "]");
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+    // Disk store "FORNY" lives in a freshly named directory; the sender factory below refers to it by name.
+    File directory = new File("TKSender" + "_disk_"
+        + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+    directory.mkdir();
+    File[] dirs1 = new File[] { directory };
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
+    dsf.setDiskDirs(dirs1);
+    DiskStore diskStore = dsf.create("FORNY");
+
+    GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+    fact.setBatchConflationEnabled(true);
+    fact.setBatchSize(200);
+    fact.setBatchTimeInterval(300);
+    fact.setPersistenceEnabled(true);// enable the persistence
+    fact.setDiskSynchronous(true);
+    fact.setDiskStoreName("FORNY");
+    fact.setMaximumQueueMemory(200);
+    fact.setAlertThreshold(1200);
+    GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
+    fact.addGatewayEventFilter(myEventFilter1);
+    GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+    fact.addGatewayTransportFilter(myStreamFilter1);
+    GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
+    fact.addGatewayTransportFilter(myStreamFilter2);
+    final IgnoredException exTKSender = IgnoredException.addIgnoredException("Could not connect");
+    try {
+      GatewaySender sender1 = fact.create("TKSender", 2);
+
+      AttributesFactory factory = new AttributesFactory();
+      factory.addGatewaySenderId(sender1.getId());
+      factory.setDataPolicy(DataPolicy.PARTITION);
+      Region region = cache.createRegionFactory(factory.create()).create(
+          "test_ValidateGatewaySenderAttributes");
+      Set<GatewaySender> senders = cache.getGatewaySenders();
+      assertEquals(1, senders.size()); // expected value first, so failure messages read correctly
+      GatewaySender gatewaySender = senders.iterator().next();
+      Set<RegionQueue> regionQueues = ((AbstractGatewaySender) gatewaySender)
+          .getQueues();
+      assertEquals(GatewaySender.DEFAULT_DISPATCHER_THREADS, regionQueues.size());
+      RegionQueue regionQueue = regionQueues.iterator().next();
+      assertTrue(regionQueue.getRegion().getAttributes()
+          .isDiskSynchronous());
+    } finally {
+      exTKSender.remove(); // always remove the "Could not connect" ignored exception
+    }
+  }
+  
+  /**
+   * Validates that the serial gateway sender queue region reports diskSynchronous=false when the
+   * sender is created with persistence disabled, even though diskSynchronous is set to true.
+   */
+  @Test
+  public void test_ValidateSerialGatewaySenderQueueAttributes_2() {
+    Integer localLocPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+
+    Integer remoteLocPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, localLocPort ));
+    // The cache is created in the JVM running this method, using a throwaway WANTestBase's properties plus the locator.
+    WANTestBase test = new WANTestBase(getTestMethodName());
+    Properties props = test.getDistributedSystemProperties();
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(LOCATORS, "localhost[" + localLocPort + "]");
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+
+    GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+    fact.setBatchConflationEnabled(true);
+    fact.setBatchSize(200);
+    fact.setBatchTimeInterval(300);
+    fact.setPersistenceEnabled(false);//set persistence to false
+    fact.setDiskSynchronous(true);
+    fact.setMaximumQueueMemory(200);
+    fact.setAlertThreshold(1200);
+    GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
+    fact.addGatewayEventFilter(myEventFilter1);
+    GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+    fact.addGatewayTransportFilter(myStreamFilter1);
+    GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
+    fact.addGatewayTransportFilter(myStreamFilter2);
+    final IgnoredException exp = IgnoredException.addIgnoredException("Could not connect");
+    try {
+      GatewaySender sender1 = fact.create("TKSender", 2);
+
+      AttributesFactory factory = new AttributesFactory();
+      factory.addGatewaySenderId(sender1.getId());
+      factory.setDataPolicy(DataPolicy.PARTITION);
+      Region region = cache.createRegionFactory(factory.create()).create(
+          "test_ValidateGatewaySenderAttributes");
+      Set<GatewaySender> senders = cache.getGatewaySenders();
+      assertEquals(1, senders.size()); // expected value first, so failure messages read correctly
+      GatewaySender gatewaySender = senders.iterator().next();
+      Set<RegionQueue> regionQueues = ((AbstractGatewaySender) gatewaySender)
+          .getQueues();
+      assertEquals(GatewaySender.DEFAULT_DISPATCHER_THREADS, regionQueues.size());
+      RegionQueue regionQueue = regionQueues.iterator().next();
+
+      assertFalse(regionQueue.getRegion().getAttributes()
+          .isDiskSynchronous());
+    } finally {
+      exp.remove(); // always remove the "Could not connect" ignored exception
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java
new file mode 100644
index 0000000..7664ab4
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+/**
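+ * Distributed tests for a serial WAN gateway sender combined with persistent regions and/or sender persistence.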
+ */
+@Category(DistributedTest.class)
+public class SerialWANPersistenceEnabledGatewaySenderDUnitTest extends
+    WANTestBase {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Creates the test instance; the superclass no-arg constructor runs implicitly. */
+  public SerialWANPersistenceEnabledGatewaySenderDUnitTest() {
+  }
+
+  /**
+   * Enables persistence for the GatewaySender only and checks that the remote
+   * site receives all the events.
+   */
+  @Test
+  public void testReplicatedRegionWithGatewaySenderPersistenceEnabled() {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+    // Sender "ln" is created on vm4 and vm5 only; vm6 and vm7 get the region but no sender.
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, true, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, true, null, true ));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+    // validateRegionSize is run with 1000 on both receiver-side members.
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+
+  }
+
+  /**
+   * Enables persistence for the Region (not for the GatewaySender) and checks
+   * that the remote site gets all the events.
+   */
+  @Test
+  public void testPersistentReplicatedRegionWithGatewaySender() {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+    // The seventh createSender argument is false here; presumably that is the sender-persistence flag (confirm in WANTestBase).
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+
+    vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+    // validateRegionSize is run with 1000 on both receiver-side members.
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+
+  }
+
+  /**
+   * Enables persistence for the region as well as the GatewaySender and checks
+   * that the remote site receives all the events.
+   * Regions on both sites are created with createPersistentReplicatedRegion.
+   */
+  @Test
+  public void testPersistentReplicatedRegionWithGatewaySenderPersistenceEnabled() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+    // Sender "ln" is created on vm4 and vm5 only; vm6 and vm7 get the region but no sender.
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, true, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, true, null, true ));
+
+    vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+    // validateRegionSize is run with 1000 on both receiver-side members.
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+
+  }
+
+  /**
+   * Enable persistence for GatewaySender, kill the sender and restart it. Check
+   * if the remote site receives all the events.
+   */
+  @Test
+  public void testReplicatedRegionWithGatewaySenderPersistenceEnabled_Restart() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+    // First creation passes null in the ninth position; the returned string is passed there again after the restart.
+    String firstDStore = (String)vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+            100, 10, false, true, null, null, true ));
+    String secondDStore = (String)vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+            100, 10, false, true, null, null, true ));
+
+    LogWriterUtils.getLogWriter().info("The first ds is " + firstDStore);
+    LogWriterUtils.getLogWriter().info("The second ds is " + secondDStore);
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    // Both senders are paused before the puts.
+    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+    // verify if the queue has all the events
+    // vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000
+    // ));
+    // vm5.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000
+    // ));
+    //
+    // vm2.invoke(() -> WANTestBase.validateRegionSize(
+    // testName + "_RR", 0 ));
+    // vm3.invoke(() -> WANTestBase.validateRegionSize(
+    // testName + "_RR", 0 ));
+
+    // killSender is invoked on all four local-site members (vm4-vm7)
+    vm4.invoke(() -> WANTestBase.killSender());
+    vm5.invoke(() -> WANTestBase.killSender());
+    vm6.invoke(() -> WANTestBase.killSender());
+    vm7.invoke(() -> WANTestBase.killSender());
+
+    LogWriterUtils.getLogWriter().info("Killed all the sender. ");
+    // recreate cache and sender on vm4 and vm5 only, passing firstDStore / secondDStore back in
+    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(() -> WANTestBase.createSenderWithDiskStore(
+        "ln", 2, false, 100, 10, false, true, null,
+        firstDStore, true ));
+    LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 ");
+    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore(
+        "ln", 2, false, 100, 10, false, true, null,
+        secondDStore, true ));
+    LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 ");
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); // vm4 start runs asynchronously
+    LogWriterUtils.getLogWriter().info("Started the sender in vm 4");
+
+    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+    LogWriterUtils.getLogWriter().info("Started the sender in vm 5");
+    try {
+      inv1.join();
+    } catch (InterruptedException e) {
+      fail("Got interrupted exception while waiting for startSender to finish.");
+    }
+
+    Wait.pause(5000);
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+
+  }
+
+  /**
+   * Enable persistence for Region and persistence for GatewaySender. Kill the
+   * senders on vm4 and vm5 and bring them up again. Check if the remote site
+   * receives all the events.
+   * killSender is invoked on vm4 and vm5 only; vm6 and vm7 are not touched.
+   */
+  @Test
+  public void testPersistentReplicatedRegionWithGatewaySenderPersistenceEnabled_Restart() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+    // First creation passes null in the ninth position; the returned string is passed there again after the restart.
+    String firstDStore = (String)vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+            100, 10, false, true, null, null, true ));
+    String secondDStore = (String)vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+            100, 10, false, true, null, null, true  ));
+
+    LogWriterUtils.getLogWriter().info("The first ds is " + firstDStore);
+    LogWriterUtils.getLogWriter().info("The second ds is " + secondDStore);
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    // Both senders are paused before the puts.
+    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+    // killSender is invoked on the two sender members only
+    vm4.invoke(() -> WANTestBase.killSender());
+    vm5.invoke(() -> WANTestBase.killSender());
+
+    LogWriterUtils.getLogWriter().info("Killed the sender. ");
+    // recreate cache and sender on vm4 and vm5, passing firstDStore / secondDStore back in
+    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+
+    vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+      100, 10, false, true, null, firstDStore, true  ));
+    LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 ");
+    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+            100, 10, false, true, null, secondDStore, true  ));
+    LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 ");
+    // After the restart the persistent region is created again on vm4 and vm5 before the senders are started.
+    vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+      getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+      getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" )); // vm4 start runs asynchronously
+    LogWriterUtils.getLogWriter().info("Started the sender in vm 4");
+
+    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+    LogWriterUtils.getLogWriter().info("Started the sender in vm 5");
+    try {
+      inv1.join();
+    } catch (InterruptedException e) {
+      fail("Got interrupted exception while waiting for startSender to finish.");
+    }
+
+    Wait.pause(5000);
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+
+  }
+  
+  /**
+   * Enable persistence for the Region but not for the GatewaySender. Kill the
+   * senders in vm4 and vm5 while their queues are paused, bring those members
+   * up again, repeat the puts and check that the remote site receives all the
+   * events.
+   */
+  @Test
+  public void testPersistentReplicatedRegionWithGatewaySender_Restart() {
+    // Site 1 ("ln") locator lives in vm0, site 2 ("ny") locator in vm1.
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    // Remote (receiving) site: vm2 and vm3.
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // Local (sending) site: vm4..vm7; only vm4 and vm5 host the sender "ln".
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false,
+            100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2, false,
+            100, 10, false, false, null, true ));
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    // Pause both senders before doing the puts.
+    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+    // NOTE(review): the queue-size / remote-region-size checks below are
+    // disabled. They still reference the old "testName" identifier, which would
+    // have to become getTestMethodName() if they are ever re-enabled.
+    // verify if the queue has all the events
+    // vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000
+    // ));
+    // vm5.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000
+    // ));
+    //
+    // vm2.invoke(() -> WANTestBase.validateRegionSize(
+    // testName + "_RR", 0 ));
+    // vm3.invoke(() -> WANTestBase.validateRegionSize(
+    // testName + "_RR", 0 ));
+
+    // kill the senders in vm4 and vm5
+    vm4.invoke(() -> WANTestBase.killSender());
+    vm5.invoke(() -> WANTestBase.killSender());
+
+    LogWriterUtils.getLogWriter().info("Killed the sender. ");
+    // restart: recreate the cache and the sender in vm4 and vm5
+    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(() -> WANTestBase.createSender(
+        "ln", 2, false, 100, 10, false, false, null, true));
+    LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 ");
+    vm5.invoke(() -> WANTestBase.createSender(
+        "ln", 2, false, 100, 10, false, false, null, true));
+    LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 ");
+
+    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+    LogWriterUtils.getLogWriter().info("Started the sender in vm 4");
+
+    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+    LogWriterUtils.getLogWriter().info("Started the sender in vm 5");
+
+    // Region creation in vm4 runs asynchronously while vm5 creates its region
+    // synchronously; both are complete once inv1 has been joined.
+    AsyncInvocation inv1 =  vm4.invokeAsync(() -> WANTestBase.createPersistentReplicatedRegion(
+      getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+      getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    try {
+      inv1.join();
+    } catch (InterruptedException e) {
+      fail("Got interrupted exception while waiting for createPersistentReplicatedRegion to finish.");
+    }
+
+    Wait.pause(5000);
+    // Repeat the puts after the restart; these are the events the remote site
+    // is validated against below.
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+      1000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+
+  }
+  
+  
+  /**
+   * Enable persistence for the Region and persistence for the GatewaySender.
+   * Kill the senders in vm4 and vm5 while their queues are paused, bring those
+   * members up again with the same disk stores and check that the remote site
+   * receives all the events.
+   *
+   * NOTE(review): the original comment stated that puts keep happening while
+   * the vm is down, but this method completes all puts before killing the
+   * senders -- confirm the intended scenario.
+   */
+  @Test
+  public void testPersistentReplicatedRegionWithGatewaySenderPersistenceEnabled_Restart2() {
+    // Site 1 ("ln") locator lives in vm0, site 2 ("ny") locator in vm1.
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    // Remote (receiving) site: vm2 and vm3.
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // Local (sending) site: vm4..vm7; only vm4 and vm5 host the sender "ln".
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    // Remember the disk store name returned for each sender so that the same
+    // store can be passed back in when the sender is recreated after the kill.
+    String firstDStore = (String)vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+            100, 10, false, true, null, null, true  ));
+    String secondDStore = (String)vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+            100, 10, false, true, null, null, true  ));
+
+    LogWriterUtils.getLogWriter().info("The first ds is " + firstDStore);
+    LogWriterUtils.getLogWriter().info("The second ds is " + secondDStore);
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    // Pause both senders before doing the puts.
+    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+    // kill the senders in vm4 and vm5
+    vm4.invoke(() -> WANTestBase.killSender());
+    vm5.invoke(() -> WANTestBase.killSender());
+
+    LogWriterUtils.getLogWriter().info("Killed the sender. ");
+    // restart: recreate the cache in vm4 and vm5
+    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+
+    // Recreate each sender against the disk store it used before the kill.
+    vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+      100, 10, false, true, null, firstDStore, true  ));
+    LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 ");
+    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+            100, 10, false, true, null, secondDStore, true  ));
+    LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 ");
+
+    vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+      getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+      getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    // The sender in vm4 is started asynchronously while vm5 starts its sender
+    // synchronously; both are started once inv1 has been joined.
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+    LogWriterUtils.getLogWriter().info("Started the sender in vm 4");
+
+    vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+    LogWriterUtils.getLogWriter().info("Started the sender in vm 5");
+    try {
+      inv1.join();
+    } catch (InterruptedException e) {
+      fail("Got interrupted exception while waiting for startSender to finish.");
+    }
+
+    Wait.pause(5000);
+
+    // No puts are repeated after the restart: the remote site is expected to
+    // hold all 1000 entries from the puts done before the kill.
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+
+  }
+}