You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:43:58 UTC
[10/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to
./geode-wan/src/test/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
new file mode 100644
index 0000000..359a56d
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -0,0 +1,1593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import java.io.IOException;
+
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.internal.cache.ColocationHelper;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
+
+@Category(DistributedTest.class)
+public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends WANTestBase {
+
+ private static final long serialVersionUID = 1L;
+
+ public ParallelWANPersistenceEnabledGatewaySenderDUnitTest() {
+ super();
+ }
+
+ @Override
+ protected final void postSetUpWANTestBase() throws Exception {
+ //The restart tests log this string
+ IgnoredException.addIgnoredException("failed accepting client connection");
+ }
+
+ @Test
+ public void testPartitionedRegionWithGatewaySenderPersistenceEnabled() throws IOException {
+ try {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ createCache(lnPort);
+ GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+ fact.setPersistenceEnabled(true);
+ fact.setParallel(true);
+ final IgnoredException ex = IgnoredException.addIgnoredException("Could not connect");
+ try {
+ GatewaySender sender1 = fact.create("NYSender", 2);
+
+ AttributesFactory rFact = new AttributesFactory();
+ rFact.addGatewaySenderId(sender1.getId());
+
+ PartitionAttributesFactory pFact = new PartitionAttributesFactory();
+ pFact.setTotalNumBuckets(100);
+ pFact.setRedundantCopies(1);
+ rFact.setPartitionAttributes(pFact.create());
+ Region r = cache.createRegionFactory(rFact.create()).create("MyRegion");
+ sender1.start();
+ } finally {
+ ex.remove();
+ }
+
+ }
+ catch (Exception e) {
+ fail("Unexpected Exception :" + e);
+ }
+ }
+
+ /**
+ * Enable persistence for region as well as GatewaySender and see if remote
+ * site receives all the events.
+ */
+ @Test
+ public void testPersistentPartitionedRegionWithGatewaySenderPersistenceEnabled() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+
+ }
+
+ /**
+ * Enable persistence for the GatewaySender but not the region
+ */
+ @Test
+ public void testPartitionedRegionWithPersistentGatewaySender() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ LogWriterUtils.getLogWriter().info("Created remote receivers");
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Created local site cache");
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+
+ LogWriterUtils.getLogWriter().info("Created local site senders");
+
+ vm4.invoke(createPartitionedRegionRunnable());
+ vm5.invoke(createPartitionedRegionRunnable());
+ vm6.invoke(createPartitionedRegionRunnable());
+ vm7.invoke(createPartitionedRegionRunnable());
+
+ LogWriterUtils.getLogWriter().info("Created local site persistent PR");
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Started the senders");
+
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ }
+
+ protected SerializableRunnableIF createPartitionedRegionRunnable() {
+ return () -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() );
+ }
+
+
+
+
+ /**
+ * Enable persistence for GatewaySender.
+ * Pause the sender and do some puts in local region.
+ * Close the local site and rebuild the region and sender from disk store.
+ * Dispatcher should not start dispatching events recovered from persistent sender.
+ * Check if the remote site receives all the events.
+ */
+ @Test
+ public void testPRWithGatewaySenderPersistenceEnabled_Restart() {
+ //create locator on local site
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ //create locator on remote site
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ //create receiver on remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ //create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ //create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+ LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+
+ //create PR on remote site
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ //create PR on local site
+ vm4.invoke(createPartitionedRegionRunnable());
+ vm5.invoke(createPartitionedRegionRunnable());
+ vm6.invoke(createPartitionedRegionRunnable());
+ vm7.invoke(createPartitionedRegionRunnable());
+
+ //start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ //wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ //pause the senders
+ vm4.invoke(pauseSenderRunnable());
+ vm5.invoke(pauseSenderRunnable());
+ vm6.invoke(pauseSenderRunnable());
+ vm7.invoke(pauseSenderRunnable());
+
+ //start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 3000 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ //--------------------close and rebuild local site -------------------------------------------------
+ //kill the senders
+ vm4.invoke(killSenderRunnable());
+ vm5.invoke(killSenderRunnable());
+ vm6.invoke(killSenderRunnable());
+ vm7.invoke(killSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("Killed all the senders.");
+
+ //restart the vm
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Created back the cache");
+
+ //create senders with disk store
+ vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+ vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+ vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+
+ LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+ //create PR on local site
+ AsyncInvocation inv1 = vm4.invokeAsync(createPartitionedRegionRunnable());
+ AsyncInvocation inv2 = vm5.invokeAsync(createPartitionedRegionRunnable());
+ AsyncInvocation inv3 = vm6.invokeAsync(createPartitionedRegionRunnable());
+ AsyncInvocation inv4 = vm7.invokeAsync(createPartitionedRegionRunnable());
+
+ try {
+ inv1.join();
+ inv2.join();
+ inv3.join();
+ inv4.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+ //start the senders in async mode. This will ensure that the
+ //node of shadow PR that went down last will come up first
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+ //wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("All the senders are now running...");
+
+ //----------------------------------------------------------------------------------------------------
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 3000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 3000 ));
+ }
+
+ protected SerializableRunnableIF killSenderRunnable() {
+ return () -> WANTestBase.killSender();
+ }
+
+ protected SerializableRunnableIF pauseSenderRunnable() {
+ return () -> WANTestBase.pauseSender( "ln" );
+ }
+
+ protected SerializableRunnableIF waitForSenderRunnable() {
+ return () -> WANTestBase.waitForSenderRunningState( "ln" );
+ }
+
+ /**
+ * Enable persistence for PR and GatewaySender.
+ * Pause the sender and do some puts in local region.
+ * Close the local site and rebuild the region and sender from disk store.
+ * Dispatcher should not start dispatching events recovered from persistent sender.
+ * Check if the remote site receives all the events.
+ */
+ @Test
+ public void testPersistentPRWithGatewaySenderPersistenceEnabled_Restart() {
+ //create locator on local site
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ //create locator on remote site
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ //create receiver on remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ //create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ //create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+ LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+
+ //create PR on remote site
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ //create PR on local site
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ //start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ //wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ //pause the senders
+ vm4.invoke(pauseSenderRunnable());
+ vm5.invoke(pauseSenderRunnable());
+ vm6.invoke(pauseSenderRunnable());
+ vm7.invoke(pauseSenderRunnable());
+
+ //start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 3000 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ //--------------------close and rebuild local site -------------------------------------------------
+ //kill the senders
+ vm4.invoke(killSenderRunnable());
+ vm5.invoke(killSenderRunnable());
+ vm6.invoke(killSenderRunnable());
+ vm7.invoke(killSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("Killed all the senders.");
+
+ //restart the vm
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Created back the cache");
+
+ //create senders with disk store
+ vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+ vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+ vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+
+ LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+ //create PR on local site
+ AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ try {
+ inv1.join();
+ inv2.join();
+ inv3.join();
+ inv4.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+ //start the senders in async mode. This will ensure that the
+ //node of shadow PR that went down last will come up first
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+ //wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("All the senders are now running...");
+
+ //----------------------------------------------------------------------------------------------------
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 3000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 3000 ));
+ }
+
+ /**
+ * Enable persistence for PR and GatewaySender.
+ * Pause the sender and do some puts in local region.
+ * Close the local site and rebuild the region and sender from disk store.
+ * Dispatcher should not start dispatching events recovered from persistent sender.
+ * Check if the remote site receives all the events.
+ */
+ @Test
+ public void testPersistentPRWithGatewaySenderPersistenceEnabled_Restart2() {
+ //create locator on local site
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ //create locator on remote site
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ //create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ //create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, false ));
+ String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, false ));
+ String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, false ));
+ String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, false ));
+
+ LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+
+ //create PR on local site
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ //start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ //wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ //pause the senders
+ vm4.invoke(pauseSenderRunnable());
+ vm5.invoke(pauseSenderRunnable());
+ vm6.invoke(pauseSenderRunnable());
+ vm7.invoke(pauseSenderRunnable());
+
+ //start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 300 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ //--------------------close and rebuild local site -------------------------------------------------
+ //kill the senders
+ vm4.invoke(killSenderRunnable());
+ vm5.invoke(killSenderRunnable());
+ vm6.invoke(killSenderRunnable());
+ vm7.invoke(killSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("Killed all the senders.");
+
+ //restart the vm
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Created back the cache");
+
+ //create senders with disk store
+ vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+ vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+ vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+
+ LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+
+ //create PR on local site
+ AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ try {
+ inv1.join();
+ inv2.join();
+ inv3.join();
+ inv4.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+ //start the senders in async mode. This will ensure that the
+ //node of shadow PR that went down last will come up first
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+ //wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+
+ LogWriterUtils.getLogWriter().info("Creating the receiver.");
+ //create receiver on remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+ //create PR on remote site
+
+ LogWriterUtils.getLogWriter().info("Creating the partitioned region at receiver. ");
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ createReceiverInVMs(vm2, vm3);
+
+ vm4.invoke(pauseSenderRunnable());
+ vm5.invoke(pauseSenderRunnable());
+ vm6.invoke(pauseSenderRunnable());
+ vm7.invoke(pauseSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("Doing some extra puts. ");
+ //start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPutsAfter300( getTestMethodName(), 1000 ));
+ //----------------------------------------------------------------------------------------------------
+ vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm6.invoke(() -> WANTestBase.resumeSender( "ln" ));
+ vm7.invoke(() -> WANTestBase.resumeSender( "ln" ));
+
+ LogWriterUtils.getLogWriter().info("Validating the region size at the receiver end. ");
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ }
+
+
+ /**
+ * Enable persistence for PR and GatewaySender.
+ * Pause the sender and do some puts in local region.
+ * Close the local site and rebuild the region and sender from disk store.
+ * Dispatcher should not start dispatching events recovered from persistent sender.
+ * --> At the same time, do some more puts on the local region.
+ * Check if the remote site receives all the events.
+ */
+ @Test
+ public void testPersistentPRWithGatewaySenderPersistenceEnabled_Restart_Scenario2() {
+ //create locator on local site
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ //create locator on remote site
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ //create receiver on remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ //create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+ LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+
+ //create PR on remote site
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ //create PR on local site
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ //start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ //wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ //pause the senders
+ vm4.invoke(pauseSenderRunnable());
+ vm5.invoke(pauseSenderRunnable());
+ vm6.invoke(pauseSenderRunnable());
+ vm7.invoke(pauseSenderRunnable());
+
+ //start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 3000 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ //--------------------close and rebuild local site -------------------------------------------------
+ //kill the senders
+ vm4.invoke(killSenderRunnable());
+ vm5.invoke(killSenderRunnable());
+ vm6.invoke(killSenderRunnable());
+ vm7.invoke(killSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("Killed all the senders.");
+
+ //restart the vm
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Created back the cache");
+
+ //create senders with disk store
+ vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+ vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+ vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+
+ LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+
+ //create PR on local site
+ AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ try {
+ inv1.join();
+ inv2.join();
+ inv3.join();
+ inv4.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+ //start the senders in async mode. This will ensure that the
+ //node of shadow PR that went down last will come up first
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+ //wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("All the senders are now running...");
+
+ //----------------------------------------------------------------------------------------------------
+
+ //Dispatcher should be dispatching now. Do some more puts through async thread
+ AsyncInvocation async1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+ try {
+ async1.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 3000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 3000 ));
+ }
+
+ /**
+ * Test case for bug# 44275.
+ * Enable persistence for PR and GatewaySender.
+ * Do puts into region with key as a String.
+ * Close the local site and rebuild the region and sender from disk store.
+ * Check if the remote site receives all the events.
+ */
+ @Test
+ public void testPersistentPRWithPersistentGatewaySender_Restart_Bug44275() {
+ //create locator on local site
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ //create locator on remote site
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ //create receiver on remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ //create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ //create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+ LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+
+ //create PR on remote site
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ //create PR on local site
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ //start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ //wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ //pause the senders
+ vm4.invoke(pauseSenderRunnable());
+ vm5.invoke(pauseSenderRunnable());
+ vm6.invoke(pauseSenderRunnable());
+ vm7.invoke(pauseSenderRunnable());
+
+ //start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPutsWithKeyAsString( getTestMethodName(), 1000 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ //--------------------close and rebuild local site -------------------------------------------------
+ //kill the senders
+ vm4.invoke(killSenderRunnable());
+ vm5.invoke(killSenderRunnable());
+ vm6.invoke(killSenderRunnable());
+ vm7.invoke(killSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("Killed all the senders.");
+
+ //restart the vm
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Created back the cache");
+
+ //create senders with disk store
+ vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+ vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+ vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+
+ LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+
+ //create PR on local site
+ AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ try {
+ inv1.join();
+ inv2.join();
+ inv3.join();
+ inv4.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+ //start the senders in async mode. This will ensure that the
+ //node of shadow PR that went down last will come up first
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+ //wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("All the senders are now running...");
+
+ //----------------------------------------------------------------------------------------------------
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ }
+
+ /**
+ * Test case for bug# 44275.
+ * Enable persistence for PR and GatewaySender.
+ * Do puts into region with key as a String.
+ * Close the local site and rebuild the region and sender from disk store.
+ * Check if the remote site receives all the events.
+ */
+ @Test
+ public void testPersistentPRWithPersistentGatewaySender_Restart_DoOps() {
+ //create locator on local site
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ //create locator on remote site
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ //create receiver on remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ //create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+ LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+
+ //create PR on remote site
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ //create PR on local site
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ //start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ //wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ //pause the senders
+ vm4.invoke(pauseSenderRunnable());
+ vm5.invoke(pauseSenderRunnable());
+ vm6.invoke(pauseSenderRunnable());
+ vm7.invoke(pauseSenderRunnable());
+
+ //start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPutsWithKeyAsString( getTestMethodName(), 1000 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ //--------------------close and rebuild local site -------------------------------------------------
+ //kill the senders
+ vm4.invoke(killSenderRunnable());
+ vm5.invoke(killSenderRunnable());
+ vm6.invoke(killSenderRunnable());
+ vm7.invoke(killSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("Killed all the senders.");
+
+ //restart the vm
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Created back the cache");
+
+ //create senders with disk store
+ vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+ vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+ vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+
+ LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+
+ // create PR on local site
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+ //start the senders in async mode. This will ensure that the
+ //node of shadow PR that went down last will come up first
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+ //wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("All the senders are now running...");
+
+ //----------------------------------------------------------------------------------------------------
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm5.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm6.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm7.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+
+ //do some extra puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPutsWithKeyAsString( getTestMethodName(), 10000 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 10000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 10000 ));
+ }
+
+ @Test
+ public void testPersistentPR_Restart() {
+ // create locator on local site
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+
+ // create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ // create PR on local site
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ // start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPutsWithKeyAsString(
+ getTestMethodName(), 1000 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ // --------------------close and rebuild local site
+ // -------------------------------------------------
+ // kill the senders
+ vm4.invoke(killSenderRunnable());
+ vm5.invoke(killSenderRunnable());
+ vm6.invoke(killSenderRunnable());
+ vm7.invoke(killSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("Killed all the senders.");
+
+ // restart the vm
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Created back the cache");
+
+// // create PR on local site
+// vm4.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
+// new Object[] { testName, "ln", 1, 100, isOffHeap() });
+// vm5.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
+// new Object[] { testName, "ln", 1, 100, isOffHeap() });
+// vm6.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
+// new Object[] { testName, "ln", 1, 100, isOffHeap() });
+// vm7.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
+// new Object[] { testName, "ln", 1, 100, isOffHeap() });
+
+ // create PR on local site
+ AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1,
+ 100, isOffHeap() ));
+ AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1,
+ 100, isOffHeap() ));
+ AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1,
+ 100, isOffHeap() ));
+ AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName(), "ln", 1,
+ 100, isOffHeap() ));
+
+ try {
+ inv1.join();
+ inv2.join();
+ inv3.join();
+ inv4.join();
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm5.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm6.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm7.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ }
+
+ /**
+ * Enable persistence for PR and GatewaySender.
+ * Close the local site.
+ * Create the local cache.
+ * Don't create back the partitioned region but create just the sender.
+ * Check if the remote site receives all the events.
+ * NOTE: This use case is not supported yet.
+ * For ParallelGatewaySender to start, it must be associated with a partitioned region.
+ */
+ @Ignore("NotSupported")
+ @Test
+ public void testPersistentPartitionedRegionWithGatewaySenderPersistenceEnabled_Restart2() {
+ //create locator on local site
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ //create locator on remote site
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ //create receiver on remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ //create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ //create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+ LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+
+ //create PR on remote site
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ //create PR on local site
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ //start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ //wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ //pause the senders
+ vm4.invoke(pauseSenderRunnable());
+ vm5.invoke(pauseSenderRunnable());
+ vm6.invoke(pauseSenderRunnable());
+ vm7.invoke(pauseSenderRunnable());
+
+ //start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ //--------------------close and rebuild local site -------------------------------------------------
+ //kill the senders
+ vm4.invoke(killSenderRunnable());
+ vm5.invoke(killSenderRunnable());
+ vm6.invoke(killSenderRunnable());
+ vm7.invoke(killSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("Killed all the senders.");
+
+ //restart the vm
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Created back the cache");
+
+
+ //create senders from disk store
+ vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+ vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+ vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+
+ LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+
+
+ //start the senders. NOTE that the senders are not associated with partitioned region
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Started the senders.");
+
+ LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+ //wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("All the senders are now running...");
+
+ //----------------------------------------------------------------------------------------------------
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ }
+
+ /**
+ * Create a non persistent PR and enable persistence for GatewaySender attached to the PR.
+ * Close the local site and rebuild it. Check if the remote site receives all the events.
+ * NOTE: This use case is not supported for now. For persistent parallel gateway sender,
+ * the PR to which it is attached should also be persistent.
+ */
+ @Ignore("NotSupported")
+ @Test
+ public void testNonPersistentPartitionedRegionWithGatewaySenderPersistenceEnabled_Restart() {
+ //create locator on local site
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ //create locator on remote site
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ //create receiver on remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ //create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ //create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+ LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+
+ //create PR on remote site
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ //create non persistent PR on local site
+ vm4.invoke(createPartitionedRegionRunnable());
+ vm5.invoke(createPartitionedRegionRunnable());
+ vm6.invoke(createPartitionedRegionRunnable());
+ vm7.invoke(createPartitionedRegionRunnable());
+
+ //start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ //wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ //pause the senders
+ vm4.invoke(pauseSenderRunnable());
+ vm5.invoke(pauseSenderRunnable());
+ vm6.invoke(pauseSenderRunnable());
+ vm7.invoke(pauseSenderRunnable());
+
+ //start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ //kill the senders
+ vm4.invoke(killSenderRunnable());
+ vm5.invoke(killSenderRunnable());
+ vm6.invoke(killSenderRunnable());
+ vm7.invoke(killSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("Killed all the senders. The local site has been brought down.");
+
+ //restart the vm
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Created back the cache");
+
+ //create senders with disk store
+ vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+ vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+ vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+
+ LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+
+ //create PR on local site
+ vm4.invoke(createPartitionedRegionRunnable());
+ vm5.invoke(createPartitionedRegionRunnable());
+ vm6.invoke(createPartitionedRegionRunnable());
+ vm7.invoke(createPartitionedRegionRunnable());
+
+ LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+ //start the senders
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Started the senders.");
+
+ LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+ //wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("All the senders are now running...");
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 1000 ));
+ }
+
+
+ /**
+ * Create persistent PR and non-persistent GatewaySender.
+ * After doing puts, close the local site.
+ * Rebuild the PR from persistent disk store and create the sender again.
+ * Do more puts. Check if the remote site receives newly added events.
+ *
+ * This test can be used as DUnit test for defect #50247 reported by Indian Railways.
+ * At present, customer is using this configuration and which is not recommended
+ * since it can lead to event loss of GatewaySender events.
+ */
+ @Ignore("Bug50247")
+ @Test
+ public void testPersistentPartitionedRegionWithGatewaySender_Restart() {
+ //create locator on local site
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ //create locator on remote site
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ //create receiver on remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ //create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ //create PR on remote site
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ //create PR on local site
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ //start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ //wait for senders to become running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ //pause the senders
+ vm4.invoke(pauseSenderRunnable());
+ vm5.invoke(pauseSenderRunnable());
+ vm6.invoke(pauseSenderRunnable());
+ vm7.invoke(pauseSenderRunnable());
+
+ //start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 3000 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ //----------------- Close and rebuild local site -------------------------------------
+ //kill the senders
+ vm4.invoke(killSenderRunnable());
+ vm5.invoke(killSenderRunnable());
+ vm6.invoke(killSenderRunnable());
+ vm7.invoke(killSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("Killed all the senders.");
+
+ //restart the vm
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Created back the cache");
+
+ //create back the senders
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ LogWriterUtils.getLogWriter().info("Created the senders again");
+
+ //start the senders
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Started the senders.");
+
+ LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+
+ //wait for senders running
+ vm4.invoke(waitForSenderRunnable());
+ vm5.invoke(waitForSenderRunnable());
+ vm6.invoke(waitForSenderRunnable());
+ vm7.invoke(waitForSenderRunnable());
+
+ LogWriterUtils.getLogWriter().info("All the senders are now running...");
+
+ //create PR on local site
+ AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ try {
+ inv1.join();
+ inv2.join();
+ inv3.join();
+ inv4.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+ //-------------------------------------------------------------------------------------------
+
+ //start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 3000 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 3000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 3000 ));
+ }
+
+ /**
+ * LocalMaxMemory of user PR is 0 (accessor region).
+ * Parallel sender persistence is enabled.
+ * In this scenario, the PR for Parallel sender should be created with persistence = false.
+ * This is because since the region is accessor region, it won't host any buckets and
+ * hence Parallel sender PR is not required to be persistent.
+ * This test is added for defect # 45747.
+ */
+ @Test
+ public void testParallelPropagationWithSenderPersistenceEnabledForAccessor() {
+ //create locator on local site
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ //create locator on remote site
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ //create receiver on remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ //create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor(
+ getTestMethodName() + "_PR", "ln", 1, 100 ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegionAsAccessor(
+ getTestMethodName() + "_PR", "ln", 1, 100 ));
+
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ //start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ }
+
+ /**
+ * Test for bug 50120 see if we can recover after deleting the parallel gateway
+ * sender files and not recoverying the sender when we have a persistent PR.
+ * @throws Throwable
+ */
+ @Test
+ public void testRemoveGatewayFromPersistentRegionOnRestart() throws Throwable {
+
+
+ try {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), "ln", 1, 100, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 113 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 113 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 113 ));
+
+ //Bounce vm4, vm5, vm6, vm7 without the parallel queue
+ vm4.invoke(WANTestBase.class, "closeCache", new Object [] {});
+ vm5.invoke(WANTestBase.class, "closeCache", new Object [] {});
+ vm6.invoke(WANTestBase.class, "closeCache", new Object [] {});
+ vm7.invoke(WANTestBase.class, "closeCache", new Object [] {});
+
+ vm4.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue" , new Object[] { true});
+ vm5.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue" , new Object[] { true});
+ vm6.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue" , new Object[] { true});
+ vm7.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue" , new Object[] { true});
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ AsyncInvocation async4 = vm4.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ AsyncInvocation async5 = vm5.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ AsyncInvocation async6 = vm6.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+ AsyncInvocation async7 = vm7.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion(
+ getTestMethodName(), null, 1, 100, isOffHeap() ));
+
+ async7.getResult(30 * 1000);
+ async5.getResult(30 * 1000);
+ async6.getResult(30 * 1000);
+ async4.getResult(30 * 1000);
+
+ //This should succeed, because the region recovered even though
+ //the queue was removed.
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName(), 113 ));
+ } finally {
+ vm4.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue" , new Object[] { false});
+ vm5.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue" , new Object[] { false});
+ vm6.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue" , new Object[] { false});
+ vm7.invoke(ParallelWANPersistenceEnabledGatewaySenderDUnitTest.class, "setIgnoreQueue" , new Object[] { false});
+ }
+ }
+
+ public static void setIgnoreQueue(boolean shouldIgnore) {
+ ColocationHelper.IGNORE_UNRECOVERED_QUEUE = shouldIgnore;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
new file mode 100644
index 0000000..2e84c4e
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+import com.gemstone.gemfire.test.junit.categories.FlakyTest;
+
+@Category(DistributedTest.class)
+public class ParallelWANPropagationClientServerDUnitTest extends WANTestBase {
+
+ /**
+ * Normal happy scenario test case.
+ */
+ @Category(FlakyTest.class) // GEODE-1775: fails intermittently
+ @Test
+ public void testParallelPropagationWithClientServer() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ vm2.invoke(() -> WANTestBase.createReceiverAndServer( nyPort ));
+ vm3.invoke(() -> WANTestBase.createReceiverAndServer( nyPort ));
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.createClientWithLocator(
+ nyPort, "localhost", getTestMethodName() + "_PR" ));
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 100 ));
+
+ vm5.invoke(() -> WANTestBase.createServer( lnPort ));
+ vm6.invoke(() -> WANTestBase.createServer( lnPort ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+ 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+ 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+ vm7.invoke(() -> WANTestBase.createClientWithLocator(
+ lnPort, "localhost", getTestMethodName() + "_PR" ));
+
+ startSenderInVMsAsync("ln", vm5, vm6);
+
+ // before doing any puts, let the senders be running in order to ensure that
+ // not a single event will be lost
+
+ vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+ vm7.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
+ 10000 ));
+
+
+ // verify all buckets drained on all sender nodes.
+ vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 10000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 10000 ));
+
+ vm5.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 10000 ));
+ vm6.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 10000 ));
+
+ vm7.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 10000 ));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 10000 ));
+
+ }
+}