You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:44:02 UTC
[14/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to
./geode-wan/src/test/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java
new file mode 100644
index 0000000..3ae9a01
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java
@@ -0,0 +1,1063 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.Assert;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+@Category(DistributedTest.class)
+public class ReplicatedRegion_ParallelWANPropagationDUnitTest extends WANTestBase{
+
+ /**
+  * Default constructor; it only invokes the {@code WANTestBase} no-argument constructor.
+  */
+ public ReplicatedRegion_ParallelWANPropagationDUnitTest() {
+ super();
+ }
+
+ // NOTE(review): always null and not referenced anywhere in the visible part of this
+ // class -- confirm whether it is still needed before removing it.
+ final String expectedExceptions = null;
+
+
+ /**
+  * Serialization version identifier of this test class.
+  */
+ private static final long serialVersionUID = 1L;
+
+ /**
+  * Negative test: attaches sender "ln1" to a replicated region on vm4, creates and starts
+  * that sender, and expects the sequence to fail with an exception whose cause message
+  * contains "can not be used with replicated region".
+  *
+  * <p>The test fails when no exception is raised, or when the raised exception does not
+  * carry the expected cause message. In the latter case the caught exception is attached
+  * to the failure so the real problem stays visible.
+  */
+ @Test
+ public void test_DR_PGS_1Nodes_Put_Receiver() throws Exception {
+   final String expectationMessage =
+       "Expected GatewaySenderConfigException where parallel gateway sender can not be used with replicated region";
+   try {
+     Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+     Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+     createCacheInVMs(nyPort, vm2);
+     vm2.invoke(() -> WANTestBase.createReceiver());
+     vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+         getTestMethodName() + "_RR", null, isOffHeap() ));
+
+     createCacheInVMs(lnPort, vm4);
+
+     vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+         getTestMethodName() + "_RR", "ln1", isOffHeap() ));
+
+     vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+         true, 10, 100, false, false, null, true ));
+
+     vm4.invoke(() -> WANTestBase.startSender( "ln1" ));
+     // fail() throws an AssertionError, which is not an Exception, so it is not
+     // swallowed by the catch block below.
+     fail(expectationMessage);
+   }
+   catch (Exception e) {
+     // Null-safe inspection: an exception without a cause (or a cause without a message)
+     // must be reported as a test failure, not as a NullPointerException.
+     Throwable cause = e.getCause();
+     String causeMessage = (cause == null) ? null : cause.getMessage();
+     if (causeMessage == null
+         || !causeMessage.contains("can not be used with replicated region")) {
+       // Keep the unexpected exception attached to the failure for diagnosis.
+       Assert.fail(expectationMessage, e);
+     }
+   }
+ }
+
+ /*1. Validate that parallelGatewaySenderId can be added to distributed region
+ *Region distributed ack/noack + PGS
+ *1. Find out the restrictions on totalNumBuckets on shadowPR
+ *2. Find out the restrictions on redundancy on shadowPR
+ *3. Find out the restrictions on localMaxMemory on shadowPR
+ *4. Find out the best way user will specify PR attributes to PGS
+ *5. Find out the restrictions on ordering.
+ *6. put on region populates the queue
+ *7. put on region reaches to remote site. Dispatcher works as expected
+ *8. m1 and m2 has DR(ack/noack). put on DR from m1 populates queue on both m1 and m2. Validate that remote site got all the events
+ *9. m1 and m2 has DR(ack/noack). create/put/destroy/operations populates the queue. Validate that remote site got correct events
+ *10. m1 and m2 has DR(ack/noack). localDestroy is called on m1's DR. This locally destroys M1's shadowPr
+ *11. m1 and m2 has DR(ack/noack). destroy is called on m1's DR. This destroys entire shadowPr on m1 and m2
+ *12. m1 and m2 has DR(ack/noack). close Region is called on m1's DR. This locally destroys shadowPr on m1
+ *13. m1 and m2 has DR(ack/noack). cache.close on m1'. This locally destroys shadowPr on m1
+ *14. Validate HA scenario does not cause any event loss
+ *15. PDX events of DR are propagated to remote sites
+ *16. validate stats
+ *17: PR and DR regions with same name.. Can this be created. If yes then how to differentiate these 2 different shadowPR.
+ *18. test for redundancy. For the SPR, redundancy will be equal to the number of nodes where DR is present. Max is 3. This needs to be figured out at runtime.
+ *19. test without providing diskStoreName. I suspect some problem with this code: diskStoreName=null does not look like it is handled very well. Need to verify.
+ *20. ParallelGatewaySenderQueue#addPR method has multiple checks for inPersistenceEnabled. Can't we do it with only one check?
+ */
+
+ /**
+ * Test to validate that created parallel gatewaySenders id can be added to
+ * distributed region
+ * Below test is disabled intentionally
+ 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+ 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+ ParallelGatewaySenderQueue#convertPathToName
+ 3> We have to enabled it in next release
+ 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
+ and version prior to 8.0
+ */
+ @Ignore
+ @Test
+ public void test_PGS_Started_DR_CREATED_NO_RECEIVER() throws Exception {
+ try {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class
+ .getName(), vm4);
+ ExpectedException exp2 = addExpectedException(InterruptedException.class
+ .getName(), vm4);
+ try {
+*/ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, false ));
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln1", isOffHeap() ));
+ vm4.invoke(() -> WANTestBase.doPuts(
+ getTestMethodName() + "_RR", 1000 ));
+ vm4.invoke(() -> WANTestBase.validateQueueContents(
+ "ln1", 1000 ));
+
+/* }
+ finally {
+ exp1.remove();
+ exp2.remove();
+ }
+*/ }
+ catch (Exception e) {
+ Assert.fail("Unexpected exception", e);
+ }
+ }
+
+ /**
+ * Test to validate that distributed region with given parallelGatewaySender id
+ * is created first and then a same parallelGatewaySender is created
+ * a single put in DR is enqueued in parallelQueue
+ * Below test is disabled intentionally
+ 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+ 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+ ParallelGatewaySenderQueue#convertPathToName
+ 3> We have to enabled it in next release
+ 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
+ and version prior to 8.0
+ */
+ @Ignore
+ @Test
+ public void test_DR_CREATED_PGS_STARTED_NO_RECEIVER() throws Exception {
+ try {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln1", isOffHeap() ));
+/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class
+ .getName(), vm4);
+ ExpectedException exp2 = addExpectedException(InterruptedException.class
+ .getName(), vm4);
+ try {*/
+ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, false ));
+ vm4.invoke(() -> WANTestBase.doPuts(
+ getTestMethodName() + "_RR", 1000 ));
+ vm4.invoke(() -> WANTestBase.validateQueueContents(
+ "ln1", 1000 ));
+/* }
+ finally {
+ exp1.remove();
+ exp2.remove();
+ }
+*/ }
+ catch (Exception e) {
+ Assert.fail("Unexpected exception", e);
+ }
+ }
+
+ /**Below test is disabled intentionally
+ 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+ 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+ ParallelGatewaySenderQueue#convertPathToName
+ 3> We have to enabled it in next release
+ 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
+ and version prior to 8.0*/
+ @Ignore
+ @Test
+ public void test_DR_PGS_1Node_Put_ValidateQueue_No_Receiver() throws Exception {
+ try {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln1", isOffHeap() ));
+
+/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class
+ .getName(), vm4);
+ ExpectedException exp2 = addExpectedException(InterruptedException.class
+ .getName(), vm4);
+ try {*/
+ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, true ));
+ vm4.invoke(() -> WANTestBase.startSender( "ln1" ));
+
+ vm4.invoke(() -> WANTestBase.doPuts(
+ getTestMethodName() + "_RR", 10000 ));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 10000 ));
+ vm4.invoke(() -> WANTestBase.validateQueueContents(
+ "ln1", 10000 ));
+/* }
+ finally {
+ exp1.remove();
+ exp2.remove();
+ }
+ */
+ }
+ catch (Exception e) {
+ Assert.fail("Unexpected exception", e);
+ }
+ }
+
+ /**Below test is disabled intentionally
+ 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+ 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+ ParallelGatewaySenderQueue#convertPathToName
+ 3> We have to enabled it in next release
+ 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
+ and version prior to 8.0*/
+ @Ignore
+ @Test
+ public void test_DR_PGS_2Nodes_Put_ValidateQueue_No_Receiver() throws Exception {
+ try {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln1", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln1", isOffHeap() ));
+
+/* ExpectedException exp1 = addExpectedException(
+ GatewaySenderException.class.getName());
+ ExpectedException exp2 = addExpectedException(
+ InterruptedException.class.getName());
+ ExpectedException exp3 = addExpectedException(
+ CacheClosedException.class.getName());
+ try {
+*/ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, true ));
+
+ startSenderInVMs("ln1", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.doPuts(
+ getTestMethodName() + "_RR", 1000 ));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ vm5.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+
+ vm4.invoke(() -> WANTestBase.validateQueueContents(
+ "ln1", 1000 ));
+ vm5.invoke(() -> WANTestBase.validateQueueContents(
+ "ln1", 1000 ));
+
+/* }
+ finally {
+ exp1.remove();
+ exp2.remove();
+ exp3.remove();
+ }
+*/
+ }
+ catch (Exception e) {
+ Assert.fail("Unexpected exception", e);
+ }
+ }
+
+// public void test_DR_PGS_ORDERPOLICY_PARTITION_EXPECTException(){
+//
+// }
+// public void test_DR_PGS_DISKSTORE_NAME_PROVIDED_VALIDATE_DISK(){
+//
+// }
+// public void test_DR_PGS_DISKSTORE_NAME_NOT_PROVIDED_VALIDATE_DISK(){
+//
+// }
+//
+// public void test_DR_PGS_START_STOP_START(){
+//
+// }
+//
+// public void test_DR_PGS_PERSISTENCE_START_STOP_START(){
+//
+// }
+//
+// public void test_DR_PGS_START_PAUSE_STOP(){
+//
+// }
+//
+// public void test_DR_PGS_START_PAUSE_RESUME_VALIDATE_RECEIVER(){
+//
+// }
+
+ /**Below test is disabled intentionally
+ 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+ 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+ ParallelGatewaySenderQueue#convertPathToName
+ 3> We have to enabled it in next release
+ 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
+ and version prior to 8.0*/
+ @Ignore
+ @Test
+ public void test_DR_PGS_1Nodes_Put_Receiver_2() throws Exception {
+ try {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+
+ createCacheInVMs(lnPort, vm4);
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln1", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, true));
+
+ vm4.invoke(() -> WANTestBase.startSender( "ln1"));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+ 1000));
+
+ vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+ 0 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000));
+ }
+ catch (Exception e) {
+ Assert.fail("Unexpected exception", e);
+ }
+ }
+
+ /**Below test is disabled intentionally
+ 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+ 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+ ParallelGatewaySenderQueue#convertPathToName
+ 3> We have to enabled it in next release
+ 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
+ and version prior to 8.0*/
+ @Ignore
+ @Test
+ public void test_DR_PGS_2Nodes_Put_Receiver() throws Exception {
+ try {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+
+ createCacheInVMs(lnPort, vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln1", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln1", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, true));
+ vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, true));
+
+ startSenderInVMs("ln1", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+ 1000 ));
+ vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+ 1000 ));
+
+ vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+ 0 ));
+ vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+ 0 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ }
+ catch (Exception e) {
+ Assert.fail("Unexpected exception", e);
+ }
+ }
+
+ /**Below test is disabled intentionally
+ 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+ 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+ ParallelGatewaySenderQueue#convertPathToName
+ 3> We have to enabled it in next release
+ 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
+ and version prior to 8.0*/
+ @Ignore
+ @Test
+ public void test_DR_PGS_2Nodes_EMPTY_Put_Receiver() throws Exception {
+ try {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ vm2.invoke(() -> WANTestBase.createCache( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+
+ vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln1", Scope.DISTRIBUTED_ACK, DataPolicy.EMPTY, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln1", Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE, isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, true));
+ vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, true));
+
+ startSenderInVMs("ln1", vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+
+// vm4.invoke(() -> WANTestBase.validateRegionSize( testName + "_RR",
+// 1000 ));
+ vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+ 1000 ));
+
+ vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+ 0 ));
+ vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+ 0 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ }
+ catch (Exception e) {
+ Assert.fail("Unexpected exception", e);
+ }
+ }
+
+ /**Below test is disabled intentionally
+ 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+ 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+ ParallelGatewaySenderQueue#convertPathToName
+ 3> We have to enabled it in next release
+ 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
+ and version prior to 8.0*/
+ @Ignore
+ @Test
+ public void test_DR_PR_PGS_4Nodes_Put_Receiver_2Nodes() throws Exception {
+ try {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm3.invoke(() -> WANTestBase.createReceiver());
+
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 10, 100, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 10, 100, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 10, 100, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 10, 100, false, false, null, true ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+ vm5.invoke(() -> WANTestBase.doNextPuts( getTestMethodName() + "_PR",
+ 1000, 2000 ));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_PR",
+ 1000 ));
+ vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+ 1000 ));
+
+ vm4.invoke(() -> WANTestBase.validateQueueContents( "ln",
+ 0 ));
+ vm5.invoke(() -> WANTestBase.validateQueueContents( "ln",
+ 0 ));
+
+/* ExpectedException exp1 = addExpectedException(CacheClosedException.class
+ .getName());
+ try {*/
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR", 1000 ));
+/* }
+ finally {
+ exp1.remove();
+ }
+*/ }
+ catch (Exception e) {
+ Assert.fail("Unexpected exception", e);
+ }
+ }
+
+ /**Below test is disabled intentionally
+ 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+ 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+ ParallelGatewaySenderQueue#convertPathToName
+ 3> We have to enabled it in next release
+ 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
+ and version prior to 8.0*/
+ @Ignore
+ @Test
+ public void test_DR_PGS_NOMANUALSTART_4Nodes_Put_ValidateReceiver() throws Exception {
+ try {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, false ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, false ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, false ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, false ));
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln1", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln1", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln1", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln1", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+
+ vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+ 1000 ));
+ vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+ 1000 ));
+ vm6.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+ 1000 ));
+ vm7.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+ 1000 ));
+
+
+ vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+ 0 ));
+ vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+ 0 ));
+ vm6.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+ 0 ));
+ vm7.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+ 0 ));
+
+/* ExpectedException exp1 = addExpectedException(CacheClosedException.class
+ .getName());
+ try {*/
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+/* }
+ finally {
+ exp1.remove();
+ }*/
+ }
+ catch (Exception e) {
+ Assert.fail("Unexpected exception", e);
+ }
+ }
+
+ /**
+  * Kill-and-recreate scenario with four sender nodes (vm4-vm7) and two receiver nodes
+  * (vm2, vm3). Phase one sets up senders and regions, waits for the senders to run, does
+  * 1000 puts on vm4 and validates the region size on vm2 against 1000. Then
+  * {@code killSender} is invoked on all four sender nodes, after which caches, senders and
+  * regions are created again. Phase two does {@code doNextPuts(…, 1000, 2000)} on vm4,
+  * validates that all parallel sender queue buckets are drained on every sender node, and
+  * validates the region size on both receivers against 2000.
+  *
+  * <p>Disabled on purpose (original rationale): (1) in release 8.0 the queue name was
+  * changed to the old style for rolling upgrade support; (2) a common parallel sender for
+  * different non-colocated regions is not supported in 8.0, so
+  * ParallelGatewaySenderQueue#convertPathToName is not a concern; (3) the test has to be
+  * enabled in the next release; (4) version-based rolling upgrade support should be
+  * provided, with QSTRING chosen by GemFire version between 8.0 and versions prior to 8.0.
+  */
+ @Ignore
+ @Test
+ public void test_DR_PGS_4Nodes_Put_CLOSE4NODESCACHE_RECREATE_PUT_ValidateReceiver()
+     throws Exception {
+   try {
+     Integer lnLocatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+     Integer nyLocatorPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnLocatorPort));
+
+     createCacheInVMs(nyLocatorPort, vm2, vm3);
+     createReceiverInVMs(vm2, vm3);
+
+     // ---- Phase one: initial sender-side setup ----
+     createCacheInVMs(lnLocatorPort, vm4, vm5, vm6, vm7);
+
+     vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+     vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+     vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+     vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+
+     vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
+     vm5.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
+     vm6.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
+     vm7.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
+
+     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+     vm2.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
+     vm3.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
+
+     // Let the senders reach the running state before any put, so that not a single
+     // event is lost.
+     vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+     vm5.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+     vm6.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+     vm7.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+
+     vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
+     vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000));
+
+     // ---- Kill the senders on every sender node ----
+     vm4.invoke(() -> WANTestBase.killSender());
+     vm5.invoke(() -> WANTestBase.killSender());
+     vm6.invoke(() -> WANTestBase.killSender());
+     vm7.invoke(() -> WANTestBase.killSender());
+
+     // ---- Phase two: recreate caches, senders and regions ----
+     createCacheInVMs(lnLocatorPort, vm4, vm5, vm6, vm7);
+
+     vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+     vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+     vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+     vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+
+     vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
+     vm5.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
+     vm6.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
+     vm7.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
+
+     startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+     vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+     vm5.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+     vm6.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+     vm7.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+
+     vm4.invoke(() -> WANTestBase.doNextPuts(getTestMethodName() + "_RR", 1000, 2000));
+
+     // Verify all buckets drained on all sender nodes.
+     vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+     vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+     vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+     vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+     vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 2000));
+     vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 2000));
+   } catch (Exception e) {
+     Assert.fail("Unexpected exception", e);
+   }
+ }
+
+ /**
+ * Creates a DISTRIBUTED_NO_ACK replicated region and sender "ln1" on vm4 and vm5, puts 1000 entries
+ * through vm4, then validates the region size on both members, the queue contents of "ln1" on both
+ * members, and the region size on the receiving member vm2.
+ *
+ * This test is intentionally disabled:
+ * 1> In release 8.0 the queue name was changed back to the old style for rolling upgrade support.
+ * 2> A common parallel sender for different non-colocated regions is not supported in 8.0, so there is
+ * no need to bother about ParallelGatewaySenderQueue#convertPathToName.
+ * 3> It has to be enabled in the next release.
+ * 4> Version-based rolling upgrade support should be provided: based on the GemFire version, QSTRING
+ * should be used between 8.0 and versions prior to 8.0.
+ */
+ @Ignore
+ @Test
+ public void test_DR_NO_ACK_PGS_2Nodes_Put_ValidateQueue_Receiver() throws Exception {
+ try {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ // Member using the remote locator (nyPort): vm2 gets a cache, the region (no sender id) and a receiver.
+ createCacheInVMs(nyPort, vm2);
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ // Members using the first locator (lnPort): vm4 and vm5 host the region attached to sender "ln1".
+ createCacheInVMs(lnPort, vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln1",
+ Scope.DISTRIBUTED_NO_ACK, DataPolicy.REPLICATE, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln1",
+ Scope.DISTRIBUTED_NO_ACK, DataPolicy.REPLICATE, isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, true));
+ vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 100, false, false, null, true));
+
+ startSenderInVMs("ln1", vm4, vm5);
+ // Load 1000 entries through vm4 only.
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+ 1000 ));
+ // Both local members are validated against a region size of 1000.
+ vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+ 1000 ));
+ vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+ 1000 ));
+ // NOTE(review): presumably asserts that each "ln1" queue holds 0 entries - confirm against WANTestBase.validateQueueContents.
+ vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+ 0 ));
+ vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+ 0 ));
+ // The receiving member is validated against a region size of 1000 as well.
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ }
+ catch (Exception e) {
+ Assert.fail("Unexpected exception", e);
+ }
+ }
+
+ /**
+ * Two members (vm4, vm5) host sender "ln" and the replicated region. While vm4 runs 1000 puts
+ * asynchronously (doPuts0), the sender in vm5 is killed; vm4 must then drain its queue and the
+ * region on vm2 is validated against a size of 1000.
+ *
+ * This test is intentionally disabled:
+ * 1> In release 8.0 the queue name was changed back to the old style for rolling upgrade support.
+ * 2> A common parallel sender for different non-colocated regions is not supported in 8.0, so there is
+ * no need to bother about ParallelGatewaySenderQueue#convertPathToName.
+ * 3> It has to be enabled in the next release.
+ * 4> Version-based rolling upgrade support should be provided: based on the GemFire version, QSTRING
+ * should be used between 8.0 and versions prior to 8.0.
+ */
+ @Ignore
+ @Test
+ public void test_DR_PGS_2NODES_1NODESDOWN_Validate_Receiver() throws Exception {
+
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ // Receiving side: vm2 and vm3 get a cache (created with nyPort) and a receiver.
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ // Sending side: vm4 and vm5 get a cache (created with lnPort), sender "ln" and the region attached to it.
+ createCacheInVMs(lnPort, vm4, vm5);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+ 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+ 100, 10, false, false, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5);
+ // NOTE(review): the receiving members also pass "ln" as sender id when creating the region - confirm this is intended.
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+ Thread.sleep(60000);; // fixed 60 s wait; NOTE(review): the second ';' is a stray empty statement
+ // Start 1000 puts on vm4 asynchronously, pause (Wait.pause(1000)), then kill the sender in vm5 while the puts run.
+/* ExpectedException exp1 = addExpectedException(CacheClosedException.class
+ .getName());
+ try {*/
+ AsyncInvocation inv1 = vm4.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts0(
+ getTestMethodName() + "_RR", 1000 ));
+ Wait.pause(1000);
+ AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.killSender());
+ try {
+ inv1.join();
+ inv2.join();
+ }
+ catch (Exception e) {
+ Assert.fail("UnExpected Exception", e);
+ }
+/* }
+ finally {
+ exp1.remove();
+ }*/
+ // Log the vm4 queue size before and after validating that all buckets have drained.
+ Integer size = (Integer)vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" ));
+ LogWriterUtils.getLogWriter().info("The size of the queue is in vm4 " + size);
+
+
+ vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+
+ size = (Integer)vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" ));
+ LogWriterUtils.getLogWriter().info("The size of the queue is in vm4 " + size);
+ // NOTE(review): only vm2 is validated here although vm3 was also set up as a receiver.
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 1000 ));
+ }
+
+ /**
+ * Four members (vm4-vm7) host sender "ln" and the replicated region. After a fixed wait, vm7 starts
+ * 10000 puts (doPuts0), the sender in vm4 is killed, vm6 starts 10000 puts on the same keys (doPuts1),
+ * and the sender in vm5 is killed; vm6 and vm7 must then drain their queues and the region on vm2 is
+ * validated against a size of 10000.
+ *
+ * This test is intentionally disabled:
+ * 1> In release 8.0 the queue name was changed back to the old style for rolling upgrade support.
+ * 2> A common parallel sender for different non-colocated regions is not supported in 8.0, so there is
+ * no need to bother about ParallelGatewaySenderQueue#convertPathToName.
+ * 3> It has to be enabled in the next release.
+ * 4> Version-based rolling upgrade support should be provided: based on the GemFire version, QSTRING
+ * should be used between 8.0 and versions prior to 8.0.
+ */
+ @Ignore
+ @Test
+ public void test_DR_PGS_4NODES_2NODESDOWN_Validate_Receiver() throws Exception {
+
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ // Receiving side: vm2 and vm3 get a cache (created with nyPort) and a receiver.
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+ // Sending side: vm4-vm7 get a cache (created with lnPort), sender "ln" and the region attached to it.
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+ 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+ 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+ 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+ 100, 10, false, false, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+ // NOTE(review): the receiving members also pass "ln" as sender id when creating the region - confirm this is intended.
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+ Thread.sleep(60000); // fixed 60 s wait before the puts and sender kills start
+/* ExpectedException exp1 = addExpectedException(CacheClosedException.class
+ .getName());
+ try */{ // plain block left over from the commented-out try/finally
+ AsyncInvocation inv1 = vm7.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts0(
+ getTestMethodName() + "_RR", 10000 ));
+ Thread.sleep(1000);
+ AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
+ Thread.sleep(2000);
+ AsyncInvocation inv3 = vm6.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts1(
+ getTestMethodName() + "_RR", 10000 ));
+ Thread.sleep(1500);
+ AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
+ try {
+ inv1.join();
+ inv2.join();
+ inv3.join();
+ inv4.join();
+ }
+ catch (Exception e) {
+ Assert.fail("UnExpected Exception", e);
+ }
+ }/*
+ finally {
+ exp1.remove();
+ }*/
+ // vm4 and vm5 had their senders killed; the remaining senders in vm6 and vm7 are checked for drained queues.
+ vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+ vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+ // doPuts0 and doPuts1 write the same 10000 keys, hence the size of 10000 validated on vm2.
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_RR", 10000 ));
+
+ }
+
+ public static void doPuts0(String regionName, int numPuts) {
+ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class
+ .getName());
+ try {
+
+ Region r = cache.getRegion(Region.SEPARATOR + regionName);
+ assertNotNull(r);
+ for (long i = 0; i < numPuts; i++) {
+ LogWriterUtils.getLogWriter().info("Put : key : " + i);
+ r.put(i, "0_" + i);
+ }
+ } finally {
+ exp.remove();
+ exp1.remove();
+ }
+ }
+
+ public static void doPuts1(String regionName, int numPuts){
+ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class
+ .getName());
+ try {
+
+ Region r = cache.getRegion(Region.SEPARATOR + regionName);
+ assertNotNull(r);
+ for (long i = 0; i < numPuts; i++) {
+ LogWriterUtils.getLogWriter().info("Put : key : " + i);
+ r.put(i, "1_" + i);
+ }
+ } finally {
+ exp.remove();
+ exp1.remove();
+ }
+ }
+
+ public static void doPuts2(String regionName, int numPuts){
+ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class
+ .getName());
+ try {
+ Region r = cache.getRegion(Region.SEPARATOR + regionName);
+ assertNotNull(r);
+ for (long i = 0; i < numPuts; i++) {
+ LogWriterUtils.getLogWriter().info("Put : key : " + i);
+ r.put(i, "2_" + i);
+ }
+ } finally {
+ exp.remove();
+ exp1.remove();
+ }
+ }
+
+ /**
+ * Test to validate that puts on a DR with no ack on multiple nodes are propagated to the parallel queue on multiple nodes
+ */
+
+ /**
+ * Test to validate that the single put in DR is propagated to remote site through parallelGatewaySender
+ */
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java
new file mode 100644
index 0000000..0ee78c5
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+import static com.gemstone.gemfire.test.dunit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class SenderWithTransportFilterDUnitTest extends WANTestBase {
+
+ /**
+  * Creates a receiver (vm2) and a sender "ln" (vm3, created with {@code isParallel == false}) through
+  * the transport-filter helpers of this class, puts 100 entries into a replicated region on vm3 and
+  * validates a region size of 100 on vm2.
+  */
+ @Test
+ public void testSerialSenderWithTransportFilter() {
+   final Integer localLocatorPort =
+       (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+   final Integer remoteLocatorPort =
+       (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+   // Receiving member: receiver with transport filter, then the region without a sender id.
+   vm2.invoke(() -> SenderWithTransportFilterDUnitTest.createReceiverWithTransportFilters(remoteLocatorPort));
+   vm2.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap()));
+
+   // Sending member: cache, sender "ln" with transport filter, region attached to "ln".
+   vm3.invoke(() -> WANTestBase.createCache(localLocatorPort));
+   vm3.invoke(() -> SenderWithTransportFilterDUnitTest.createSenderWithTransportFilter(
+       "ln", 2, false, 100, 1, false, false, true));
+   vm3.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap()));
+   vm3.invoke(() -> WANTestBase.startSender("ln"));
+
+   vm3.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 100));
+
+   vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 100));
+ }
+
+ /**
+  * Creates a receiver (vm2) and a sender "ln" (vm3, created with {@code isParallel == true}) through
+  * the transport-filter helpers of this class, puts 100 entries into a partitioned region on vm3 and
+  * validates a region size of 100 on vm2.
+  */
+ @Test
+ public void testParallelSenderWithTransportFilter() {
+   final Integer localLocatorPort =
+       (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+   final Integer remoteLocatorPort =
+       (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+   // Receiving member: receiver with transport filter, then the region without a sender id.
+   vm2.invoke(() -> SenderWithTransportFilterDUnitTest.createReceiverWithTransportFilters(remoteLocatorPort));
+   vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+       getTestMethodName() + "_PR", null, 0, 10, isOffHeap()));
+
+   // Sending member: cache, sender "ln" with transport filter, region attached to "ln".
+   vm3.invoke(() -> WANTestBase.createCache(localLocatorPort));
+   vm3.invoke(() -> SenderWithTransportFilterDUnitTest.createSenderWithTransportFilter(
+       "ln", 2, true, 100, 1, false, false, true));
+   vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+       getTestMethodName() + "_PR", "ln", 0, 10, isOffHeap()));
+   vm3.invoke(() -> WANTestBase.startSender("ln"));
+
+   vm3.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100));
+
+   vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100));
+ }
+
+ public static int createReceiverWithTransportFilters(int locPort) {
+ WANTestBase test = new WANTestBase();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost[" + locPort
+ + "]");
+
+ InternalDistributedSystem ds = test.getSystem(props);
+ cache = CacheFactory.create(ds);
+ GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ fact.setStartPort(port);
+ fact.setEndPort(port);
+ ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>();
+ transportFilters.add(new CheckSumTransportFilter("CheckSumTransportFilter"));
+ if (!transportFilters.isEmpty()) {
+ for (GatewayTransportFilter filter : transportFilters) {
+ fact.addGatewayTransportFilter(filter);
+ }
+ }
+ GatewayReceiver receiver = fact.create();
+ try {
+ receiver.start();
+ }
+ catch (IOException e) {
+ fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + port, e);
+ }
+ return port;
+ }
+
+ /**
+  * Creates a gateway sender named {@code dsName} on the static {@code cache}. The sender carries one
+  * {@code CheckSumTransportFilter} and always uses a disk store (also named {@code dsName}) created
+  * in a fresh directory; persistence is switched on only when {@code isPersistent} is true.
+  *
+  * @param dsName id of the sender; also used for the disk store name and the directory prefix
+  * @param remoteDsId remote distributed system id passed to the factory's {@code create}
+  * @param isParallel whether {@code setParallel(true)} is applied
+  * @param maxMemory value for {@code setMaximumQueueMemory}
+  * @param batchSize value for {@code setBatchSize}
+  * @param isConflation value for {@code setBatchConflationEnabled}
+  * @param isPersistent whether {@code setPersistenceEnabled(true)} is applied
+  * @param isManualStart value for {@code setManualStart}; applied to non-parallel senders only
+  */
+ public static void createSenderWithTransportFilter(String dsName,
+     int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize,
+     boolean isConflation, boolean isPersistent, boolean isManualStart) {
+   File diskDir = new File(dsName + "_disk_"
+       + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+   diskDir.mkdir();
+   DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+   File[] diskDirs = new File[] { diskDir };
+
+   GatewaySenderFactory senderFactory = cache.createGatewaySenderFactory();
+   if (isParallel) {
+     senderFactory.setParallel(true);
+   }
+   senderFactory.setMaximumQueueMemory(maxMemory);
+   senderFactory.setBatchSize(batchSize);
+   if (!isParallel) {
+     // NOTE(review): isManualStart has never been applied to parallel senders here - confirm that is intended.
+     senderFactory.setManualStart(isManualStart);
+   }
+   ((InternalGatewaySenderFactory) senderFactory)
+       .setLocatorDiscoveryCallback(new MyLocatorCallback());
+
+   ArrayList<GatewayTransportFilter> filters = new ArrayList<>();
+   filters.add(new CheckSumTransportFilter("CheckSumTransportFilter"));
+   for (GatewayTransportFilter each : filters) {
+     senderFactory.addGatewayTransportFilter(each);
+   }
+
+   senderFactory.setBatchConflationEnabled(isConflation);
+   if (isPersistent) {
+     senderFactory.setPersistenceEnabled(true);
+   }
+   // The disk store is created and assigned whether or not persistence is enabled.
+   DiskStore diskStore = diskStoreFactory.setDiskDirs(diskDirs).create(dsName);
+   senderFactory.setDiskStoreName(diskStore.getName());
+
+   senderFactory.create(dsName, remoteDsId);
+ }
+
+ /**
+  * Transport filter that wraps the given streams in {@code CheckedInputStream} /
+  * {@code CheckedOutputStream}. Every wrapped stream, input or output, is handed the same
+  * {@code Adler32} instance.
+  */
+ static class CheckSumTransportFilter implements GatewayTransportFilter {
+
+   /** Checksum instance shared by all streams this filter wraps. */
+   Adler32 checker = new Adler32();
+
+   /** Label returned by {@link #toString()}. */
+   private String name;
+
+   public CheckSumTransportFilter(String name){
+     this.name = name;
+   }
+
+   @Override
+   public InputStream getInputStream(InputStream stream) {
+     final CheckedInputStream checkedIn = new CheckedInputStream(stream, this.checker);
+     return checkedIn;
+   }
+
+   @Override
+   public OutputStream getOutputStream(OutputStream stream) {
+     final CheckedOutputStream checkedOut = new CheckedOutputStream(stream, this.checker);
+     return checkedOut;
+   }
+
+   @Override
+   public void close() {
+     // nothing to release
+   }
+
+   @Override
+   public String toString(){
+     return name;
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
new file mode 100644
index 0000000..6e2581e
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import static org.junit.Assert.*;
+
+import java.util.Set;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.admin.AdminDistributedSystemFactory;
+import com.gemstone.gemfire.admin.AdminException;
+import com.gemstone.gemfire.admin.DistributedSystemConfig;
+import com.gemstone.gemfire.admin.internal.AdminDistributedSystemImpl;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.internal.cache.CacheObserverAdapter;
+import com.gemstone.gemfire.internal.cache.CacheObserverHolder;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class ShutdownAllPersistentGatewaySenderDUnitTest extends WANTestBase {
+
+ private static final long MAX_WAIT = 70000; // timeout passed to shutDownAllMembers, the AsyncInvocation joins and the final wait (presumably milliseconds - confirm)
+
+ private static final int NUM_KEYS = 1000; // number of puts issued in vm4 and the region size asserted afterwards
+
+ /** No-arg constructor; it does nothing beyond invoking the superclass constructor. */
+ public ShutdownAllPersistentGatewaySenderDUnitTest() {
+   // implicit super() call only
+ }
+
+ @Override
+ protected final void postSetUpWANTestBase() throws Exception {
+ IgnoredException.addIgnoredException("Cache is being closed by ShutdownAll"); // registers this text as ignored (presumably so matching log output does not fail the test - confirm)
+ }
+
+ private static final long serialVersionUID = 1L;
+
+ @Test
+ public void testGatewaySender() throws Exception {
+ IgnoredException.addIgnoredException("Cache is shutting down");
+ /* Scenario: vm4 puts NUM_KEYS entries into a region attached to sender "ln" while a shutdown-all is
+ * issued from vm2 (expecting 2 members); the caches in vm2 and vm3 are then recreated and both vm4
+ * and vm2 are asserted to hold NUM_KEYS entries. */
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ // vm2 and vm3 use the remote locator (nyPort); only vm2 creates a receiver.
+ vm2.invoke(() -> WANTestBase.createCache( nyPort ));
+ vm3.invoke(() -> WANTestBase.createCache( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ // NOTE(review): the receiving members pass "ln" as sender id when creating the region - confirm this is intended.
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+ // vm4 uses the first locator (lnPort) and hosts sender "ln" plus the same persistent partitioned region.
+ vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 400, false, false, null, true ));
+
+ vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+
+ vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+ // set the CacheObserver to block the ShutdownAll
+ SerializableRunnable waitAtShutdownAll = new SerializableRunnable() {
+ @Override
+ public void run() {
+ LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
+ CacheObserverHolder.setInstance(new CacheObserverAdapter() {
+ @Override
+ public void beforeShutdownAll() {
+ final Region region = cache.getRegion(getTestMethodName() + "_PR");
+ Wait.waitForCriterion(new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return region.size() >= 2;
+ }
+
+ @Override
+ public String description() {
+ return "Wait for wan to have processed several events";
+ }
+ }, 30000, 100, true);
+ }
+ });
+ }
+ };
+ vm2.invoke(waitAtShutdownAll);
+ vm3.invoke(waitAtShutdownAll);
+ // start NUM_KEYS puts in vm4 without waiting for them to finish
+ AsyncInvocation vm4_future = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS ));
+
+ // ShutdownAll will be suspended at observer, so puts will continue
+ AsyncInvocation future = shutDownAllMembers(vm2, 2, MAX_WAIT); // asserts that 2 members were shut down
+ future.join(MAX_WAIT);
+
+ // now recreate the caches in vm2 and vm3 and recreate the persistent region (asynchronously in vm3)
+ LogWriterUtils.getLogWriter().info("restart in VM2");
+ vm2.invoke(() -> WANTestBase.createCache( nyPort ));
+ vm3.invoke(() -> WANTestBase.createCache( nyPort ));
+ AsyncInvocation vm3_future = vm3.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR",
+ "ln", 1, 100, isOffHeap() ));
+ vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+ vm3_future.join(MAX_WAIT);
+ // log the region size seen by vm3 before the receiver is recreated in vm2
+ vm3.invoke(new SerializableRunnable() {
+ public void run() {
+ final Region region = cache.getRegion(getTestMethodName() + "_PR");
+ cache.getLogger().info(
+ "vm1's region size before restart gatewayHub is " + region.size());
+ }
+ });
+ vm2.invoke(() -> WANTestBase.createReceiver());
+
+ // wait for vm4 to finish its puts
+ vm4_future.join(MAX_WAIT);
+ vm4.invoke(new SerializableRunnable() {
+ public void run() {
+ Region region = cache.getRegion(getTestMethodName() + "_PR");
+ assertEquals(NUM_KEYS, region.size());
+ }
+ });
+
+ // verify in vm2 the entries received from the gateway
+ vm2.invoke(new SerializableRunnable() {
+ public void run() {
+ final Region region = cache.getRegion(getTestMethodName() + "_PR");
+ // Wait until the entry for key NUM_KEYS - 1 has the value NUM_KEYS - 1 or the region holds NUM_KEYS entries.
+ cache.getLogger().info(
+ "vm1's region size after restart gatewayHub is " + region.size());
+ Wait.waitForCriterion(new WaitCriterion() {
+ public boolean done() {
+ Object lastValue = region.get(NUM_KEYS - 1);
+ if (lastValue != null && lastValue.equals(NUM_KEYS - 1)) {
+ region.getCache().getLogger().info(
+ "Last key has arrived, its value is " + lastValue
+ + ", end of wait.");
+ return true;
+ }
+ else
+ return (region.size() == NUM_KEYS);
+ }
+
+ public String description() {
+ return "Waiting for destination region to reach size: " + NUM_KEYS
+ + ", current is " + region.size();
+ }
+ }, MAX_WAIT, 100, true);
+ assertEquals(NUM_KEYS, region.size());
+ }
+ });
+
+ }
+
+ private AsyncInvocation shutDownAllMembers(VM vm, final int expectedNumber, final long timeout) {
+ AsyncInvocation future = vm.invokeAsync(new SerializableRunnable("Shutdown all the members") {
+
+ public void run() {
+ DistributedSystemConfig config;
+ AdminDistributedSystemImpl adminDS = null;
+ try {
+ config = AdminDistributedSystemFactory.defineDistributedSystem(cache
+ .getDistributedSystem(), "");
+ adminDS = (AdminDistributedSystemImpl)AdminDistributedSystemFactory
+ .getDistributedSystem(config);
+ adminDS.connect();
+ Set members = adminDS.shutDownAllMembers(timeout);
+ int num = members == null ? 0 : members.size();
+ assertEquals(expectedNumber, num);
+ }
+ catch (AdminException e) {
+ throw new RuntimeException(e);
+ }
+ finally {
+ if (adminDS != null) {
+ adminDS.disconnect();
+ }
+ }
+ }
+ });
+ return future;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java
new file mode 100644
index 0000000..b504f87
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import com.gemstone.gemfire.cache.*;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.wan.*;
+import com.gemstone.gemfire.cache30.MyGatewayEventFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.wan.GatewayReceiverException;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
+import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory;
+import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.junit.Assert.*;
+
+@Category(IntegrationTest.class)
+public class WANConfigurationJUnitTest {
+
+ private Cache cache;
+
+ /**
+  * Verifies that starting a parallel gateway sender fails when no locator has
+  * been configured. The failure must be an IllegalStateException whose message
+  * starts with the localized LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER text.
+  *
+  * @throws IOException declared on the signature and propagated to JUnit
+  */
+ @Test
+ public void test_GatewaySender_without_Locator() throws IOException {
+   try {
+     cache = new CacheFactory().set(MCAST_PORT, "0").create();
+     GatewaySenderFactory senderFactory = cache.createGatewaySenderFactory();
+     senderFactory.setParallel(true);
+     GatewaySender sender = senderFactory.create("NYSender", 2);
+     sender.start();
+     fail("Expected IllegalStateException but not thrown");
+   }
+   catch (Exception e) {
+     // fail() raises an AssertionError, which is not an Exception, so the
+     // "not thrown" failure above is never swallowed by this handler.
+     boolean locatorMissing = e instanceof IllegalStateException
+         && e.getMessage().startsWith(
+             LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
+                 .toLocalizedString());
+     if (!locatorMissing) {
+       fail("Expected IllegalStateException but received :" + e);
+     }
+   }
+ }
+
+ /**
+  * Verifies that a second gateway sender with an already-used id is rejected
+  * with an IllegalStateException.
+  */
+ @Test
+ public void test_SameGatewaySenderCreatedTwice() {
+   cache = new CacheFactory().set(MCAST_PORT, "0").create();
+   try {
+     GatewaySenderFactory senderFactory = cache.createGatewaySenderFactory();
+     senderFactory.setParallel(true);
+     senderFactory.setManualStart(true);
+     senderFactory.create("NYSender", 2);
+     // Same id again: this second create() is the call expected to throw.
+     senderFactory.create("NYSender", 2);
+     fail("Expected IllegalStateException but not thrown");
+   }
+   catch (Exception e) {
+     boolean duplicateRejected = e instanceof IllegalStateException
+         && e.getMessage().contains("A GatewaySender with id");
+     if (!duplicateRejected) {
+       fail("Expected IllegalStateException but received :" + e);
+     }
+   }
+ }
+
+ /**
+  * Verifies that the same gateway sender id cannot be added twice to the
+  * region attributes; an IllegalArgumentException is expected.
+  */
+ @Test
+ public void test_SameGatewaySenderIdAddedTwice() {
+   try {
+     cache = new CacheFactory().set(MCAST_PORT, "0").create();
+     GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+     fact.setParallel(true);
+     fact.setManualStart(true);
+     GatewaySender sender1 = fact.create("NYSender", 2);
+     AttributesFactory factory = new AttributesFactory();
+     factory.addGatewaySenderId(sender1.getId());
+     // Adding the identical id a second time is the call expected to throw.
+     factory.addGatewaySenderId(sender1.getId());
+     fail("Expected IllegalArgumentException but not thrown");
+   }
+   catch (Exception e) {
+     // Name the exception type this test really expects in the failure message.
+     if (!(e instanceof IllegalArgumentException
+         && e.getMessage().contains("is already added"))) {
+       fail("Expected IllegalArgumentException but received :" + e);
+     }
+   }
+ }
+
+ @Test
+ public void test_GatewaySenderIdAndAsyncEventId() {
+   cache = new CacheFactory().set(MCAST_PORT, "0").create();
+   // Attributes carrying two gateway sender ids and one async event queue id.
+   AttributesFactory attributesFactory = new AttributesFactory();
+   attributesFactory.addGatewaySenderId("ln");
+   attributesFactory.addGatewaySenderId("ny");
+   attributesFactory.addAsyncEventQueueId("Async_LN");
+   RegionAttributes regionAttributes = attributesFactory.create();
+
+   Set<String> expectedSenderIds = new HashSet<String>();
+   expectedSenderIds.add("ln");
+   expectedSenderIds.add("ny");
+   assertEquals(expectedSenderIds, regionAttributes.getGatewaySenderIds());
+   LocalRegion customer = (LocalRegion) cache.createRegion("Customer", regionAttributes);
+   assertEquals(expectedSenderIds, customer.getGatewaySenderIds());
+ }
+
+ /**
+  * Creates a replicated, distributed-ack region that references a parallel
+  * gateway sender and fails if region creation throws. Currently ignored
+  * (see the Bug51491 marker on the annotation).
+  */
+ @Ignore("Bug51491")
+ @Test
+ public void test_GatewaySender_Parallel_DistributedRegion() {
+   cache = new CacheFactory().set(MCAST_PORT, "0").create();
+   GatewaySenderFactory senderFactory = cache.createGatewaySenderFactory();
+   senderFactory.setParallel(true);
+   senderFactory.setManualStart(true);
+   GatewaySender parallelSender = senderFactory.create("NYSender", 2);
+
+   AttributesFactory attributesFactory = new AttributesFactory();
+   attributesFactory.addGatewaySenderId(parallelSender.getId());
+   attributesFactory.setScope(Scope.DISTRIBUTED_ACK);
+   attributesFactory.setDataPolicy(DataPolicy.REPLICATE);
+   try {
+     cache.createRegionFactory(attributesFactory.create())
+         .create("test_GatewaySender_Parallel_DistributedRegion");
+   }
+   catch (Exception e) {
+     fail("Unexpected Exception :" + e);
+   }
+ }
+
+ @Test
+ public void test_GatewaySender_Parallel_MultipleDispatcherThread() {
+ cache = new CacheFactory().set(MCAST_PORT, "0").create();
+ GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+ fact.setParallel(true);
+ fact.setManualStart(true);
+ fact.setDispatcherThreads(4);
+ try {
+ GatewaySender sender1 = fact.create("NYSender", 2);
+ }
+ catch (GatewaySenderException e) {
+ fail("UnExpected Exception " + e);
+ }
+ }
+
+ /** Verifies that a gateway sender configured with zero dispatcher threads is rejected. */
+ @Test
+ public void test_GatewaySender_Serial_ZERO_DispatcherThread() {
+   cache = new CacheFactory().set(MCAST_PORT, "0").create();
+   GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+   fact.setManualStart(true);
+   fact.setDispatcherThreads(0);
+   try {
+     fact.create("NYSender", 2);
+     fail("Expected GatewaySenderException but not thrown");
+   }
+   catch (GatewaySenderException e) {
+     // The type is already GatewaySenderException here; only the message can be wrong.
+     if (!e.getMessage().contains("can not be created with dispatcher threads less than 1")) {
+       fail("Expected GatewaySenderException about dispatcher threads but received :" + e);
+     }
+   }
+ }
+
+ /**
+  * Creates a gateway receiver from a fully configured factory and checks that
+  * the receiver registered in the cache reports the same attribute values as
+  * the instance the factory returned.
+  */
+ @Test
+ public void test_ValidateGatewayReceiverAttributes() {
+   cache = new CacheFactory().set(MCAST_PORT, "0").create();
+   int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+   int lowPort = Math.min(ports[0], ports[1]);
+   int highPort = Math.max(ports[0], ports[1]);
+
+   GatewayReceiverFactory receiverFactory = cache.createGatewayReceiverFactory();
+   // The smaller random port becomes the start port, the larger one the end port.
+   receiverFactory.setStartPort(lowPort);
+   receiverFactory.setEndPort(highPort);
+   receiverFactory.setMaximumTimeBetweenPings(2000);
+   receiverFactory.setSocketBufferSize(200);
+
+   GatewayTransportFilter firstFilter = new MyGatewayTransportFilter1();
+   GatewayTransportFilter secondFilter = new MyGatewayTransportFilter2();
+   // The filters are added in the reverse order of their construction.
+   receiverFactory.addGatewayTransportFilter(secondFilter);
+   receiverFactory.addGatewayTransportFilter(firstFilter);
+   GatewayReceiver created = receiverFactory.create();
+
+   // A region is created before the receivers are read back from the cache.
+   cache.createRegionFactory().create("test_ValidateGatewayReceiverAttributes");
+
+   Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
+   GatewayReceiver registered = receivers.iterator().next();
+
+   // Each compared attribute must match between the two receiver references.
+   assertEquals(created.getHost(), registered.getHost());
+   assertEquals(created.getStartPort(), registered.getStartPort());
+   assertEquals(created.getEndPort(), registered.getEndPort());
+   assertEquals(created.getBindAddress(), registered.getBindAddress());
+   assertEquals(created.getMaximumTimeBetweenPings(),
+       registered.getMaximumTimeBetweenPings());
+   assertEquals(created.getSocketBufferSize(),
+       registered.getSocketBufferSize());
+   assertEquals(created.getGatewayTransportFilters().size(),
+       registered.getGatewayTransportFilters().size());
+ }
+
+ /**
+  * Checks that a receiver created from a configured factory reports running.
+  */
+ @Test
+ public void test_ValidateGatewayReceiverStatus() {
+   cache = new CacheFactory().set(MCAST_PORT, "0").create();
+   int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+   int lowPort = Math.min(ports[0], ports[1]);
+   int highPort = Math.max(ports[0], ports[1]);
+
+   GatewayReceiverFactory receiverFactory = cache.createGatewayReceiverFactory();
+   receiverFactory.setStartPort(lowPort);
+   receiverFactory.setEndPort(highPort);
+   receiverFactory.setMaximumTimeBetweenPings(2000);
+   receiverFactory.setSocketBufferSize(200);
+
+   GatewayTransportFilter firstFilter = new MyGatewayTransportFilter1();
+   GatewayTransportFilter secondFilter = new MyGatewayTransportFilter2();
+   receiverFactory.addGatewayTransportFilter(secondFilter);
+   receiverFactory.addGatewayTransportFilter(firstFilter);
+
+   // No start() call is made: the receiver must report running right after create().
+   GatewayReceiver receiver = receiverFactory.create();
+   assertTrue(receiver.isRunning());
+ }
+
+ /**
+  * Checks that the sender registered in the cache reports the same attribute
+  * values as the sender the factory returned (setParallel is not called here).
+  */
+ @Test
+ public void test_ValidateSerialGatewaySenderAttributes() {
+   cache = new CacheFactory().set(MCAST_PORT, "0").create();
+   GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+   fact.setManualStart(true);
+   fact.setBatchConflationEnabled(true);
+   fact.setBatchSize(200);
+   fact.setBatchTimeInterval(300);
+   fact.setPersistenceEnabled(false);
+   fact.setDiskStoreName("FORNY");
+   fact.setMaximumQueueMemory(200);
+   fact.setAlertThreshold(1200);
+   GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
+   fact.addGatewayEventFilter(myEventFilter1);
+   GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+   fact.addGatewayTransportFilter(myStreamFilter1);
+   GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
+   fact.addGatewayTransportFilter(myStreamFilter2);
+   GatewaySender sender1 = fact.create("TKSender", 2);
+
+   AttributesFactory factory = new AttributesFactory();
+   factory.addGatewaySenderId(sender1.getId());
+   factory.setDataPolicy(DataPolicy.PARTITION);
+   cache.createRegionFactory(factory.create()).create(
+       "test_ValidateGatewaySenderAttributes");
+   Set<GatewaySender> senders = cache.getGatewaySenders();
+   // JUnit's assertEquals takes the expected value first, then the actual one.
+   assertEquals(1, senders.size());
+   GatewaySender gatewaySender = senders.iterator().next();
+   assertEquals(sender1.getRemoteDSId(), gatewaySender
+       .getRemoteDSId());
+   assertEquals(sender1.isManualStart(), gatewaySender.isManualStart());
+   assertEquals(sender1.isBatchConflationEnabled(), gatewaySender
+       .isBatchConflationEnabled());
+   assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize());
+   assertEquals(sender1.getBatchTimeInterval(), gatewaySender
+       .getBatchTimeInterval());
+   assertEquals(sender1.isPersistenceEnabled(), gatewaySender
+       .isPersistenceEnabled());
+   assertEquals(sender1.getDiskStoreName(), gatewaySender.getDiskStoreName());
+   assertEquals(sender1.getMaximumQueueMemory(), gatewaySender
+       .getMaximumQueueMemory());
+   assertEquals(sender1.getAlertThreshold(), gatewaySender
+       .getAlertThreshold());
+   assertEquals(sender1.getGatewayEventFilters().size(), gatewaySender
+       .getGatewayEventFilters().size());
+   assertEquals(sender1.getGatewayTransportFilters().size(), gatewaySender
+       .getGatewayTransportFilters().size());
+ }
+
+ /**
+  * Checks that a parallel gateway sender registered in the cache reports the
+  * same attribute values as the sender instance returned by the factory.
+  */
+ @Test
+ public void test_ValidateParallelGatewaySenderAttributes() {
+   cache = new CacheFactory().set(MCAST_PORT, "0").create();
+   GatewaySenderFactory senderFactory = cache.createGatewaySenderFactory();
+   senderFactory.setParallel(true);
+   senderFactory.setManualStart(true);
+   senderFactory.setBatchConflationEnabled(true);
+   senderFactory.setBatchSize(200);
+   senderFactory.setBatchTimeInterval(300);
+   senderFactory.setPersistenceEnabled(false);
+   senderFactory.setDiskStoreName("FORNY");
+   senderFactory.setMaximumQueueMemory(200);
+   senderFactory.setAlertThreshold(1200);
+
+   // One event filter and two transport filters are registered on the factory.
+   GatewayEventFilter eventFilter = new MyGatewayEventFilter1();
+   senderFactory.addGatewayEventFilter(eventFilter);
+   GatewayTransportFilter firstTransportFilter = new MyGatewayTransportFilter1();
+   senderFactory.addGatewayTransportFilter(firstTransportFilter);
+   GatewayTransportFilter secondTransportFilter = new MyGatewayTransportFilter2();
+   senderFactory.addGatewayTransportFilter(secondTransportFilter);
+   GatewaySender created = senderFactory.create("TKSender", 2);
+
+   // Attach the sender id to a partitioned region before reading the sender back.
+   AttributesFactory attributesFactory = new AttributesFactory();
+   attributesFactory.addGatewaySenderId(created.getId());
+   attributesFactory.setDataPolicy(DataPolicy.PARTITION);
+   cache.createRegionFactory(attributesFactory.create())
+       .create("test_ValidateGatewaySenderAttributes");
+
+   Set<GatewaySender> senders = cache.getGatewaySenders();
+   assertEquals(1, senders.size());
+   GatewaySender registered = senders.iterator().next();
+
+   // Each compared attribute must match between the two sender references.
+   assertEquals(created.getRemoteDSId(), registered.getRemoteDSId());
+   assertEquals(created.isManualStart(), registered.isManualStart());
+   assertEquals(created.isBatchConflationEnabled(),
+       registered.isBatchConflationEnabled());
+   assertEquals(created.getBatchSize(), registered.getBatchSize());
+   assertEquals(created.getBatchTimeInterval(), registered.getBatchTimeInterval());
+   assertEquals(created.isPersistenceEnabled(), registered.isPersistenceEnabled());
+   assertEquals(created.getDiskStoreName(), registered.getDiskStoreName());
+   assertEquals(created.getMaximumQueueMemory(), registered.getMaximumQueueMemory());
+   assertEquals(created.getAlertThreshold(), registered.getAlertThreshold());
+   assertEquals(created.getGatewayEventFilters().size(),
+       registered.getGatewayEventFilters().size());
+   assertEquals(created.getGatewayTransportFilters().size(),
+       registered.getGatewayTransportFilters().size());
+ }
+
+ @Test
+ public void test_GatewaySenderWithGatewaySenderEventListener1() {
+ cache = new CacheFactory().set(MCAST_PORT, "0").create();
+ InternalGatewaySenderFactory fact = (InternalGatewaySenderFactory)cache.createGatewaySenderFactory();
+ AsyncEventListener listener = new MyGatewaySenderEventListener();
+ ((InternalGatewaySenderFactory)fact).addAsyncEventListener(listener);
+ try {
+ fact.create("ln", 2);
+ fail("Expected GatewaySenderException. When a sender is added , remoteDSId should not be provided.");
+ } catch (Exception e) {
+ if (e instanceof GatewaySenderException
+ && e.getMessage()
+ .contains(
+ "cannot define a remote site because at least AsyncEventListener is already added.")) {
+
+ } else {
+ fail("Expected GatewaySenderException but received :" + e);
+ }
+ }
+ }
+
+ @Test
+ public void test_GatewaySenderWithGatewaySenderEventListener2() {
+   cache = new CacheFactory().set(MCAST_PORT, "0").create();
+   InternalGatewaySenderFactory senderFactory = (InternalGatewaySenderFactory) cache.createGatewaySenderFactory();
+   senderFactory.addAsyncEventListener(new MyGatewaySenderEventListener());
+   try {
+     // Created by id only (no second argument); any exception fails the test.
+     senderFactory.create("ln");
+   } catch (Exception e) {
+     fail("Received Exception :" + e);
+   }
+ }
+
+ @Test
+ public void test_ValidateGatewayReceiverAttributes_2() {
+ cache = new CacheFactory().set(MCAST_PORT, "0").create();
+ GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+ fact.setStartPort(50504);
+ fact.setMaximumTimeBetweenPings(1000);
+ fact.setSocketBufferSize(4000);
+ fact.setEndPort(70707);
+ fact.setManualStart(true);
+ GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+ fact.addGatewayTransportFilter(myStreamFilter1);
+
+ GatewayReceiver receiver = fact.create();
+ try {
+ receiver.start();
+ }
+ catch (IOException e) {
+ fail("The test failed with IOException");
+ }
+
+ assertEquals(50504, receiver.getStartPort());
+ assertEquals(1000, receiver.getMaximumTimeBetweenPings());
+ assertEquals(4000,receiver.getSocketBufferSize());
+ assertEquals(70707, receiver.getEndPort());
+ }
+
+ /**
+ * This test takes a minimum of 120s to execute. It is known to hang on Mac OS
+ * X Yosemite do to changes in the the message string checked in
+ * GatewayReceiverImpl around line 167. Expects
+ * "Cannot assign requested address" but gets
+ * "Can't assign requested address". Timeout after 150s to safeguard against
+ * hanging on other platforms that may differ.
+ */
+ @Test(timeout = 150000)
+ public void test_ValidateGatewayReceiverAttributes_WrongBindAddress() {
+ if (System.getProperty("os.name").equals("Mac OS X")) {
+ fail("Failing to avoid known hang on Mac OS X.");
+ }
+ cache = new CacheFactory().set(MCAST_PORT, "0").create();
+ GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+ fact.setStartPort(50504);
+ fact.setMaximumTimeBetweenPings(1000);
+ fact.setSocketBufferSize(4000);
+ fact.setEndPort(70707);
+ fact.setManualStart(true);
+ fact.setBindAddress("200.112.204.10");
+ GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+ fact.addGatewayTransportFilter(myStreamFilter1);
+
+ GatewayReceiver receiver = fact.create();
+ try {
+ receiver.start();
+ fail("Expected GatewayReceiverException");
+ }
+ catch (GatewayReceiverException gRE){
+ assertTrue(gRE.getMessage().contains("Failed to create server socket on"));
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ fail("The test failed with IOException");
+ }
+ }
+
+ @Test
+ public void test_ValidateGatewayReceiverDefaultStartPortAndDefaultEndPort() {
+ cache = new CacheFactory().set(MCAST_PORT, "0").create();
+ GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+ fact.setMaximumTimeBetweenPings(1000);
+ fact.setSocketBufferSize(4000);
+ fact.setManualStart(true);
+ GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+ fact.addGatewayTransportFilter(myStreamFilter1);
+
+ GatewayReceiver receiver = fact.create();
+ try {
+ receiver.start();
+ }
+ catch (IOException e) {
+ fail("The test failed with IOException");
+ }
+ int port = receiver.getPort();
+ if((port < 5000) || (port > 5500)) {
+ fail("GatewayReceiver started on out of range port");
+ }
+ }
+
+ @Test
+ public void test_ValidateGatewayReceiverDefaultStartPortAndEndPortProvided() {
+ cache = new CacheFactory().set(MCAST_PORT, "0").create();
+ GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+ fact.setMaximumTimeBetweenPings(1000);
+ fact.setSocketBufferSize(4000);
+ fact.setEndPort(50707);
+ fact.setManualStart(true);
+ GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+ fact.addGatewayTransportFilter(myStreamFilter1);
+
+ GatewayReceiver receiver = fact.create();
+ try {
+ receiver.start();
+ }
+ catch (IOException e) {
+ fail("The test failed with IOException");
+ }
+ int port = receiver.getPort();
+ if((port < GatewayReceiver.DEFAULT_START_PORT) || (port > 50707)) {
+ fail("GatewayReceiver started on out of range port");
+ }
+ }
+
+ @Test
+ public void test_ValidateGatewayReceiverWithManualStartFALSE() {
+   cache = new CacheFactory().set(MCAST_PORT, "0").create();
+   GatewayReceiverFactory receiverFactory = cache.createGatewayReceiverFactory();
+   receiverFactory.setMaximumTimeBetweenPings(1000);
+   receiverFactory.setSocketBufferSize(4000);
+   receiverFactory.setStartPort(5303);
+   receiverFactory.setManualStart(false);
+   receiverFactory.addGatewayTransportFilter(new MyGatewayTransportFilter1());
+   // Manual start is off and start() is never called; the port is read right after create().
+   GatewayReceiver receiver = receiverFactory.create();
+   int boundPort = receiver.getPort();
+   if (!(boundPort >= 5303 && boundPort <= GatewayReceiver.DEFAULT_END_PORT)) {
+     fail("GatewayReceiver started on out of range port");
+   }
+ }
+
+ @Test
+ public void test_ValidateGatewayReceiverWithStartPortAndDefaultEndPort() {
+ cache = new CacheFactory().set(MCAST_PORT, "0").create();
+ GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+ fact.setMaximumTimeBetweenPings(1000);
+ fact.setSocketBufferSize(4000);
+ fact.setStartPort(5303);
+ fact.setManualStart(true);
+ GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+ fact.addGatewayTransportFilter(myStreamFilter1);
+
+ GatewayReceiver receiver = fact.create();
+ try {
+ receiver.start();
+ }
+ catch (IOException e) {
+ fail("The test failed with IOException");
+ }
+ int port = receiver.getPort();
+ if ((port < 5303) || (port > GatewayReceiver.DEFAULT_END_PORT)) {
+ fail("GatewayReceiver started on out of range port");
+ }
+ }
+
+ @Test
+ public void test_ValidateGatewayReceiverWithWrongEndPortProvided() {
+   cache = new CacheFactory().set(MCAST_PORT, "0").create();
+   try {
+     GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+     fact.setMaximumTimeBetweenPings(1000);
+     fact.setSocketBufferSize(4000);
+     fact.setEndPort(4999);
+     fact.create();
+     fail("wrong end port set in the GatewayReceiver");
+   } catch (IllegalStateException expected) {
+     // fail() throws, so a stack trace printed after it never ran; report the exception in the message.
+     if (!expected.getMessage().contains("Please specify either start port a value which is less than end port.")) {
+       fail("Caught IllegalStateException with unexpected message: " + expected);
+     }
+   }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+   // Nothing to close when the cache field was never assigned.
+   if (cache == null) return;
+   cache.close();
+ }
+}