You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:44:02 UTC

[14/61] [abbrv] incubator-geode git commit: GEODE-37 change package name from com.gemstone.gemfire (for ./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to ./geode-wan/src/test/java/org/apache/geode)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java
new file mode 100644
index 0000000..3ae9a01
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java
@@ -0,0 +1,1063 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.Assert;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+@Category(DistributedTest.class)
+public class ReplicatedRegion_ParallelWANPropagationDUnitTest extends WANTestBase{
+
+  public ReplicatedRegion_ParallelWANPropagationDUnitTest() {
+    super();
+    // TODO Auto-generated constructor stub
+  }
+
+  final String expectedExceptions = null;
+  
+  
+  /**
+   * 
+   */
+  private static final long serialVersionUID = 1L;
+
+  @Test
+  public void test_DR_PGS_1Nodes_Put_Receiver() throws Exception {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+      createCacheInVMs(nyPort, vm2);
+      vm2.invoke(() -> WANTestBase.createReceiver());
+      vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", null, isOffHeap() ));
+
+      createCacheInVMs(lnPort, vm4);
+
+      vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln1", isOffHeap() ));
+
+      vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+          true, 10, 100, false, false, null, true ));
+
+      vm4.invoke(() -> WANTestBase.startSender( "ln1" ));
+      fail("Expected GatewaySenderConfigException where parallel gateway sender can not be used with replicated region");
+    }
+    catch (Exception e) {
+      if (!e.getCause().getMessage()
+          .contains("can not be used with replicated region")) {
+        fail("Expected GatewaySenderConfigException where parallel gateway sender can not be used with replicated region");
+      }
+    }
+  }
+  
+  /*1. Validate that parallelGatewaySenderId can be added to distributed region
+   *Region distributed ack/noack + PGS
+   *1. Find out the restrictions on totalNumBuckets on shadowPR
+   *2. Find out the restrictions on redundancy on shadowPR
+   *3. Find out the restrictions on localMaxMemory on shadowPR
+   *4. Find out the best way user will specify PR attributes to PGS
+   *5. Find out the restrictions on ordering.
+   *6. put on region populates the queue    
+   *7. put on region reaches to remote site. Dispatcher works as expected
+   *8. m1 and m2 has DR(ack/noack). put on DR from m1 populates queue on both m1 and m2. Validate that remote site got all the events
+   *9. m1 and m2 has DR(ack/noack). create/put/destroy/operations populates the queue. Validate that remote site got correct events
+   *10. m1 and m2 has DR(ack/noack). localDestroy is called on m1's DR. This locally destroys M1's shadowPr
+   *11. m1 and m2 has DR(ack/noack). destroy is called on m1's DR. This destroys entire shadowPr on m1 and m2
+   *12. m1 and m2 has DR(ack/noack). close Region is called on m1's DR. This locally destroys shadowPr on m1
+   *13. m1 and m2 has DR(ack/noack). cache.close on m1'. This locally destroys shadowPr on m1
+   *14. Validate HA scenario does not cause any event loss
+   *15. PDX events of DR are propagated to remote sites
+   *16. validate stats
+   *17: PR and DR regions with same name.. Can this be created. If yes then how to differentiate these 2 different shadowPR. 
+   *18. test for redundancy. FOR SPR's redundancy will be equal to the number of nodes where DR is present. Max is 3. I know this needs to be figure it out at runtime. 
+   *19. test without providing diskStoreName..I suspect some problem with this code. diskStoreName=null looks like this is not handled very well. need to verify
+   *20. ParallelGatewaySenderQueue#addPR method has multiple check for inPersistenceEnabled. Can's we do it with only one check.
+  */
+  
+  /**
+   * Test to validate that created parallel gatewaySenders id can be added to
+   * distributed region
+   * Below test is disabled intentionally
+    1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+    2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+      ParallelGatewaySenderQueue#convertPathToName
+    3> We have to enabled it in next release
+    4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 
+     and version prior to 8.0
+   */
+  @Ignore
+  @Test
+  public void test_PGS_Started_DR_CREATED_NO_RECEIVER() throws Exception {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+      vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+/*      ExpectedException exp1 = addExpectedException(GatewaySenderException.class
+          .getName(), vm4);
+      ExpectedException exp2 = addExpectedException(InterruptedException.class
+          .getName(), vm4);
+      try {
+*/        vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+            true, 10, 100, false, false, null, false ));
+        vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+            getTestMethodName() + "_RR", "ln1", isOffHeap()  ));
+        vm4.invoke(() -> WANTestBase.doPuts(
+            getTestMethodName() + "_RR", 1000 ));
+        vm4.invoke(() -> WANTestBase.validateQueueContents(
+            "ln1", 1000 ));
+
+/*      }
+      finally {
+        exp1.remove();
+        exp2.remove();
+      }
+*/    }
+    catch (Exception e) {
+      Assert.fail("Unexpected exception", e);
+    }
+  }
+
+  /**
+   * Test to validate that distributed region with given parallelGatewaySender id
+   * is created first and then a same parallelGatewaySender is created
+   * a single put in DR is enqueued in parallelQueue
+   * Below test is disabled intentionally
+    1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+    2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+      ParallelGatewaySenderQueue#convertPathToName
+    3> We have to enabled it in next release
+    4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 
+     and version prior to 8.0
+   */
+  @Ignore
+  @Test
+  public void test_DR_CREATED_PGS_STARTED_NO_RECEIVER() throws Exception {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+      vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+      vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln1", isOffHeap()  ));
+/*      ExpectedException exp1 = addExpectedException(GatewaySenderException.class
+          .getName(), vm4);
+      ExpectedException exp2 = addExpectedException(InterruptedException.class
+          .getName(), vm4);
+      try {*/
+        vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+            true, 10, 100, false, false, null, false ));
+        vm4.invoke(() -> WANTestBase.doPuts(
+            getTestMethodName() + "_RR", 1000 ));
+        vm4.invoke(() -> WANTestBase.validateQueueContents(
+            "ln1", 1000 ));
+/*      }
+      finally {
+        exp1.remove();
+        exp2.remove();
+      }
+*/    }
+    catch (Exception e) {
+      Assert.fail("Unexpected exception", e);
+    }
+  }
+
+  /**Below test is disabled intentionally
+  1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+  2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+      ParallelGatewaySenderQueue#convertPathToName
+  3> We have to enabled it in next release
+  4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 
+     and version prior to 8.0*/
+  @Ignore
+  @Test
+  public void test_DR_PGS_1Node_Put_ValidateQueue_No_Receiver() throws Exception {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+      vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+      
+      vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln1", isOffHeap()  ));
+      
+/*      ExpectedException exp1 = addExpectedException(GatewaySenderException.class
+          .getName(), vm4);
+      ExpectedException exp2 = addExpectedException(InterruptedException.class
+          .getName(), vm4);
+      try {*/
+        vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+            true, 10, 100, false, false, null, true ));
+        vm4.invoke(() -> WANTestBase.startSender( "ln1" ));
+
+        vm4.invoke(() -> WANTestBase.doPuts(
+            getTestMethodName() + "_RR", 10000 ));
+
+        vm4.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_RR", 10000 ));
+        vm4.invoke(() -> WANTestBase.validateQueueContents(
+            "ln1", 10000 ));
+/*      }
+    finally {
+      exp1.remove();
+      exp2.remove();
+    }
+     */ 
+    }
+    catch (Exception e) {
+      Assert.fail("Unexpected exception", e);
+    }
+  }
+  
+  /**Below test is disabled intentionally
+  1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+  2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+      ParallelGatewaySenderQueue#convertPathToName
+  3> We have to enabled it in next release
+  4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 
+     and version prior to 8.0*/
+  @Ignore
+  @Test
+  public void test_DR_PGS_2Nodes_Put_ValidateQueue_No_Receiver() throws Exception {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+      vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+      vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+      
+      vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln1", isOffHeap()  ));
+      vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln1", isOffHeap()  ));
+      
+/*      ExpectedException exp1 = addExpectedException(
+          GatewaySenderException.class.getName());
+      ExpectedException exp2 = addExpectedException(
+          InterruptedException.class.getName());
+      ExpectedException exp3 = addExpectedException(
+          CacheClosedException.class.getName());
+      try {
+*/        vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+            true, 10, 100, false, false, null, true ));
+        vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
+            true, 10, 100, false, false, null, true ));
+
+        startSenderInVMs("ln1", vm4, vm5);
+
+        vm4.invoke(() -> WANTestBase.doPuts(
+            getTestMethodName() + "_RR", 1000 ));
+
+        vm4.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_RR", 1000 ));
+        vm5.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_RR", 1000 ));
+
+        vm4.invoke(() -> WANTestBase.validateQueueContents(
+            "ln1", 1000 ));
+        vm5.invoke(() -> WANTestBase.validateQueueContents(
+            "ln1", 1000 ));
+
+/*      }
+      finally {
+        exp1.remove();
+        exp2.remove();
+        exp3.remove();
+      }
+*/      
+    }
+    catch (Exception e) {
+      Assert.fail("Unexpected exception", e);
+    }
+  }
+  
+//  public void test_DR_PGS_ORDERPOLICY_PARTITION_EXPECTException(){
+//    
+//  }
+//  public void test_DR_PGS_DISKSTORE_NAME_PROVIDED_VALIDATE_DISK(){
+//    
+//  }
+//  public void test_DR_PGS_DISKSTORE_NAME_NOT_PROVIDED_VALIDATE_DISK(){
+//    
+//  }
+//  
+//  public void test_DR_PGS_START_STOP_START(){
+//    
+//  }
+//
+//  public void test_DR_PGS_PERSISTENCE_START_STOP_START(){
+//    
+//  }
+//  
+//  public void test_DR_PGS_START_PAUSE_STOP(){
+//    
+//  }
+//
+//  public void test_DR_PGS_START_PAUSE_RESUME_VALIDATE_RECEIVER(){
+//    
+//  }
+
+  /**Below test is disabled intentionally
+  1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+  2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+      ParallelGatewaySenderQueue#convertPathToName
+  3> We have to enabled it in next release
+  4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 
+     and version prior to 8.0*/
+  @Ignore
+  @Test
+  public void test_DR_PGS_1Nodes_Put_Receiver_2() throws Exception {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+      createCacheInVMs(nyPort, vm2);
+      vm2.invoke(() -> WANTestBase.createReceiver());
+      vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+
+      createCacheInVMs(lnPort, vm4);
+
+      vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln1", isOffHeap()  ));
+      
+      vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+        true, 10, 100, false, false, null, true));
+
+      vm4.invoke(() -> WANTestBase.startSender( "ln1"));
+      
+      vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000));
+     
+      vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+        1000));
+      
+      vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+        0 ));
+      
+      vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000));
+    }
+    catch (Exception e) {
+      Assert.fail("Unexpected exception", e);
+    }
+  }
+  
+  /**Below test is disabled intentionally
+  1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+  2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+      ParallelGatewaySenderQueue#convertPathToName
+  3> We have to enabled it in next release
+  4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 
+     and version prior to 8.0*/
+  @Ignore
+  @Test
+  public void test_DR_PGS_2Nodes_Put_Receiver() throws Exception {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+      createCacheInVMs(nyPort, vm2);
+      vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+      vm2.invoke(() -> WANTestBase.createReceiver());
+
+      createCacheInVMs(lnPort, vm4, vm5);
+      
+      vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln1", isOffHeap()  ));
+      vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln1", isOffHeap()  ));
+      
+      vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+        true, 10, 100, false, false, null, true));
+      vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
+      true, 10, 100, false, false, null, true));
+
+      startSenderInVMs("ln1", vm4, vm5);
+      
+      vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+     
+      vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+        1000 ));
+      vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+        1000 ));
+      
+      vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+        0 ));
+      vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+        0 ));
+      
+      vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    }
+    catch (Exception e) {
+      Assert.fail("Unexpected exception", e);
+    }
+  }
+  
+  /**Below test is disabled intentionally
+  1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+  2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+      ParallelGatewaySenderQueue#convertPathToName
+  3> We have to enabled it in next release
+  4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 
+     and version prior to 8.0*/
+  @Ignore
+  @Test
+  public void test_DR_PGS_2Nodes_EMPTY_Put_Receiver() throws Exception {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+      vm2.invoke(() -> WANTestBase.createCache( nyPort ));
+      vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+      vm2.invoke(() -> WANTestBase.createReceiver());
+
+      vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+      vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+      
+      vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln1", Scope.DISTRIBUTED_ACK, DataPolicy.EMPTY, isOffHeap()  ));
+      vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln1", Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE, isOffHeap()  ));
+      
+      vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+        true, 10, 100, false, false, null, true));
+      vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
+      true, 10, 100, false, false, null, true));
+
+      startSenderInVMs("ln1", vm4, vm5);
+      
+      vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+     
+//      vm4.invoke(() -> WANTestBase.validateRegionSize( testName + "_RR",
+//        1000 ));
+      vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+        1000 ));
+      
+      vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+        0 ));
+      vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+        0 ));
+      
+      vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    }
+    catch (Exception e) {
+      Assert.fail("Unexpected exception", e);
+    }
+  }
+  
+  /**Below test is disabled intentionally
+  1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+  2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+      ParallelGatewaySenderQueue#convertPathToName
+  3> We have to enabled it in next release
+  4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 
+     and version prior to 8.0*/
+  @Ignore
+  @Test
+  public void test_DR_PR_PGS_4Nodes_Put_Receiver_2Nodes() throws Exception {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+      createCacheInVMs(nyPort, vm2, vm3);
+      vm2.invoke(() -> WANTestBase.createReceiver());
+      vm3.invoke(() -> WANTestBase.createReceiver());
+      
+      vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+      vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+      
+      vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()  ));
+      vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 1, 100, isOffHeap()  ));
+
+      createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+      
+      vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+          getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()  ));
+      vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+          getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()  ));
+      vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+          getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()  ));
+      vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+          getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap()  ));
+
+      vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+      vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+      vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+      vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+
+      vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+          true, 10, 100, false, false, null, true ));
+      vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+          true, 10, 100, false, false, null, true ));
+      vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+          true, 10, 100, false, false, null, true ));
+      vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+          true, 10, 100, false, false, null, true ));
+
+      startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+      vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+      vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+      vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+      vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+      vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+      vm5.invoke(() -> WANTestBase.doNextPuts( getTestMethodName() + "_PR",
+        1000, 2000 ));
+      
+      vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_PR",
+        1000 ));
+      vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+        1000 ));
+      
+      vm4.invoke(() -> WANTestBase.validateQueueContents( "ln",
+        0 ));
+      vm5.invoke(() -> WANTestBase.validateQueueContents( "ln",
+        0 ));
+      
+/*      ExpectedException exp1 = addExpectedException(CacheClosedException.class
+          .getName());
+      try {*/
+        vm2.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_RR", 1000 ));
+        vm3.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_RR", 1000 ));
+        vm2.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_PR", 1000 ));
+        vm3.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_PR", 1000 ));
+/*      }
+      finally {
+        exp1.remove();
+      }
+*/    }
+    catch (Exception e) {
+      Assert.fail("Unexpected exception", e);
+    }
+  }
+  
+  /**Below test is disabled intentionally
+  1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+  2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+      ParallelGatewaySenderQueue#convertPathToName
+  3> We have to enabled it in next release
+  4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 
+     and version prior to 8.0*/
+  @Ignore
+  @Test
+  public void test_DR_PGS_NOMANUALSTART_4Nodes_Put_ValidateReceiver() throws Exception {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+      createCacheInVMs(nyPort, vm2);
+      vm2.invoke(() -> WANTestBase.createReceiver());
+      vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+
+      createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+      
+      vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+          true, 10, 100, false, false, null, false ));
+      vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
+          true, 10, 100, false, false, null, false ));
+      vm6.invoke(() -> WANTestBase.createSender( "ln1", 2,
+          true, 10, 100, false, false, null, false ));
+      vm7.invoke(() -> WANTestBase.createSender( "ln1", 2,
+          true, 10, 100, false, false, null, false ));
+    
+      vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln1", isOffHeap()  ));
+      vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln1", isOffHeap()  ));
+      vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln1", isOffHeap()  ));
+      vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln1", isOffHeap()  ));
+
+      vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+     
+      vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+        1000 ));
+      vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+        1000 ));
+      vm6.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+        1000 ));
+      vm7.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+        1000 ));
+
+      
+      vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+        0 ));
+      vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+        0 ));
+      vm6.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+        0 ));
+      vm7.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+        0 ));
+      
+/*      ExpectedException exp1 = addExpectedException(CacheClosedException.class
+          .getName());
+      try {*/
+        vm2.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_RR", 1000 ));
+/*      }
+      finally {
+        exp1.remove();
+      }*/
+    }
+    catch (Exception e) {
+      Assert.fail("Unexpected exception", e);
+    }
+  }
+  
+  /**Below test is disabled intentionally
+  1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+  2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+      ParallelGatewaySenderQueue#convertPathToName
+  3> We have to enabled it in next release
+  4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 
+     and version prior to 8.0*/
+  @Ignore
+  @Test
+  public void test_DR_PGS_4Nodes_Put_CLOSE4NODESCACHE_RECREATE_PUT_ValidateReceiver()
+      throws Exception {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+      createCacheInVMs(nyPort, vm2, vm3);
+      createReceiverInVMs(vm2, vm3);
+
+      createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+      vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+          true, 100, 10, false, false, null, true ));
+      vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+          true, 100, 10, false, false, null, true ));
+      vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+          true, 100, 10, false, false, null, true ));
+      vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+          true, 100, 10, false, false, null, true ));
+
+      vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+      vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+      vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+      vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+
+      startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+      vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", null, isOffHeap()  ));
+      vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", null, isOffHeap()  ));
+
+      // before doing any puts, let the senders be running in order to ensure
+      // that
+      // not a single event will be lost
+      vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+      vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+      vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+      vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+      vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+          1000 ));
+      vm2.invoke(() -> WANTestBase.validateRegionSize(
+          getTestMethodName() + "_RR", 1000 ));
+
+/*      ExpectedException exp1 = addExpectedException(CacheClosedException.class
+          .getName());
+      try {*/
+        vm4.invoke(() -> WANTestBase.killSender());
+        vm5.invoke(() -> WANTestBase.killSender());
+        vm6.invoke(() -> WANTestBase.killSender());
+        vm7.invoke(() -> WANTestBase.killSender());
+/*      }
+      finally {
+        exp1.remove();
+      }*/
+
+      createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+      vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+          true, 100, 10, false, false, null, true ));
+      vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+          true, 100, 10, false, false, null, true ));
+      vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+          true, 100, 10, false, false, null, true ));
+      vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+          true, 100, 10, false, false, null, true ));
+
+      vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+      vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+      vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+      vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+          getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+
+      startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+      vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+      vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+      vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+      vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+      // ------------------------------------------------------------------------------------
+
+      vm4.invoke(() -> WANTestBase.doNextPuts(
+          getTestMethodName() + "_RR", 1000, 2000 ));
+
+      // verify all buckets drained on all sender nodes.
+      vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+      vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+      vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+      vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+
+/*      exp1 = addExpectedException(CacheClosedException.class.getName());
+      try {*/
+        vm2.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_RR", 2000 ));
+        vm3.invoke(() -> WANTestBase.validateRegionSize(
+            getTestMethodName() + "_RR", 2000 ));
+/*      }
+      finally {
+        exp1.remove();
+      }*/
+    }
+    catch (Exception e) {
+      Assert.fail("Unexpected exception", e);
+    }
+
+  }
+  
+  /**Below test is disabled intentionally
+  1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+  2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+      ParallelGatewaySenderQueue#convertPathToName
+  3> We have to enabled it in next release
+  4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 
+     and version prior to 8.0*/
+  @Ignore
+  @Test
+  public void test_DR_NO_ACK_PGS_2Nodes_Put_ValidateQueue_Receiver() throws Exception {
+    try {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+      createCacheInVMs(nyPort, vm2);
+      vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap()  ));
+      vm2.invoke(() -> WANTestBase.createReceiver());
+
+      createCacheInVMs(lnPort, vm4, vm5);
+      
+      vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln1",
+              Scope.DISTRIBUTED_NO_ACK, DataPolicy.REPLICATE, isOffHeap()  ));
+      vm5.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln1",
+              Scope.DISTRIBUTED_NO_ACK, DataPolicy.REPLICATE, isOffHeap()   ));
+      
+      vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
+        true, 10, 100, false, false, null, true));
+      vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
+      true, 10, 100, false, false, null, true));
+
+      startSenderInVMs("ln1", vm4, vm5);
+
+      vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
+        1000 ));
+     
+      vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+        1000 ));
+      vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
+        1000 ));
+      
+      vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+        0 ));
+      vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1",
+        0 ));
+      
+      vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+    }
+    catch (Exception e) {
+      Assert.fail("Unexpected exception", e);
+    }
+  }
+  
+  /**Below test is disabled intentionally
+  1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+  2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+      ParallelGatewaySenderQueue#convertPathToName
+  3> We have to enabled it in next release
+  4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 
+     and version prior to 8.0*/
+  @Ignore
+  @Test
+  public void test_DR_PGS_2NODES_1NODESDOWN_Validate_Receiver() throws Exception {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+        100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+        100, 10, false, false, null, true ));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+
+    startSenderInVMs("ln", vm4, vm5);
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+    Thread.sleep(60000);;
+    
+/*    ExpectedException exp1 = addExpectedException(CacheClosedException.class
+        .getName());
+    try {*/
+      AsyncInvocation inv1 = vm4.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts0(
+              getTestMethodName() + "_RR", 1000 ));
+      Wait.pause(1000);
+      AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.killSender());
+      try {
+        inv1.join();
+        inv2.join();
+      }
+      catch (Exception e) {
+        Assert.fail("UnExpected Exception", e);
+      }
+/*    }
+    finally {
+      exp1.remove();
+    }*/
+
+    Integer size = (Integer)vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" ));
+    LogWriterUtils.getLogWriter().info("The size of the queue is in vm4 " + size);
+    
+
+    vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+    
+    size = (Integer)vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" ));
+    LogWriterUtils.getLogWriter().info("The size of the queue is in vm4 " + size);
+    
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 1000 ));
+  }
+  
+  /**Below test is disabled intentionally
+  1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+  2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+      ParallelGatewaySenderQueue#convertPathToName
+  3> We have to enabled it in next release
+  4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 
+     and version prior to 8.0*/
+  @Ignore
+  @Test
+  public void test_DR_PGS_4NODES_2NODESDOWN_Validate_Receiver() throws Exception {
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+        100, 10, false, false, null, true ));
+    vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+        100, 10, false, false, null, true ));
+    vm6.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+        100, 10, false, false, null, true ));
+    vm7.invoke(() -> WANTestBase.createSender( "ln", 2, true,
+        100, 10, false, false, null, true ));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm5.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm6.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm7.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+    Thread.sleep(60000);
+/*    ExpectedException exp1 = addExpectedException(CacheClosedException.class
+        .getName());
+    try */{
+      AsyncInvocation inv1 = vm7.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts0(
+              getTestMethodName() + "_RR", 10000 ));
+      Thread.sleep(1000);
+      AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
+      Thread.sleep(2000);
+      AsyncInvocation inv3 = vm6.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts1(
+              getTestMethodName() + "_RR", 10000 ));
+      Thread.sleep(1500);
+      AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
+      try {
+        inv1.join();
+        inv2.join();
+        inv3.join();
+        inv4.join();
+      }
+      catch (Exception e) {
+        Assert.fail("UnExpected Exception", e);
+      }
+    }/*
+    finally {
+      exp1.remove();
+    }*/
+
+    vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+    vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 10000 ));
+    
+  }
+  
+  public static void doPuts0(String regionName, int numPuts) {
+    IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+        .getName());
+    IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class
+        .getName());
+    try {
+
+      Region r = cache.getRegion(Region.SEPARATOR + regionName);
+      assertNotNull(r);
+      for (long i = 0; i < numPuts; i++) {
+        LogWriterUtils.getLogWriter().info("Put : key : " + i);
+        r.put(i, "0_" + i);
+      }
+    } finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+  
+  public static void doPuts1(String regionName, int numPuts){
+    IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+        .getName());
+    IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class
+        .getName());
+    try {
+
+      Region r = cache.getRegion(Region.SEPARATOR + regionName);
+      assertNotNull(r);
+      for (long i = 0; i < numPuts; i++) {
+        LogWriterUtils.getLogWriter().info("Put : key : " + i);
+        r.put(i, "1_" + i);
+      }
+    } finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+  
+  public static void doPuts2(String regionName, int numPuts){
+    IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+        .getName());
+    IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class
+        .getName());
+    try {
+      Region r = cache.getRegion(Region.SEPARATOR + regionName);
+      assertNotNull(r);
+      for (long i = 0; i < numPuts; i++) {
+        LogWriterUtils.getLogWriter().info("Put : key : " + i);
+        r.put(i, "2_" + i);
+      }
+    } finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+    
+  /**
+   * Test to validate that put on DR with no ack on multiple nodes are propagated to parallelQueue on multiple nodes
+   */
+  
+  /**
+   * Test to validate that the single put in DR is propagated to remote site through parallelGatewaySender
+   */
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java
new file mode 100644
index 0000000..0ee78c5
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+import static com.gemstone.gemfire.test.dunit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class SenderWithTransportFilterDUnitTest extends WANTestBase {
+
+  @Test
+  public void testSerialSenderWithTransportFilter() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    vm2.invoke(() -> SenderWithTransportFilterDUnitTest.createReceiverWithTransportFilters( nyPort ));
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", null, isOffHeap() ));
+
+    vm3.invoke(() -> WANTestBase.createCache( lnPort ));
+
+    vm3.invoke(() -> SenderWithTransportFilterDUnitTest.createSenderWithTransportFilter( "ln", 2, false, 100,
+            1, false, false, true ));
+
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+        getTestMethodName() + "_RR", "ln", isOffHeap() ));
+
+    vm3.invoke(() -> WANTestBase.startSender( "ln" ));
+
+    vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 100 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_RR", 100 ));
+  }
+
+  @Test
+  public void testParallelSenderWithTransportFilter() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    vm2.invoke(() -> SenderWithTransportFilterDUnitTest.createReceiverWithTransportFilters( nyPort ));
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", null, 0, 10, isOffHeap() ));
+
+    vm3.invoke(() -> WANTestBase.createCache( lnPort ));
+
+    vm3.invoke(() -> SenderWithTransportFilterDUnitTest.createSenderWithTransportFilter( "ln", 2, true, 100,
+            1, false, false, true ));
+
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR", "ln", 0, 10, isOffHeap() ));
+
+    vm3.invoke(() -> WANTestBase.startSender( "ln" ));
+
+    vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+        getTestMethodName() + "_PR", 100 ));
+  }
+  
+  public static int createReceiverWithTransportFilters(int locPort) {
+    WANTestBase test = new WANTestBase();
+    Properties props = test.getDistributedSystemProperties();
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(LOCATORS, "localhost[" + locPort
+        + "]");
+
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    fact.setStartPort(port);
+    fact.setEndPort(port);
+    ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>();
+    transportFilters.add(new CheckSumTransportFilter("CheckSumTransportFilter"));
+    if (!transportFilters.isEmpty()) {
+      for (GatewayTransportFilter filter : transportFilters) {
+        fact.addGatewayTransportFilter(filter);
+      }
+    }
+    GatewayReceiver receiver = fact.create();
+    try {
+      receiver.start();
+    }
+    catch (IOException e) {
+      fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + port, e);
+    }
+    return port;
+  }
+
+  public static void createSenderWithTransportFilter(String dsName,
+      int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isConflation, boolean isPersistent, boolean isManualStart) {
+    File persistentDirectory = new File(dsName + "_disk_"
+        + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+    persistentDirectory.mkdir();
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
+    File[] dirs1 = new File[] { persistentDirectory };
+
+    if (isParallel) {
+      GatewaySenderFactory gateway = cache
+          .createGatewaySenderFactory();
+      gateway.setParallel(true);
+      gateway.setMaximumQueueMemory(maxMemory);
+      gateway.setBatchSize(batchSize);
+      ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
+      ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>();
+      transportFilters.add(new CheckSumTransportFilter("CheckSumTransportFilter"));
+      if (!transportFilters.isEmpty()) {
+        for (GatewayTransportFilter filter : transportFilters) {
+          gateway.addGatewayTransportFilter(filter);
+        }
+      }
+      if (isPersistent) {
+        gateway.setPersistenceEnabled(true);
+        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
+            .getName());
+      }
+      else {
+        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+        gateway.setDiskStoreName(store.getName());
+      }
+      gateway.setBatchConflationEnabled(isConflation);
+      gateway.create(dsName, remoteDsId);
+
+    }
+    else {
+      GatewaySenderFactory gateway = cache
+          .createGatewaySenderFactory();
+      gateway.setMaximumQueueMemory(maxMemory);
+      gateway.setBatchSize(batchSize);
+      gateway.setManualStart(isManualStart);
+      ((InternalGatewaySenderFactory)gateway)
+          .setLocatorDiscoveryCallback(new MyLocatorCallback());
+      ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>();
+      transportFilters.add(new CheckSumTransportFilter("CheckSumTransportFilter"));
+      if (!transportFilters.isEmpty()) {
+        for (GatewayTransportFilter filter : transportFilters) {
+          gateway.addGatewayTransportFilter(filter);
+        }
+      }
+      gateway.setBatchConflationEnabled(isConflation);
+      if (isPersistent) {
+        gateway.setPersistenceEnabled(true);
+        gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
+            .getName());
+      }
+      else {
+        DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+        gateway.setDiskStoreName(store.getName());
+      }
+      gateway.create(dsName, remoteDsId);
+    }
+  }
+
+  static class CheckSumTransportFilter implements GatewayTransportFilter {
+
+    Adler32 checker = new Adler32();
+    
+    private String name;
+    
+    public CheckSumTransportFilter(String name){
+      this.name = name;
+    }
+
+    @Override
+    public String toString(){
+      return this.name;
+    }
+
+    @Override
+    public InputStream getInputStream(InputStream stream) {
+      return new CheckedInputStream(stream, checker);
+    }
+
+    @Override
+    public OutputStream getOutputStream(OutputStream stream) {
+      return new CheckedOutputStream(stream, checker);
+    }
+
+    @Override
+    public void close() {
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
new file mode 100644
index 0000000..6e2581e
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import static org.junit.Assert.*;
+
+import java.util.Set;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.admin.AdminDistributedSystemFactory;
+import com.gemstone.gemfire.admin.AdminException;
+import com.gemstone.gemfire.admin.DistributedSystemConfig;
+import com.gemstone.gemfire.admin.internal.AdminDistributedSystemImpl;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.internal.cache.CacheObserverAdapter;
+import com.gemstone.gemfire.internal.cache.CacheObserverHolder;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class ShutdownAllPersistentGatewaySenderDUnitTest extends WANTestBase {
+
+  private static final long MAX_WAIT = 70000;
+
+  private static final int NUM_KEYS = 1000;
+
+  public ShutdownAllPersistentGatewaySenderDUnitTest() {
+    super();
+  }
+  
+  @Override
+  protected final void postSetUpWANTestBase() throws Exception {
+    IgnoredException.addIgnoredException("Cache is being closed by ShutdownAll");
+  }
+
+  private static final long serialVersionUID = 1L;
+
+  @Test
+  public void testGatewaySender() throws Exception {
+    IgnoredException.addIgnoredException("Cache is shutting down");
+
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    vm2.invoke(() -> WANTestBase.createCache( nyPort ));
+    vm3.invoke(() -> WANTestBase.createCache( nyPort ));
+    vm2.invoke(() -> WANTestBase.createReceiver());
+
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 400, false, false, null, true ));
+
+    vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+
+    vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+
+    // set the CacheObserver to block the ShutdownAll
+    SerializableRunnable waitAtShutdownAll = new SerializableRunnable() {
+      @Override
+      public void run() {
+        LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
+        CacheObserverHolder.setInstance(new CacheObserverAdapter() {
+          @Override
+          public void beforeShutdownAll() {
+            final Region region = cache.getRegion(getTestMethodName() + "_PR");
+            Wait.waitForCriterion(new WaitCriterion() {
+              @Override
+              public boolean done() {
+                return region.size() >= 2;
+              }
+
+              @Override
+              public String description() {
+                return "Wait for wan to have processed several events";
+              }
+            }, 30000, 100, true);
+          }
+        });
+      }
+    };
+    vm2.invoke(waitAtShutdownAll);
+    vm3.invoke(waitAtShutdownAll);
+    
+    AsyncInvocation vm4_future = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS ));
+
+    // ShutdownAll will be suspended at observer, so puts will continue
+    AsyncInvocation future = shutDownAllMembers(vm2, 2, MAX_WAIT);
+    future.join(MAX_WAIT);
+
+    // now restart vm1 with gatewayHub
+    LogWriterUtils.getLogWriter().info("restart in VM2");
+    vm2.invoke(() -> WANTestBase.createCache( nyPort ));
+    vm3.invoke(() -> WANTestBase.createCache( nyPort ));
+    AsyncInvocation vm3_future = vm3.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR",
+            "ln", 1, 100, isOffHeap() ));
+    vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
+    vm3_future.join(MAX_WAIT);
+
+    vm3.invoke(new SerializableRunnable() {
+      public void run() {
+        final Region region = cache.getRegion(getTestMethodName() + "_PR");
+        cache.getLogger().info(
+            "vm1's region size before restart gatewayHub is " + region.size());
+      }
+    });
+    vm2.invoke(() -> WANTestBase.createReceiver());
+
+    // wait for vm0 to finish its work
+    vm4_future.join(MAX_WAIT);
+    vm4.invoke(new SerializableRunnable() {
+      public void run() {
+        Region region = cache.getRegion(getTestMethodName() + "_PR");
+        assertEquals(NUM_KEYS, region.size());
+      }
+    });
+
+    // verify the other side (vm1)'s entries received from gateway
+    vm2.invoke(new SerializableRunnable() {
+      public void run() {
+        final Region region = cache.getRegion(getTestMethodName() + "_PR");
+
+        cache.getLogger().info(
+            "vm1's region size after restart gatewayHub is " + region.size());
+        Wait.waitForCriterion(new WaitCriterion() {
+          public boolean done() {
+            Object lastValue = region.get(NUM_KEYS - 1);
+            if (lastValue != null && lastValue.equals(NUM_KEYS - 1)) {
+              region.getCache().getLogger().info(
+                  "Last key has arrived, its value is " + lastValue
+                      + ", end of wait.");
+              return true;
+            }
+            else
+              return (region.size() == NUM_KEYS);
+          }
+
+          public String description() {
+            return "Waiting for destination region to reach size: " + NUM_KEYS
+                + ", current is " + region.size();
+          }
+        }, MAX_WAIT, 100, true);
+        assertEquals(NUM_KEYS, region.size());
+      }
+    });
+
+  }
+
+  private AsyncInvocation shutDownAllMembers(VM vm, final int expectedNumber, final long timeout) {
+      AsyncInvocation future = vm.invokeAsync(new SerializableRunnable("Shutdown all the members") {
+
+      public void run() {
+        DistributedSystemConfig config;
+        AdminDistributedSystemImpl adminDS = null;
+        try {
+          config = AdminDistributedSystemFactory.defineDistributedSystem(cache
+              .getDistributedSystem(), "");
+          adminDS = (AdminDistributedSystemImpl)AdminDistributedSystemFactory
+              .getDistributedSystem(config);
+          adminDS.connect();
+          Set members = adminDS.shutDownAllMembers(timeout);
+          int num = members == null ? 0 : members.size();
+          assertEquals(expectedNumber, num);
+        }
+        catch (AdminException e) {
+          throw new RuntimeException(e);
+        }
+        finally {
+          if (adminDS != null) {
+            adminDS.disconnect();
+          }
+        }
+      }
+    });
+    return future;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java
new file mode 100644
index 0000000..b504f87
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANConfigurationJUnitTest.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import com.gemstone.gemfire.cache.*;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.wan.*;
+import com.gemstone.gemfire.cache30.MyGatewayEventFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.wan.GatewayReceiverException;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
+import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory;
+import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.junit.Assert.*;
+
+@Category(IntegrationTest.class)
+public class WANConfigurationJUnitTest {
+
+  private Cache cache;
+
+  /**
+   * Test to validate that the sender can not be started without configuring
+   * locator
+   * @throws IOException 
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void test_GatewaySender_without_Locator() throws IOException {
+    try {
+      cache = new CacheFactory().set(MCAST_PORT, "0").create();
+      GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+      fact.setParallel(true);
+      GatewaySender sender1 = fact.create("NYSender", 2);
+      sender1.start();
+      fail("Expected IllegalStateException but not thrown");
+    }
+    catch (Exception e) {
+      if ((e instanceof IllegalStateException && e
+          .getMessage()
+          .startsWith(
+              LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
+                  .toLocalizedString()))) {
+      }
+      else {
+        fail("Expected IllegalStateException but received :" + e);
+      }
+    }
+  }
+
+  /**
+   * Test to validate that sender with same Id can not be added to cache.
+   */
+  @Test
+  public void test_SameGatewaySenderCreatedTwice() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    try {
+      GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+      fact.setParallel(true);
+      fact.setManualStart(true);
+      fact.create("NYSender", 2);
+      fact.create("NYSender", 2);
+      fail("Expected IllegalStateException but not thrown");
+    }
+    catch (Exception e) {
+      if (e instanceof IllegalStateException
+          && e.getMessage().contains("A GatewaySender with id")) {
+
+      }
+      else {
+        fail("Expected IllegalStateException but received :" + e);
+      }
+    }
+  }
+  
+  /**
+   * Test to validate that same gatewaySender Id can not be added to the region attributes.
+   */
+  @Test
+  public void test_SameGatewaySenderIdAddedTwice() {
+    try {
+      cache = new CacheFactory().set(MCAST_PORT, "0").create();
+      GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+      fact.setParallel(true);
+      fact.setManualStart(true);
+      GatewaySender sender1 = fact.create("NYSender", 2);
+      AttributesFactory factory = new AttributesFactory();
+      factory.addGatewaySenderId(sender1.getId());
+      factory.addGatewaySenderId(sender1.getId());
+      fail("Expected IllegalArgumentException but not thrown");
+    }
+    catch (Exception e) {
+      if (e instanceof IllegalArgumentException
+          && e.getMessage().contains("is already added")) {
+
+      }
+      else {
+        fail("Expected IllegalStateException but received :" + e);
+      }
+    }
+  }
+  
+  @Test
+  public void test_GatewaySenderIdAndAsyncEventId() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+      AttributesFactory factory = new AttributesFactory();
+      factory.addGatewaySenderId("ln");
+      factory.addGatewaySenderId("ny");
+      factory.addAsyncEventQueueId("Async_LN");
+      RegionAttributes attrs = factory.create();
+      
+      Set<String> senderIds = new HashSet<String>();
+      senderIds.add("ln");
+      senderIds.add("ny");
+      Set<String> attrsSenderIds = attrs.getGatewaySenderIds();
+      assertEquals(senderIds, attrsSenderIds);
+      Region r = cache.createRegion("Customer", attrs);
+      assertEquals(senderIds, ((LocalRegion)r).getGatewaySenderIds());
+  }
+
+  /**
+   * Test to validate that distributed region can not have the gateway sender
+   * with parallel distribution policy
+   * 
+   */
+  @Ignore("Bug51491")
+  @Test
+  public void test_GatewaySender_Parallel_DistributedRegion() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+    fact.setParallel(true);
+    fact.setManualStart(true);
+    GatewaySender sender1 = fact.create("NYSender", 2);
+    AttributesFactory factory = new AttributesFactory();
+    factory.addGatewaySenderId(sender1.getId());
+    factory.setScope(Scope.DISTRIBUTED_ACK);
+    factory.setDataPolicy(DataPolicy.REPLICATE);
+    try {
+      RegionFactory regionFactory = cache.createRegionFactory(factory.create());
+      Region region = regionFactory
+          .create("test_GatewaySender_Parallel_DistributedRegion");
+    }
+    catch (Exception e) {
+      fail("Unexpected Exception :" + e);
+    }
+  }
+  
+  @Test
+  public void test_GatewaySender_Parallel_MultipleDispatcherThread() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+    fact.setParallel(true);
+    fact.setManualStart(true);
+    fact.setDispatcherThreads(4);
+    try {
+      GatewaySender sender1 = fact.create("NYSender", 2);
+    }
+    catch (GatewaySenderException e) {
+       fail("UnExpected Exception " + e);
+    }
+  }
+  
+  @Test
+  public void test_GatewaySender_Serial_ZERO_DispatcherThread() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+    fact.setManualStart(true);
+    fact.setDispatcherThreads(0);
+    try {
+      GatewaySender sender1 = fact.create("NYSender", 2);
+      fail("Expected GatewaySenderException but not thrown");
+    }
+    catch (GatewaySenderException e) {
+      if (e.getMessage().contains("can not be created with dispatcher threads less than 1")) {
+      }
+      else {
+        fail("Expected IllegalStateException but received :" + e);
+      }
+    }
+  }
+
+  /**
+   * Test to validate the gateway receiver attributes are correctly set
+   */
+  @Test
+  public void test_ValidateGatewayReceiverAttributes() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    int[] randomAvailableTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    int port1 = randomAvailableTCPPorts[0];
+    int port2 = randomAvailableTCPPorts[1];
+    
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    if(port1 < port2){
+      fact.setStartPort(port1);
+      fact.setEndPort(port2);  
+    }else{
+      fact.setStartPort(port2);
+      fact.setEndPort(port1);
+    }
+    
+    fact.setMaximumTimeBetweenPings(2000);
+    fact.setSocketBufferSize(200);
+    GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+    GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
+    fact.addGatewayTransportFilter(myStreamFilter2);
+    fact.addGatewayTransportFilter(myStreamFilter1);
+    GatewayReceiver receiver1 = fact.create();
+    
+
+    Region region = cache.createRegionFactory().create(
+        "test_ValidateGatewayReceiverAttributes");
+    Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
+    GatewayReceiver rec = receivers.iterator().next();
+    assertEquals(receiver1.getHost(), rec.getHost());
+    assertEquals(receiver1.getStartPort(), rec.getStartPort());
+    assertEquals(receiver1.getEndPort(), rec.getEndPort());
+    assertEquals(receiver1.getBindAddress(), rec.getBindAddress());
+    assertEquals(receiver1.getMaximumTimeBetweenPings(), rec
+        .getMaximumTimeBetweenPings());
+    assertEquals(receiver1.getSocketBufferSize(), rec
+        .getSocketBufferSize());
+    assertEquals(receiver1.getGatewayTransportFilters().size(), rec
+        .getGatewayTransportFilters().size());
+
+  }
+
+  @Test
+  public void test_ValidateGatewayReceiverStatus() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    int[] randomAvailableTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    int port1 = randomAvailableTCPPorts[0];
+    int port2 = randomAvailableTCPPorts[1];
+    
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    if(port1 < port2){
+      fact.setStartPort(port1);
+      fact.setEndPort(port2);  
+    }else{
+      fact.setStartPort(port2);
+      fact.setEndPort(port1);
+    }
+    
+    fact.setMaximumTimeBetweenPings(2000);
+    fact.setSocketBufferSize(200);
+    GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+    GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
+    fact.addGatewayTransportFilter(myStreamFilter2);
+    fact.addGatewayTransportFilter(myStreamFilter1);
+    GatewayReceiver receiver1 = fact.create();
+    assertTrue(receiver1.isRunning());
+  }
+  
+  /**
+   * Test to validate that serial gateway sender attributes are correctly set
+   */
+  @Test
+  public void test_ValidateSerialGatewaySenderAttributes() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+    fact.setManualStart(true);
+    fact.setBatchConflationEnabled(true);
+    fact.setBatchSize(200);
+    fact.setBatchTimeInterval(300);
+    fact.setPersistenceEnabled(false);
+    fact.setDiskStoreName("FORNY");
+    fact.setMaximumQueueMemory(200);
+    fact.setAlertThreshold(1200);
+    GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
+    fact.addGatewayEventFilter(myEventFilter1);
+    GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+    fact.addGatewayTransportFilter(myStreamFilter1);
+    GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
+    fact.addGatewayTransportFilter(myStreamFilter2);
+    GatewaySender sender1 = fact.create("TKSender", 2);
+    
+
+    AttributesFactory factory = new AttributesFactory();
+    factory.addGatewaySenderId(sender1.getId());
+    factory.setDataPolicy(DataPolicy.PARTITION);
+    Region region = cache.createRegionFactory(factory.create()).create(
+        "test_ValidateGatewaySenderAttributes");
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    assertEquals(senders.size(), 1);
+    GatewaySender gatewaySender = senders.iterator().next();
+    assertEquals(sender1.getRemoteDSId(), gatewaySender
+        .getRemoteDSId());
+    assertEquals(sender1.isManualStart(), gatewaySender.isManualStart());
+    assertEquals(sender1.isBatchConflationEnabled(), gatewaySender
+        .isBatchConflationEnabled());
+    assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize());
+    assertEquals(sender1.getBatchTimeInterval(), gatewaySender
+        .getBatchTimeInterval());
+    assertEquals(sender1.isPersistenceEnabled(), gatewaySender
+        .isPersistenceEnabled());
+    assertEquals(sender1.getDiskStoreName(), gatewaySender.getDiskStoreName());
+    assertEquals(sender1.getMaximumQueueMemory(), gatewaySender
+        .getMaximumQueueMemory());
+    assertEquals(sender1.getAlertThreshold(), gatewaySender
+        .getAlertThreshold());
+    assertEquals(sender1.getGatewayEventFilters().size(), gatewaySender
+        .getGatewayEventFilters().size());
+    assertEquals(sender1.getGatewayTransportFilters().size(), gatewaySender
+        .getGatewayTransportFilters().size());
+
+  }
+  
+  /**
+   * Test to validate that parallel gateway sender attributes are correctly set
+   */
+  @Test
+  public void test_ValidateParallelGatewaySenderAttributes() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+    fact.setParallel(true);
+    fact.setManualStart(true);
+    fact.setBatchConflationEnabled(true);
+    fact.setBatchSize(200);
+    fact.setBatchTimeInterval(300);
+    fact.setPersistenceEnabled(false);
+    fact.setDiskStoreName("FORNY");
+    fact.setMaximumQueueMemory(200);
+    fact.setAlertThreshold(1200);
+    GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
+    fact.addGatewayEventFilter(myEventFilter1);
+    GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+    fact.addGatewayTransportFilter(myStreamFilter1);
+    GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
+    fact.addGatewayTransportFilter(myStreamFilter2);
+    GatewaySender sender1 = fact.create("TKSender", 2);
+    
+
+    AttributesFactory factory = new AttributesFactory();
+    factory.addGatewaySenderId(sender1.getId());
+    factory.setDataPolicy(DataPolicy.PARTITION);
+    Region region = cache.createRegionFactory(factory.create()).create(
+        "test_ValidateGatewaySenderAttributes");
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    assertEquals(1, senders.size());
+    GatewaySender gatewaySender = senders.iterator().next();
+    assertEquals(sender1.getRemoteDSId(), gatewaySender
+        .getRemoteDSId());
+    assertEquals(sender1.isManualStart(), gatewaySender.isManualStart());
+    assertEquals(sender1.isBatchConflationEnabled(), gatewaySender
+        .isBatchConflationEnabled());
+    assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize());
+    assertEquals(sender1.getBatchTimeInterval(), gatewaySender
+        .getBatchTimeInterval());
+    assertEquals(sender1.isPersistenceEnabled(), gatewaySender
+        .isPersistenceEnabled());
+    assertEquals(sender1.getDiskStoreName(), gatewaySender.getDiskStoreName());
+    assertEquals(sender1.getMaximumQueueMemory(), gatewaySender
+        .getMaximumQueueMemory());
+    assertEquals(sender1.getAlertThreshold(), gatewaySender
+        .getAlertThreshold());
+    assertEquals(sender1.getGatewayEventFilters().size(), gatewaySender
+        .getGatewayEventFilters().size());
+    assertEquals(sender1.getGatewayTransportFilters().size(), gatewaySender
+        .getGatewayTransportFilters().size());
+
+  }
+  
+  @Test
+  public void test_GatewaySenderWithGatewaySenderEventListener1() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    InternalGatewaySenderFactory fact = (InternalGatewaySenderFactory)cache.createGatewaySenderFactory();
+    AsyncEventListener listener = new MyGatewaySenderEventListener();
+    ((InternalGatewaySenderFactory)fact).addAsyncEventListener(listener);
+    try {
+      fact.create("ln", 2);
+      fail("Expected GatewaySenderException. When a sender is added , remoteDSId should not be provided.");
+    } catch (Exception e) {
+      if (e instanceof GatewaySenderException
+          && e.getMessage()
+              .contains(
+                  "cannot define a remote site because at least AsyncEventListener is already added.")) {
+
+      } else {
+        fail("Expected GatewaySenderException but received :" + e);
+      }
+    }
+  }  
+  
+  @Test
+  public void test_GatewaySenderWithGatewaySenderEventListener2() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    GatewaySenderFactory fact = cache.createGatewaySenderFactory();
+    AsyncEventListener listener = new MyGatewaySenderEventListener();
+    ((InternalGatewaySenderFactory)fact).addAsyncEventListener(listener);
+    try {
+      ((InternalGatewaySenderFactory)fact).create("ln");
+    } catch (Exception e) {
+      fail("Received Exception :" + e);
+    }
+  }
+  
+  @Test
+  public void test_ValidateGatewayReceiverAttributes_2() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    fact.setStartPort(50504);
+    fact.setMaximumTimeBetweenPings(1000);
+    fact.setSocketBufferSize(4000);
+    fact.setEndPort(70707);
+    fact.setManualStart(true);
+    GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+    fact.addGatewayTransportFilter(myStreamFilter1);
+    
+    GatewayReceiver receiver = fact.create();
+    try {
+      receiver.start();
+    }
+    catch (IOException e) {
+      fail("The test failed with IOException");
+    }
+
+    assertEquals(50504, receiver.getStartPort());
+    assertEquals(1000, receiver.getMaximumTimeBetweenPings());
+    assertEquals(4000,receiver.getSocketBufferSize());
+    assertEquals(70707, receiver.getEndPort());
+  }
+
+  /**
+   * This test takes a minimum of 120s to execute. It is known to hang on Mac OS
+   * X Yosemite do to changes in the the message string checked in
+   * GatewayReceiverImpl around line 167. Expects
+   * "Cannot assign requested address" but gets
+   * "Can't assign requested address". Timeout after 150s to safeguard against
+   * hanging on other platforms that may differ.
+   */
+  @Test(timeout = 150000)
+  public void test_ValidateGatewayReceiverAttributes_WrongBindAddress() {
+    if (System.getProperty("os.name").equals("Mac OS X")) {
+     fail("Failing to avoid known hang on Mac OS X.");
+    }
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    fact.setStartPort(50504);
+    fact.setMaximumTimeBetweenPings(1000);
+    fact.setSocketBufferSize(4000);
+    fact.setEndPort(70707);
+    fact.setManualStart(true);
+    fact.setBindAddress("200.112.204.10");
+    GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+    fact.addGatewayTransportFilter(myStreamFilter1);
+    
+    GatewayReceiver receiver = fact.create();
+    try {
+      receiver.start();
+      fail("Expected GatewayReceiverException");
+    }
+    catch (GatewayReceiverException gRE){
+      assertTrue(gRE.getMessage().contains("Failed to create server socket on"));
+    }
+    catch (IOException e) {
+      e.printStackTrace();
+      fail("The test failed with IOException");
+    }
+  }
+  
+  @Test
+  public void test_ValidateGatewayReceiverDefaultStartPortAndDefaultEndPort() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    fact.setMaximumTimeBetweenPings(1000);
+    fact.setSocketBufferSize(4000);
+    fact.setManualStart(true);
+    GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+    fact.addGatewayTransportFilter(myStreamFilter1);
+    
+    GatewayReceiver receiver = fact.create();
+    try {
+      receiver.start();
+    }
+    catch (IOException e) {
+      fail("The test failed with IOException");
+    }
+    int port = receiver.getPort();
+    if((port < 5000) || (port > 5500)) {
+      fail("GatewayReceiver started on out of range port");
+    }
+  }
+  
+  @Test
+  public void test_ValidateGatewayReceiverDefaultStartPortAndEndPortProvided() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    fact.setMaximumTimeBetweenPings(1000);
+    fact.setSocketBufferSize(4000);
+    fact.setEndPort(50707);
+    fact.setManualStart(true);
+    GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+    fact.addGatewayTransportFilter(myStreamFilter1);
+    
+    GatewayReceiver receiver = fact.create();
+    try {
+      receiver.start();
+    }
+    catch (IOException e) {
+      fail("The test failed with IOException");
+    }
+    int port = receiver.getPort();
+    if((port < GatewayReceiver.DEFAULT_START_PORT) || (port > 50707)) {
+      fail("GatewayReceiver started on out of range port");
+    }
+  }
+  
+  @Test
+  public void test_ValidateGatewayReceiverWithManualStartFALSE() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    fact.setMaximumTimeBetweenPings(1000);
+    fact.setSocketBufferSize(4000);
+    fact.setStartPort(5303);
+    fact.setManualStart(false);
+    GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+    fact.addGatewayTransportFilter(myStreamFilter1);
+    GatewayReceiver receiver = fact.create();
+    int port = receiver.getPort();
+    if ((port < 5303) || (port > GatewayReceiver.DEFAULT_END_PORT)) {
+      fail("GatewayReceiver started on out of range port");
+    }
+  }
+  
+  @Test
+  public void test_ValidateGatewayReceiverWithStartPortAndDefaultEndPort() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    fact.setMaximumTimeBetweenPings(1000);
+    fact.setSocketBufferSize(4000);
+    fact.setStartPort(5303);
+    fact.setManualStart(true);
+    GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
+    fact.addGatewayTransportFilter(myStreamFilter1);
+
+    GatewayReceiver receiver = fact.create();
+    try {
+      receiver.start();
+    }
+    catch (IOException e) {
+      fail("The test failed with IOException");
+    }
+    int port = receiver.getPort();
+    if ((port < 5303) || (port > GatewayReceiver.DEFAULT_END_PORT)) {
+      fail("GatewayReceiver started on out of range port");
+    }
+  }
+  
+  @Test
+  public void test_ValidateGatewayReceiverWithWrongEndPortProvided() {
+    cache = new CacheFactory().set(MCAST_PORT, "0").create();
+    try {
+      GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+      fact.setMaximumTimeBetweenPings(1000);
+      fact.setSocketBufferSize(4000);
+      fact.setEndPort(4999);
+      GatewayReceiver receiver = fact.create();  
+      fail("wrong end port set in the GatewayReceiver");
+    } catch (IllegalStateException expected) {
+      if(!expected.getMessage().contains("Please specify either start port a value which is less than end port.")){
+        fail("Caught IllegalStateException");
+        expected.printStackTrace();
+      }
+    }
+  }
+  
+  @After
+  public void tearDown() throws Exception {
+    if (this.cache != null) {
+      this.cache.close();
+    }
+  }
+}