You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:43:55 UTC
[07/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to
./geode-wan/src/test/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
new file mode 100644
index 0000000..411396b
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.cache30.MyGatewayEventFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class SerialGatewaySenderQueueDUnitTest extends WANTestBase {
+
+ @Test
+ public void testPrimarySecondaryQueueDrainInOrder_RR() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ vm2.invoke(() -> WANTestBase.createCache(nyPort ));
+ vm3.invoke(() -> WANTestBase.createCache(nyPort ));
+
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm3.invoke(() -> WANTestBase.createReceiver());
+
+ vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm6.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+
+ vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2,
+ false, 100, 10, false, false, null, true, 1, OrderPolicy.KEY ));
+ vm5.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2,
+ false, 100, 10, false, false, null, true, 1, OrderPolicy.KEY ));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.addQueueListener( "ln", false));
+ vm5.invoke(() -> WANTestBase.addQueueListener( "ln", false));
+
+ vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_RR"));
+ vm3.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_RR"));
+
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln"));
+
+ vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+ Wait.pause(5000);
+ HashMap primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue());
+ HashMap secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue());
+ assertEquals(primarySenderUpdates, secondarySenderUpdates);
+
+ vm4.invoke(() -> WANTestBase.resumeSender( "ln"));
+ Wait.pause(2000);
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln"));
+ Wait.pause(2000);
+ // We should wait till primarySenderUpdates and secondarySenderUpdates become same
+ // If in 300000ms they don't then throw error.
+ primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue());
+ secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue());
+
+ checkPrimarySenderUpdatesOnVM5(primarySenderUpdates);
+// assertIndexDetailsEquals(primarySenderUpdates, secondarySenderUpdates);
+
+ vm4.invoke(() -> WANTestBase.resumeSender( "ln"));
+ Wait.pause(5000);
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue());
+ HashMap receiverUpdates = (HashMap)vm2.invoke(() -> WANTestBase.checkQueue());
+
+ List destroyList = (List)primarySenderUpdates.get("Destroy");
+ List createList = (List)receiverUpdates.get("Create");
+ for(int i = 0; i< 1000; i++){
+ assertEquals(destroyList.get(i), createList.get(i));
+ }
+ assertEquals(primarySenderUpdates.get("Destroy"), receiverUpdates.get("Create"));
+
+ Wait.pause(5000);
+ // We expect that after this much time secondary would have got batch removal message
+ // removing all the keys.
+ secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue());
+ assertEquals(secondarySenderUpdates.get("Destroy"), receiverUpdates.get("Create"));
+ }
+
+ protected void checkPrimarySenderUpdatesOnVM5(HashMap primarySenderUpdates) {
+ vm5.invoke(() -> WANTestBase.checkQueueOnSecondary( primarySenderUpdates ));
+ }
+
+ @Test
+ public void testPrimarySecondaryQueueDrainInOrder_PR() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+ vm2.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR"));
+ vm3.invoke(() -> WANTestBase.addListenerOnRegion(getTestMethodName() + "_PR"));
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2,
+ false, 100, 10, false, false, null, true, 1, OrderPolicy.KEY ));
+ vm5.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2,
+ false, 100, 10, false, false, null, true,1, OrderPolicy.KEY ));
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.addQueueListener( "ln", false));
+ vm5.invoke(() -> WANTestBase.addQueueListener( "ln", false));
+
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln"));
+
+ vm6.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 1000 ));
+ Wait.pause(5000);
+ HashMap primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue());
+ HashMap secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue());
+ checkPrimarySenderUpdatesOnVM5(primarySenderUpdates);
+
+ vm4.invoke(() -> WANTestBase.resumeSender( "ln"));
+ Wait.pause(4000);
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln"));
+ Wait.pause(15000);
+ primarySenderUpdates = (HashMap)vm4.invoke(() -> WANTestBase.checkQueue());
+ secondarySenderUpdates = (HashMap)vm5.invoke(() -> WANTestBase.checkQueue());
+ assertEquals(primarySenderUpdates, secondarySenderUpdates);
+
+ vm4.invoke(() -> WANTestBase.resumeSender( "ln"));
+ Wait.pause(5000);
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ }
+
+ /**
+ * Test to validate that serial gateway sender queue diskSynchronous attribute
+ * when persistence of sender is enabled.
+ */
+ @Test
+ public void test_ValidateSerialGatewaySenderQueueAttributes_1() {
+ Integer localLocPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+
+ Integer remoteLocPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, localLocPort ));
+
+ WANTestBase test = new WANTestBase(getTestMethodName());
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost["
+ + localLocPort + "]");
+ InternalDistributedSystem ds = test.getSystem(props);
+ cache = CacheFactory.create(ds);
+
+ File directory = new File("TKSender" + "_disk_"
+ + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+ directory.mkdir();
+ File[] dirs1 = new File[] { directory };
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
+ dsf.setDiskDirs(dirs1);
+ DiskStore diskStore = dsf.create("FORNY");
+
+ GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+ fact.setBatchConflationEnabled(true);
+ fact.setBatchSize(200);
+ fact.setBatchTimeInterval(300);
+ fact.setPersistenceEnabled(true);// enable the persistence
+ fact.setDiskSynchronous(true);
+ fact.setDiskStoreName("FORNY");
+ fact.setMaximumQueueMemory(200);
+ fact.setAlertThreshold(1200);
+ GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
+ fact.addGatewayEventFilter(myEventFilter1);
+ GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+ fact.addGatewayTransportFilter(myStreamFilter1);
+ GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
+ fact.addGatewayTransportFilter(myStreamFilter2);
+ final IgnoredException exTKSender = IgnoredException.addIgnoredException("Could not connect");
+ try {
+ GatewaySender sender1 = fact.create("TKSender", 2);
+
+ AttributesFactory factory = new AttributesFactory();
+ factory.addGatewaySenderId(sender1.getId());
+ factory.setDataPolicy(DataPolicy.PARTITION);
+ Region region = cache.createRegionFactory(factory.create()).create(
+ "test_ValidateGatewaySenderAttributes");
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ assertEquals(senders.size(), 1);
+ GatewaySender gatewaySender = senders.iterator().next();
+ Set<RegionQueue> regionQueues = ((AbstractGatewaySender) gatewaySender)
+ .getQueues();
+ assertEquals(regionQueues.size(), GatewaySender.DEFAULT_DISPATCHER_THREADS);
+ RegionQueue regionQueue = regionQueues.iterator().next();
+ assertEquals(true, regionQueue.getRegion().getAttributes()
+ .isDiskSynchronous());
+ } finally {
+ exTKSender.remove();
+ }
+ }
+
+ /**
+ * Test to validate that serial gateway sender queue diskSynchronous attribute
+ * when persistence of sender is not enabled.
+ */
+ @Test
+ public void test_ValidateSerialGatewaySenderQueueAttributes_2() {
+ Integer localLocPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+
+ Integer remoteLocPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, localLocPort ));
+
+ WANTestBase test = new WANTestBase(getTestMethodName());
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost[" + localLocPort + "]");
+ InternalDistributedSystem ds = test.getSystem(props);
+ cache = CacheFactory.create(ds);
+
+ GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+ fact.setBatchConflationEnabled(true);
+ fact.setBatchSize(200);
+ fact.setBatchTimeInterval(300);
+ fact.setPersistenceEnabled(false);//set persistence to false
+ fact.setDiskSynchronous(true);
+ fact.setMaximumQueueMemory(200);
+ fact.setAlertThreshold(1200);
+ GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
+ fact.addGatewayEventFilter(myEventFilter1);
+ GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+ fact.addGatewayTransportFilter(myStreamFilter1);
+ GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
+ fact.addGatewayTransportFilter(myStreamFilter2);
+ final IgnoredException exp = IgnoredException.addIgnoredException("Could not connect");
+ try {
+ GatewaySender sender1 = fact.create("TKSender", 2);
+
+ AttributesFactory factory = new AttributesFactory();
+ factory.addGatewaySenderId(sender1.getId());
+ factory.setDataPolicy(DataPolicy.PARTITION);
+ Region region = cache.createRegionFactory(factory.create()).create(
+ "test_ValidateGatewaySenderAttributes");
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ assertEquals(senders.size(), 1);
+ GatewaySender gatewaySender = senders.iterator().next();
+ Set<RegionQueue> regionQueues = ((AbstractGatewaySender) gatewaySender)
+ .getQueues();
+ assertEquals(regionQueues.size(), GatewaySender.DEFAULT_DISPATCHER_THREADS);
+ RegionQueue regionQueue = regionQueues.iterator().next();
+
+ assertEquals(false, regionQueue.getRegion().getAttributes()
+ .isDiskSynchronous());
+ } finally {
+ exp.remove();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java
new file mode 100644
index 0000000..7664ab4
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.serial;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+/**
+ *
+ */
+@Category(DistributedTest.class)
+public class SerialWANPersistenceEnabledGatewaySenderDUnitTest extends
+ WANTestBase {
+
+ private static final long serialVersionUID = 1L;
+
+ public SerialWANPersistenceEnabledGatewaySenderDUnitTest() {
+ super();
+ }
+
+ /**
+ * Just enable the persistence for GatewaySender and see if it remote site
+ * receives all the events.
+ */
+ @Test
+ public void testReplicatedRegionWithGatewaySenderPersistenceEnabled() {
+
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 10, false, true, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 10, false, true, null, true ));
+
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+
+ }
+
+ /**
+ * Enable persistence for the Region and see if the remote site gets all the
+ * events.
+ */
+ @Test
+ public void testPersistentReplicatedRegionWithGatewaySender() {
+
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 10, false, false, null, true ));
+
+ vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+
+ }
+
+ /**
+ * Enable persistence for region as well as GatewaySender and see if remote
+ * site receives all the events.
+ *
+ */
+ @Test
+ public void testPersistentReplicatedRegionWithGatewaySenderPersistenceEnabled() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 10, false, true, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 10, false, true, null, true ));
+
+ vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", null, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion( getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+
+ }
+
+ /**
+ * Enable persistence for GatewaySender, kill the sender and restart it. Check
+ * if the remote site receives all the event.
+ */
+ @Test
+ public void testReplicatedRegionWithGatewaySenderPersistenceEnabled_Restart() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ String firstDStore = (String)vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+ 100, 10, false, true, null, null, true ));
+ String secondDStore = (String)vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+ 100, 10, false, true, null, null, true ));
+
+ LogWriterUtils.getLogWriter().info("The first ds is " + firstDStore);
+ LogWriterUtils.getLogWriter().info("The first ds is " + secondDStore);
+
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ // verify if the queue has all the events
+ // vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000
+ // ));
+ // vm5.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000
+ // ));
+ //
+ // vm2.invoke(() -> WANTestBase.validateRegionSize(
+ // testName + "_RR", 0 ));
+ // vm3.invoke(() -> WANTestBase.validateRegionSize(
+ // testName + "_RR", 0 ));
+
+ // kill the vm
+ vm4.invoke(() -> WANTestBase.killSender());
+ vm5.invoke(() -> WANTestBase.killSender());
+ vm6.invoke(() -> WANTestBase.killSender());
+ vm7.invoke(() -> WANTestBase.killSender());
+
+ LogWriterUtils.getLogWriter().info("Killed all the sender. ");
+ // restart the vm
+ vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm4.invoke(() -> WANTestBase.createSenderWithDiskStore(
+ "ln", 2, false, 100, 10, false, true, null,
+ firstDStore, true ));
+ LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 ");
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore(
+ "ln", 2, false, 100, 10, false, true, null,
+ secondDStore, true ));
+ LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 ");
+ AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+ LogWriterUtils.getLogWriter().info("Started the sender in vm 4");
+
+ vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+ LogWriterUtils.getLogWriter().info("Started the sender in vm 5");
+ try {
+ inv1.join();
+ } catch (InterruptedException e) {
+ fail("Got interrupted exception while waiting for startSender to finish.");
+ }
+
+ Wait.pause(5000);
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+
+ }
+
+ /**
+ * Enable persistence for Region and persistence for GatewaySender. Kill the
+ * vm with regions and bring that up again. Check if the remote site receives
+ * all the event. again?
+ *
+ */
+ @Test
+ public void testPersistentReplicatedRegionWithGatewaySenderPersistenceEnabled_Restart() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ String firstDStore = (String)vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+ 100, 10, false, true, null, null, true ));
+ String secondDStore = (String)vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+ 100, 10, false, true, null, null, true ));
+
+ LogWriterUtils.getLogWriter().info("The first ds is " + firstDStore);
+ LogWriterUtils.getLogWriter().info("The first ds is " + secondDStore);
+
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ // kill the vm
+ vm4.invoke(() -> WANTestBase.killSender());
+ vm5.invoke(() -> WANTestBase.killSender());
+
+ LogWriterUtils.getLogWriter().info("Killed the sender. ");
+ // restart the vm
+ vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+
+ vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+ 100, 10, false, true, null, firstDStore, true ));
+ LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 ");
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+ 100, 10, false, true, null, secondDStore, true ));
+ LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 ");
+
+ vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+ LogWriterUtils.getLogWriter().info("Started the sender in vm 4");
+
+ vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+ LogWriterUtils.getLogWriter().info("Started the sender in vm 5");
+ try {
+ inv1.join();
+ } catch (InterruptedException e) {
+ fail("Got interrupted exception while waiting for startSender to finish.");
+ }
+
+ Wait.pause(5000);
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+
+ }
+
+ /**
+ * Enable persistence for Region. No persistence for GatewaySender. Kill the
+ * vm with regions and bring that up again. Check if the remote site receives
+ * all the event. again?
+ *
+ */
+ @Test
+ public void testPersistentReplicatedRegionWithGatewaySender_Restart() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false,
+ 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2, false,
+ 100, 10, false, false, null, true ));
+
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ // verify if the queue has all the events
+ // vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000
+ // ));
+ // vm5.invoke(() -> WANTestBase.checkQueueSize( "ln", 1000
+ // ));
+ //
+ // vm2.invoke(() -> WANTestBase.validateRegionSize(
+ // testName + "_RR", 0 ));
+ // vm3.invoke(() -> WANTestBase.validateRegionSize(
+ // testName + "_RR", 0 ));
+
+ // kill the vm
+ vm4.invoke(() -> WANTestBase.killSender());
+ vm5.invoke(() -> WANTestBase.killSender());
+
+ LogWriterUtils.getLogWriter().info("Killed the sender. ");
+ // restart the vm
+ vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm4.invoke(() -> WANTestBase.createSender(
+ "ln", 2, false, 100, 10, false, false, null, true));
+ LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 ");
+ vm5.invoke(() -> WANTestBase.createSender(
+ "ln", 2, false, 100, 10, false, false, null, true));
+ LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 ");
+
+ vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+ LogWriterUtils.getLogWriter().info("Started the sender in vm 4");
+
+ vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+ LogWriterUtils.getLogWriter().info("Started the sender in vm 5");
+
+ AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ try {
+ inv1.join();
+ } catch (InterruptedException e) {
+ fail("Got interrupted exception while waiting for startSender to finish.");
+ }
+
+ Wait.pause(5000);
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+
+ }
+
+
+ /**
+ * Enable persistence for Region and persistence for GatewaySender. Kill the
+ * vm with regions and bring that up again. Check if the remote site receives
+ * all the event. again?
+ * In this case put is continuously happening while the vm is down.
+ */
+ @Test
+ public void testPersistentReplicatedRegionWithGatewaySenderPersistenceEnabled_Restart2() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ String firstDStore = (String)vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+ 100, 10, false, true, null, null, true ));
+ String secondDStore = (String)vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+ 100, 10, false, true, null, null, true ));
+
+ LogWriterUtils.getLogWriter().info("The first ds is " + firstDStore);
+ LogWriterUtils.getLogWriter().info("The first ds is " + secondDStore);
+
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ // kill the vm
+ vm4.invoke(() -> WANTestBase.killSender());
+ vm5.invoke(() -> WANTestBase.killSender());
+
+ LogWriterUtils.getLogWriter().info("Killed the sender. ");
+ // restart the vm
+ vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+
+ vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+ 100, 10, false, true, null, firstDStore, true ));
+ LogWriterUtils.getLogWriter().info("Created the sender.... in vm4 ");
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, false,
+ 100, 10, false, true, null, secondDStore, true ));
+ LogWriterUtils.getLogWriter().info("Created the sender.... in vm5 ");
+
+ vm4.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ vm5.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSender( "ln" ));
+ LogWriterUtils.getLogWriter().info("Started the sender in vm 4");
+
+ vm5.invoke(() -> WANTestBase.startSender( "ln" ));
+ LogWriterUtils.getLogWriter().info("Started the sender in vm 5");
+ try {
+ inv1.join();
+ } catch (InterruptedException e) {
+ fail("Got interrupted exception while waiting for startSender to finish.");
+ }
+
+ Wait.pause(5000);
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+
+ }
+}