You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:44:04 UTC

[16/61] [abbrv] incubator-geode git commit: GEODE-37 change package name from com.gemstone.gemfire (for ./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to ./geode-wan/src/test/java/org/apache/geode)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
new file mode 100644
index 0000000..9c6cbdd
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import com.gemstone.gemfire.test.dunit.Assert;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+
+import java.util.Set;
+
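+/**
+ * Distributed tests for a parallel gateway sender that is attached to more
+ * than one partitioned region.
+ */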
+@Category(DistributedTest.class)
+public class CommonParallelGatewaySenderDUnitTest extends WANTestBase {
+
+  /**
+   * Attaches the sender "ln" to a first partitioned region on vm4 and then
+   * tries to attach it to a second one. The second creation is expected to
+   * fail with an exception whose cause is an IllegalStateException containing
+   * "cannot have the same parallel gateway sender id".
+   *
+   * @throws Exception if any of the setup invocations fails
+   */
+  @Test
+  public void testSameSenderWithNonColocatedRegions() throws Exception {
+    IgnoredException.addIgnoredException("cannot have the same parallel");
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+        true, 100, 10, false, false, null, true ));
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
+    try {
+      vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+        getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
+      fail("Expected IllegalStateException : cannot have the same parallel gateway sender");
+    }
+    catch (Exception e) {
+      // The failure of interest is the cause of the exception thrown by invoke().
+      // Both the cause and its message may be null, so check them explicitly
+      // instead of letting a NullPointerException hide the real failure.
+      Throwable cause = e.getCause();
+      String message = (cause == null) ? null : cause.getMessage();
+      if (!(cause instanceof IllegalStateException)
+          || message == null
+          || !message.contains("cannot have the same parallel gateway sender id")) {
+        Assert.fail("Expected IllegalStateException", e);
+      }
+    }
+  }
+  
+  /**
+   * Simple scenario: two partitioned regions are attached to the same parallel
+   * gateway sender ("ln").
+   *
+   * <p>This test is disabled intentionally:
+   * <ol>
+   * <li>In release 8.0 the queue name was changed back to the old style for
+   *     rolling upgrade support.</li>
+   * <li>A common parallel sender for different non-colocated regions is not
+   *     supported in 8.0, so there is no need to bother about
+   *     ParallelGatewaySenderQueue#convertPathToName.</li>
+   * <li>It has to be enabled in the next release.</li>
+   * <li>Version-based rolling upgrade support should be provided: depending on
+   *     the GemFire version, the appropriate QSTRING should be used between 8.0
+   *     and versions prior to 8.0.</li>
+   * </ol>
+   *
+   * @throws Exception if any step of the scenario fails
+   */
+  @Test
+  @Ignore("TODO")
+  public void testParallelPropagation() throws Exception {
+    // first locator (distributed system 1), then a remote locator (distributed system 2)
+    final Integer localLocatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    final Integer remoteLocatorPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    // receiving side: caches and receivers on vm2 and vm3
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // sending side: caches on vm4 .. vm7
+    createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
+
+    // the same sender "ln" on every sending member
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+    vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+    vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true));
+
+    // first region attached to "ln"
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
+
+    // second region attached to the very same sender
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    // matching regions on the receiving side, created without a sender id
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR1", null, 1, 100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR1", null, 1, 100, isOffHeap()));
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR2", null, 1, 100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR2", null, 1, 100, isOffHeap()));
+
+    // the senders must be running before the first put so that no event is lost
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+
+    // 1000 puts into each region, issued from vm4
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR1", 1000));
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR2", 1000));
+
+    // every sending member must see all of its queue buckets drained
+    vm4.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    // both regions must have reached the expected size on vm2
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR1", 1000));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR2", 1000));
+  }
+  
+  /**
+   * The parallel gateway sender (PGS) has persistence enabled, but the regions
+   * attached to it do not.
+   *
+   * <p>This test is disabled intentionally:
+   * <ol>
+   * <li>In release 8.0 the queue name was changed back to the old style for
+   *     rolling upgrade support.</li>
+   * <li>A common parallel sender for different non-colocated regions is not
+   *     supported in 8.0, so there is no need to bother about
+   *     ParallelGatewaySenderQueue#convertPathToName.</li>
+   * <li>It has to be enabled in the next release.</li>
+   * <li>Version-based rolling upgrade support should be provided: depending on
+   *     the GemFire version, the appropriate QSTRING should be used between 8.0
+   *     and versions prior to 8.0.</li>
+   * </ol>
+   *
+   * @throws Exception if any step of the scenario fails
+   */
+  @Test
+  @Ignore("TODO")
+  public void testParallelPropagationPersistenceEnabled() throws Exception {
+    // first locator (distributed system 1), then a remote locator (distributed system 2)
+    final Integer localLocatorPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
+    final Integer remoteLocatorPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, localLocatorPort));
+
+    // receiving side: caches and receivers on vm2 and vm3
+    createCacheInVMs(remoteLocatorPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    // sending side: caches on vm4 .. vm7
+    createCacheInVMs(localLocatorPort, vm4, vm5, vm6, vm7);
+
+    // the same sender "ln" on every sending member; compared with
+    // testParallelPropagation only the seventh argument differs (true here)
+    vm4.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
+    vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
+    vm6.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
+    vm7.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, true, null, true));
+
+    // first region attached to "ln"
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap()));
+
+    // second region attached to the very same sender
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap()));
+
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    // matching regions on the receiving side, created without a sender id
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR1", null, 1, 100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR1", null, 1, 100, isOffHeap()));
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR2", null, 1, 100, isOffHeap()));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR2", null, 1, 100, isOffHeap()));
+
+    // the senders must be running before the first put so that no event is lost
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState("ln"));
+
+    // 1000 puts into each region, issued from vm4
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR1", 1000));
+    vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR2", 1000));
+
+    // every sending member must see all of its queue buckets drained
+    vm4.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm5.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm6.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+    vm7.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+    // both regions must have reached the expected size on vm2
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR1", 1000));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR2", 1000));
+  }
+  
+  
+  /**
+   * Enable persistence for the GatewaySender.
+   * Pause the sender and do some puts in the local regions.
+   * Close the local site and rebuild the regions and the sender from the disk store.
+   * Dispatcher should not start dispatching events recovered from persistent sender.
+   * Check if the remote site receives all the events.
+   *
+   * <p>This test is disabled intentionally:
+   * <ol>
+   * <li>In release 8.0 the queue name was changed back to the old style for
+   *     rolling upgrade support.</li>
+   * <li>A common parallel sender for different non-colocated regions is not
+   *     supported in 8.0, so there is no need to bother about
+   *     ParallelGatewaySenderQueue#convertPathToName.</li>
+   * <li>It has to be enabled in the next release.</li>
+   * <li>Version-based rolling upgrade support should be provided: depending on
+   *     the GemFire version, the appropriate QSTRING should be used between 8.0
+   *     and versions prior to 8.0.</li>
+   * </ol>
+   */
+  @Test
+  @Ignore("TODO")
+  public void testPRWithGatewaySenderPersistenceEnabled_Restart() {
+    //create locator on local site
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    //create locator on remote site
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+    //create receiver on remote site
+    createCacheInVMs(nyPort, vm2, vm3);
+    createReceiverInVMs(vm2, vm3);
+
+    //create cache in local site
+    createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+    //create senders with disk store; the returned names are reused after the restart
+    String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+    String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+    LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+
+    //create first PR on remote site
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR1", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR1", null, 1, 100, isOffHeap() ));
+
+    //create second PR on remote site
+    vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR2", null, 1, 100, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR2", null, 1, 100, isOffHeap() ));
+
+    //create first PR on local site
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+
+    //create second PR on local site
+    vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+    vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+    vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+
+    //start the senders on local site
+    startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+    //wait for senders to become running
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+    //pause the senders
+    vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
+    vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+    //start puts in region on local site
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"PR1", 3000 ));
+    vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"PR2", 5000 ));
+    LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+    //--------------------close and rebuild local site -------------------------------------------------
+    //kill the senders
+    vm4.invoke(() -> WANTestBase.killSender());
+    vm5.invoke(() -> WANTestBase.killSender());
+    vm6.invoke(() -> WANTestBase.killSender());
+    vm7.invoke(() -> WANTestBase.killSender());
+
+    LogWriterUtils.getLogWriter().info("Killed all the senders.");
+
+    //restart the vm
+    vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm6.invoke(() -> WANTestBase.createCache( lnPort ));
+    vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+
+    LogWriterUtils.getLogWriter().info("Created back the cache");
+
+    //create senders again, this time passing the disk store names obtained above
+    vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+    vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+    vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+    vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+
+    LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+
+    //create first PR on local site, on all members concurrently
+    AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+    AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+
+    joinRegionCreations(inv1, inv2, inv3, inv4);
+
+    //create second PR on local site, on all members concurrently
+    inv1 = vm4.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+    inv2 = vm5.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+    inv3 = vm6.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+    inv4 = vm7.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+      getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+
+    joinRegionCreations(inv1, inv2, inv3, inv4);
+
+    LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+    //start the senders in async mode. This will ensure that the
+    //node of shadow PR that went down last will come up first
+    startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+    LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+    //wait for senders running
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+    LogWriterUtils.getLogWriter().info("All the senders are now running...");
+
+    //----------------------------------------------------------------------------------------------------
+
+    //the remote site must hold every entry that was put while the senders were paused
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName()+"PR1", 3000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName()+"PR1", 3000 ));
+
+    vm2.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName()+"PR2", 5000 ));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(
+      getTestMethodName()+"PR2", 5000 ));
+  }
+
+  /**
+   * Joins the given asynchronous invocations one after another. If the calling
+   * thread is interrupted while waiting, its interrupt flag is restored and the
+   * test is failed through Assert.fail, passing the InterruptedException along
+   * instead of only printing its stack trace.
+   */
+  private static void joinRegionCreations(AsyncInvocation... invocations) {
+    try {
+      for (AsyncInvocation invocation : invocations) {
+        invocation.join();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      Assert.fail("Interrupted while waiting for partitioned region creation", e);
+    }
+  }
+  
+  /**
+   * Waits until every local bucket of every region returned by the queue of
+   * the given gateway sender has an empty key set.
+   *
+   * The sender is looked up by id among the gateway senders of the cache. The
+   * check fails with a descriptive message when no such sender exists or when
+   * the sender exposes no queue.
+   *
+   * @param senderId id of the gateway sender whose queue buckets are checked
+   */
+  public static void validateParallelSenderQueueAllBucketsDrained(final String senderId) {
+    GatewaySender sender = null;
+    for (GatewaySender candidate : cache.getGatewaySenders()) {
+      if (candidate.getId().equals(senderId)) {
+        sender = candidate;
+        break;
+      }
+    }
+    if (sender == null) {
+      fail("No gateway sender with id " + senderId + " found in the cache");
+    }
+
+    // toArray(new RegionQueue[1]) leaves element 0 null when the sender has no
+    // queues, so the first queue has to be checked before it is used.
+    ConcurrentParallelGatewaySenderQueue regionQueue = (ConcurrentParallelGatewaySenderQueue)((AbstractGatewaySender)sender).getQueues().toArray(new RegionQueue[1])[0];
+    if (regionQueue == null) {
+      fail("Gateway sender " + senderId + " has no queue");
+    }
+
+    Set<PartitionedRegion> shadowPRs = (Set<PartitionedRegion>)regionQueue.getRegions();
+
+    for (PartitionedRegion shadowPR : shadowPRs) {
+      Set<BucketRegion> buckets = shadowPR.getDataStore().getAllLocalBucketRegions();
+
+      for (final BucketRegion bucket : buckets) {
+        WaitCriterion wc = new WaitCriterion() {
+          @Override
+          public boolean done() {
+            if (bucket.keySet().size() == 0) {
+              LogWriterUtils.getLogWriter().info("Bucket " + bucket.getId() + " is empty");
+              return true;
+            }
+            return false;
+          }
+
+          @Override
+          public String description() {
+            return "Expected bucket entries for bucket: " + bucket.getId() + " is: 0 but actual entries: "
+              + bucket.keySet().size() + " This bucket isPrimary: " + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet();
+          }
+        };
+        // NOTE(review): presumably a 180 s timeout polled every 50 ms that fails
+        // on timeout - confirm against Wait.waitForCriterion.
+        Wait.waitForCriterion(wc, 180000, 50, true);
+      }
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java
new file mode 100644
index 0000000..2fb77ff
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+/**
+ * Variant of CommonParallelGatewaySenderDUnitTest in which isOffHeap()
+ * returns true; the inherited tests pass that value on when they create
+ * their partitioned regions.
+ */
+@SuppressWarnings("serial")
+@Category(DistributedTest.class)
+public class CommonParallelGatewaySenderOffHeapDUnitTest extends CommonParallelGatewaySenderDUnitTest {
+
+  /**
+   * @return always true
+   */
+  @Override
+  public boolean isOffHeap() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
new file mode 100644
index 0000000..8b8c624
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.internal.cache.DistributedCacheOperation;
+import com.gemstone.gemfire.internal.cache.EntrySnapshot;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry;
+import com.gemstone.gemfire.internal.cache.RegionEntry;
+import com.gemstone.gemfire.internal.cache.Token.Tombstone;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+/**
+ *
+ * Test verifies that version tag for destroyed entry is propagated back to
+ * origin distributed system if the version tag is applied and replaces old
+ * version information in destination distributed system.
+ *
+ * Version tag information which is relevant between multiple distributed
+ * systems consistency check is basically dsid and timestamp.
+ */
+@Category(DistributedTest.class)
+public class NewWANConcurrencyCheckForDestroyDUnitTest extends WANTestBase {
+
+  public NewWANConcurrencyCheckForDestroyDUnitTest() {
+    super(); // no state of its own; just delegates to the WANTestBase no-arg constructor
+  }
+
+  @Test
+  public void testVersionTagTimestampForDestroy() {
+    // Destroys an entry in site 2 and checks that site 3 ends up with a tombstone carrying the same
+    // version timestamp, tagged with dsid 1 because site 3 only receives gateway events from site 1.
+    // Create three distributed systems (sites), each with a cache hosting the region "repRegion".
+    // NOTE: despite its name the region is created with createPartitionedRegion below.
+
+    // Site 2 and Site 3 only know about Site 1 but Site 1 knows about both
+    // Site 2 and Site 3.
+
+    // Site 1
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    createCacheInVMs(lnPort, vm1);
+    Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
+    
+    //Site 2
+    Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+    createCacheInVMs(nyPort, vm3);
+    Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
+
+    //Site 3
+    Integer tkPort = (Integer)vm4.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
+    createCacheInVMs(tkPort, vm5);
+    Integer tkRecPort = (Integer) vm5.invoke(() -> WANTestBase.createReceiver());
+
+    LogWriterUtils.getLogWriter().info("Created locators and receivers in 3 distributed systems");
+     
+    //Site 1
+    vm1.invoke(() -> WANTestBase.createSender( "ln1", 2,
+      true, 10, 1, false, false, null, true ));
+    vm1.invoke(() -> WANTestBase.createSender( "ln2", 3,
+      true, 10, 1, false, false, null, true ));
+    
+    vm1.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ln1,ln2", 0, 1, isOffHeap() ));
+    vm1.invoke(() -> WANTestBase.startSender( "ln1" ));
+    vm1.invoke(() -> WANTestBase.startSender( "ln2" ));
+    vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" ));
+    vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln2" ));
+    
+    //Site 2
+    vm3.invoke(() -> WANTestBase.createSender( "ny1", 1,
+      true, 10, 1, false, false, null, true ));
+    
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ny1", 0, 1, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.startSender( "ny1" ));
+    vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" ));
+    
+    //Site 3 which only knows about Site 1.
+    vm5.invoke(() -> WANTestBase.createSender( "tk1", 1,
+      true, 10, 1, false, false, null, true ));
+    
+    vm5.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "tk1", 0, 1, isOffHeap() ));
+    vm5.invoke(() -> WANTestBase.startSender( "tk1" ));
+    vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "tk1" ));
+    
+    Wait.pause(2000);
+    
+    // Perform a put in vm1
+    vm1.invoke(new CacheSerializableRunnable("Putting an entry in ds1") {
+      
+      @Override
+      public void run2() throws CacheException {
+        assertNotNull(cache);
+        
+        Region region = cache.getRegion("/repRegion");
+        region.put("testKey", "testValue");
+        
+        assertEquals(1, region.size());
+      }
+    });
+
+    //wait for vm1 to propagate put to vm3 and vm5
+    Wait.pause(2000); 
+    // Destroy the entry in vm3 (site 2) and keep the version timestamp of the resulting tombstone.
+    long destroyTimeStamp = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterOp());
+    
+    //wait for vm1 to propagate destroyed entry's new version tag to vm5
+    Wait.pause(2000); 
+
+    vm5.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.verifyTimestampAfterOp(destroyTimeStamp, 1 /* ds 3 receives gateway event only from ds 1*/));
+  }
+
+  /**
+   * Test creates two sites and one Replicated Region on each with Serial
+   * GatewaySender on each. In site1 a putAll of "testKey" is slowed down by the SLOW_DISTRIBUTION_MS
+   * test hook while a concurrent put updates the same key. The test finally compares the version
+   * timestamp of "testKey" on both sites; equal timestamps mean the events were transferred in
+   * correct sequence. An interrupted wait or a failure inside either async operation fails the test.
+   */
+  @Test
+  public void testPutAllEventSequenceOnSerialGatewaySenderWithRR() {
+    // create two distributed systems with each having a cache containing
+    // a Replicated Region with one entry and concurrency checks enabled.
+
+    // Site 1
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    vm1.invoke(() -> WANTestBase.createCache(lnPort));
+    Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
+
+    //Site 2
+    Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+    vm3.invoke(() -> WANTestBase.createCache(nyPort));
+    Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
+
+    LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems");
+    //Site 1
+    vm1.invoke(() -> WANTestBase.createSender( "ln1", 2,
+      false, 10, 1, false, false, null, true ));
+
+    vm1.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ln1", isOffHeap() ));
+    vm1.invoke(() -> WANTestBase.startSender( "ln1" ));
+    vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" ));
+
+    //Site 2
+    vm3.invoke(() -> WANTestBase.createSender( "ny1", 1,
+      false, 10, 1, false, false, null, true ));
+
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.startSender( "ny1" ));
+    vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" ));
+
+    Wait.pause(2000);
+
+    // Perform a putAll in vm1 with the test hook enabled
+    AsyncInvocation asynch1 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") {
+      @Override
+      public void run2() throws CacheException {
+        assertNotNull(cache);
+        // Test hook to make put wait after RE lock is released but before Gateway events are sent.
+        DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 2000; 
+
+        Region region = cache.getRegion("/repRegion");
+        Map testMap = new HashMap();
+        testMap.put("testKey", "testValue1");
+        region.putAll(testMap);
+
+        assertEquals(1, region.size());
+        assertEquals("testValue2", region.get("testKey"));
+      }
+    });
+
+    //give the delayed putAll in vm1 a head start before the second operation is submitted
+    Wait.pause(1000); 
+
+    AsyncInvocation asynch2 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") {
+      @Override
+      public void run2() throws CacheException {
+        assertNotNull(cache);
+        Region region = cache.getRegion("/repRegion");
+
+        while (!region.containsKey("testKey")) {
+          Wait.pause(10);
+        }
+        // Switch the test hook off again so that this put is distributed without the artificial delay.
+        DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; 
+
+        region.put("testKey", "testValue2");
+
+        assertEquals(1, region.size());
+        assertEquals("testValue2", region.get("testKey"));
+      }
+    });
+
+    try {
+      asynch1.join(5000);
+      asynch2.join(5000);
+    } catch (InterruptedException e) {
+      throw new AssertionError("Interrupted while waiting for the async operations in vm1", e);
+    } finally {
+      vm1.invoke(new CacheSerializableRunnable("Reset Test Hook") {
+        @Override
+        public void run2() throws CacheException {
+          DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0;
+        }
+      });
+    }
+    // Surface failures raised inside the async operations instead of silently dropping them.
+    if (asynch1.exceptionOccurred()) {
+      throw new AssertionError("putAll in vm1 failed", asynch1.getException());
+    }
+    if (asynch2.exceptionOccurred()) {
+      throw new AssertionError("put in vm1 failed", asynch2.getException());
+    }
+
+    //Wait for all Gateway events to be received by vm3.
+    Wait.pause(1000);
+
+    long putAllTimeStampVm1 = (Long) vm1.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
+    long putAllTimeStampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
+    assertEquals(putAllTimeStampVm1, putAllTimeStampVm3);
+  }
+
+  /**
+   * Same scenario as testPutAllEventSequenceOnSerialGatewaySenderWithRR, but for a PartitionedRegion.
+   */
+  @Test
+  public void testPutAllEventSequenceOnSerialGatewaySenderWithPR() {
+    // create two distributed systems with each having a cache containing
+    // a Partitioned Region with one entry and concurrency checks enabled.
+
+    // Site 1
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    createCacheInVMs(lnPort, vm1);
+    Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
+
+    //Site 2
+    Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+    createCacheInVMs(nyPort, vm3);
+    Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
+
+    LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems");
+    //Site 1
+    vm1.invoke(() -> WANTestBase.createSender( "ln1", 2,
+      false, 10, 1, false, false, null, true ));
+
+    vm1.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ln1", 0, 1, isOffHeap() ));
+    vm1.invoke(() -> WANTestBase.startSender( "ln1" ));
+    vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" ));
+
+    //Site 2
+    vm3.invoke(() -> WANTestBase.createSender( "ny1", 1,
+      false, 10, 1, false, false, null, true ));
+
+    vm3.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ny1", 0, 1, isOffHeap() ));
+    vm3.invoke(() -> WANTestBase.startSender( "ny1" ));
+    vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" ));
+
+    Wait.pause(2000);
+
+    // Perform a putAll in vm1 with the test hook enabled
+    AsyncInvocation asynch1 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") {
+      @Override
+      public void run2() throws CacheException {
+        assertNotNull(cache);
+        // Test hook to make put wait after RE lock is released but before Gateway events are sent.
+        DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 2000; 
+
+        Region region = cache.getRegion("/repRegion");
+        Map testMap = new HashMap();
+        testMap.put("testKey", "testValue1");
+        region.putAll(testMap);
+
+        assertEquals(1, region.size());
+        assertEquals("testValue2", region.get("testKey"));
+      }
+    });
+
+    //give the delayed putAll in vm1 a head start before the second operation is submitted
+    Wait.pause(1000); 
+
+    AsyncInvocation asynch2 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") {
+      @Override
+      public void run2() throws CacheException {
+        assertNotNull(cache);
+        Region region = cache.getRegion("/repRegion");
+
+        while (!region.containsKey("testKey")) {
+          Wait.pause(10);
+        }
+        // Switch the test hook off again so that this put is distributed without the artificial delay.
+        DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; 
+
+        region.put("testKey", "testValue2");
+
+        assertEquals(1, region.size());
+        assertEquals("testValue2", region.get("testKey"));
+      }
+    });
+
+    try {
+      asynch1.join(5000);
+      asynch2.join(5000);
+    } catch (InterruptedException e) {
+      throw new AssertionError("Interrupted while waiting for the async operations in vm1", e);
+    } finally {
+      vm1.invoke(new CacheSerializableRunnable("Reset Test Hook") {
+        @Override
+        public void run2() throws CacheException {
+          DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0;
+        }
+      });
+    }
+    // Surface failures raised inside the async operations instead of silently dropping them.
+    if (asynch1.exceptionOccurred()) {
+      throw new AssertionError("putAll in vm1 failed", asynch1.getException());
+    }
+    if (asynch2.exceptionOccurred()) {
+      throw new AssertionError("put in vm1 failed", asynch2.getException());
+    }
+
+    //Wait for all Gateway events to be received by vm3.
+    Wait.pause(1000);
+
+    long putAllTimeStampVm1 = (Long) vm1.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
+    long putAllTimeStampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
+    assertEquals(putAllTimeStampVm1, putAllTimeStampVm3);
+  }
+
+  /**
+   * Tests if conflict checks are happening based on DSID and timestamp even if version tag is
+   * generated in local distributed system: after a put in ds 1 and a later put in ds 2 (vm4), the
+   * entry in vm3 must carry dsid 2 and the same version timestamp as the entry in vm4.
+   */
+  @Test
+  public void testConflictChecksBasedOnDsidAndTimeStamp() {
+    // create two distributed systems with each having a cache containing
+    // a Replicated Region with one entry and concurrency checks enabled.
+
+    // Site 1
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    createCacheInVMs(lnPort, vm1);
+    Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
+
+    //Site 2
+    Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+    createCacheInVMs(nyPort, vm3);
+    Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
+
+    LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems");
+
+    //Site 1
+    vm1.invoke(() -> WANTestBase.createSender( "ln1", 2,
+      false, 10, 1, false, false, null, true ));
+
+    vm1.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ln1", isOffHeap() ));
+    vm1.invoke(() -> WANTestBase.startSender( "ln1" ));
+    vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" ));
+
+    //Site 2
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() ));
+
+    vm4.invoke(() -> WANTestBase.createCache( nyPort ));
+    vm4.invoke(() -> WANTestBase.createSender( "ny1", 1,
+      false, 10, 1, false, false, null, true ));
+
+    vm4.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() ));
+    vm4.invoke(() -> WANTestBase.startSender( "ny1" ));
+    vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" ));
+
+    Wait.pause(2000);
+
+    // Perform a put in vm1
+    vm1.invoke(new CacheSerializableRunnable("Putting an entry in ds1") {
+      @Override
+      public void run2() throws CacheException {
+        assertNotNull(cache);
+
+        Region region = cache.getRegion("/repRegion");
+        region.put("testKey", "testValue1");
+
+        assertEquals(1, region.size());
+      }
+    });
+
+    //wait for vm4 to have later timestamp before sending operation to vm1
+    Wait.pause(300); 
+
+    AsyncInvocation asynch = vm4.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds2 in vm4") {
+      @Override
+      public void run2() throws CacheException {
+        assertNotNull(cache);
+        Region region = cache.getRegion("/repRegion");
+
+        region.put("testKey", "testValue2");
+
+        assertEquals(1, region.size());
+        assertEquals("testValue2", region.get("testKey"));
+      }
+    });
+
+    try {
+      asynch.join(2000);
+    } catch (InterruptedException e) {
+      throw new AssertionError("Interrupted while waiting for the async put in vm4", e);
+    }
+    // Surface a failure raised inside the async put instead of silently dropping it.
+    if (asynch.exceptionOccurred()) {
+      throw new AssertionError("put in vm4 failed", asynch.getException());
+    }
+
+    //Wait for all local ds events to be received by vm3.
+    Wait.pause(1000);
+
+    vm3.invoke(new CacheSerializableRunnable("Check dsid") {
+      @Override
+      public void run2() throws CacheException {
+        Region region = cache.getRegion("repRegion");
+
+        Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/
+            true); //commented while merging revision 43582
+        RegionEntry re = null;
+        if (entry instanceof EntrySnapshot) {
+          re = ((EntrySnapshot)entry).getRegionEntry();
+        } else if (entry instanceof NonTXEntry) {
+          re = ((NonTXEntry)entry).getRegionEntry();
+        }
+        assertNotNull("No RegionEntry found for testKey in vm3", re);
+        VersionTag tag = re.getVersionStamp().asVersionTag();
+        assertEquals(2, tag.getDistributedSystemId());
+      }
+    });
+
+    // Check vm3 has latest timestamp from vm4.
+    long timestampVm4 = (Long) vm4.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
+
+    long timestampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
+
+    assertEquals(timestampVm4, timestampVm3);
+  }
+  
+  /*
+   * Polls "repRegion" until it contains "testKey", then returns the timestamp of that entry's
+   * version tag, or -1 if no RegionEntry could be resolved. Called from the tests via vm.invoke.
+   */
+  public static long getVersionTimestampAfterPutAllOp() {
+    Region repRegion = cache.getRegion("repRegion");
+    while (!repRegion.containsKey("testKey")) {
+      Wait.pause(10);
+    }
+    assertEquals(1, repRegion.size());
+
+    final LocalRegion localRegion = (LocalRegion) repRegion;
+    final Region.Entry found = localRegion.getEntry("testKey", true);
+    final RegionEntry regionEntry;
+    if (found instanceof EntrySnapshot) {
+      regionEntry = ((EntrySnapshot) found).getRegionEntry();
+    } else if (found instanceof NonTXEntry) {
+      regionEntry = ((NonTXEntry) found).getRegionEntry();
+    } else {
+      return -1; // neither known entry wrapper: no RegionEntry to inspect
+    }
+    if (regionEntry == null) {
+      return -1;
+    }
+    LogWriterUtils.getLogWriter().fine("RegionEntry for testKey: " + regionEntry.getKey() + " " + regionEntry.getValueInVM(localRegion));
+    return regionEntry.getVersionStamp().asVersionTag().getVersionTimeStamp();
+  }
+
+  /*
+   * Destroys "testKey" in "repRegion", checks that a Tombstone was left behind and returns the
+   * timestamp of the tombstone's version tag. Run in vm3 (ds 2) by testVersionTagTimestampForDestroy.
+   */
+  public static long getVersionTimestampAfterOp() {
+    Region repRegion = cache.getRegion("repRegion");
+    assertEquals(1, repRegion.size());
+    repRegion.destroy("testKey");
+
+    // NOTE(review): the boolean passed to getEntry presumably allows tombstones to be returned - confirm.
+    final LocalRegion localRegion = (LocalRegion) repRegion;
+    final EntrySnapshot snapshot = (EntrySnapshot) localRegion.getEntry("testKey", true);
+    final RegionEntry tombstoneEntry = snapshot.getRegionEntry();
+    LogWriterUtils.getLogWriter().fine("RegionEntry for testKey: " + tombstoneEntry.getKey() + " " + tombstoneEntry.getValueInVM(localRegion));
+    assertTrue(tombstoneEntry.getValueInVM(localRegion) instanceof Tombstone);
+    return tombstoneEntry.getVersionStamp().asVersionTag().getVersionTimeStamp();
+  }
+
+  /*
+   * Asserts that "testKey" in "repRegion" is a Tombstone whose version tag carries the given
+   * timestamp and distributed system id. Run in vm5 (ds 3) by testVersionTagTimestampForDestroy.
+   */
+  public static void verifyTimestampAfterOp(long timestamp, int memberid) {
+    Region repRegion = cache.getRegion("repRegion");
+    assertEquals(0, repRegion.size());
+    final LocalRegion localRegion = (LocalRegion) repRegion;
+    final EntrySnapshot snapshot = (EntrySnapshot) localRegion.getEntry("testKey", true);
+    final RegionEntry tombstoneEntry = snapshot.getRegionEntry();
+    assertTrue(tombstoneEntry.getValueInVM(localRegion) instanceof Tombstone);
+    final VersionTag tombstoneTag = tombstoneEntry.getVersionStamp().asVersionTag();
+    assertEquals(timestamp, tombstoneTag.getVersionTimeStamp());
+    assertEquals(memberid, tombstoneTag.getDistributedSystemId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
new file mode 100644
index 0000000..76940ea
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+import static com.gemstone.gemfire.test.dunit.Assert.*;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import com.jayway.awaitility.Awaitility;
+import org.apache.geode.security.templates.SampleSecurityManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.security.AuthInitialize;
+import com.gemstone.gemfire.security.AuthenticationFailedException;
+import com.gemstone.gemfire.security.SecurityTestUtils;
+import com.gemstone.gemfire.security.generator.CredentialGenerator;
+import com.gemstone.gemfire.security.generator.DummyCredentialGenerator;
+import com.gemstone.gemfire.security.templates.UserPasswordAuthInit;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class NewWanAuthenticationDUnitTest extends WANTestBase {
+
+  public static final Logger logger = LogService.getLogger();
+
+  public static boolean isDifferentServerInGetCredentialCall = false;
+
+  /**
+   * Authentication test for new WAN with valid credentials on the sending member (vm2). Although nothing
+   * related to authentication has been changed in new WAN, this test case is added on request from QA
+   * for defect 44650. The receiving member (vm3) is deliberately started with invalid credentials.
+   */
+  @Test
+  public void testWanAuthValidCredentials() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    logger.info("Created locator on local site");
+
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+    logger.info("Created locator on remote site");
+
+    // Build the security properties from a DummyCredentialGenerator: valid credentials for vm2, invalid ones for vm3.
+    CredentialGenerator gen = new DummyCredentialGenerator();
+    Properties extraProps = gen.getSystemProperties();
+
+    String clientauthenticator = gen.getAuthenticator();
+    String clientauthInit = gen.getAuthInit();
+
+    Properties credentials1 = gen.getValidCredentials(1);
+    if (extraProps != null) {
+      credentials1.putAll(extraProps);
+    }
+    Properties javaProps1 = gen.getJavaProperties();
+
+    // vm3's invalid credentials
+    Properties credentials2 = gen.getInvalidCredentials(1);
+    if (extraProps != null) {
+      credentials2.putAll(extraProps);
+    }
+    Properties javaProps2 = gen.getJavaProperties();
+
+    Properties props1 = buildProperties(clientauthenticator, clientauthInit,
+      null, credentials1, null);
+
+    // have vm3 start a cache with invalid credentials
+    Properties props2 = buildProperties(clientauthenticator, clientauthInit,
+      null, credentials2, null);
+
+    vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+      props1, javaProps1, lnPort ));
+    logger.info("Created secured cache in vm2");
+
+    vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+      props2, javaProps2, nyPort ));
+    logger.info("Created secured cache in vm3");
+
+    vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
+      false, 100, 10, false, false, null, true ));
+    logger.info("Created sender in vm2");
+
+    vm3.invoke(() -> createReceiverInSecuredCache());
+    logger.info("Created receiver in vm3");
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+      getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    logger.info("Created RR in vm2");
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+      getTestMethodName() + "_RR", null, isOffHeap()  ));
+    logger.info("Created RR in vm3");
+
+    // this test verifies that even though vm3 has invalid credentials, vm2 can still send data to vm3 because
+    // vm2 has valid credentials
+    vm2.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+    vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1));
+    vm3.invoke(() -> {
+      Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_RR");
+      Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> assertTrue(r.size() > 0));
+    });
+    logger.info("Done successfully.");
+  }
+
+  @Test
+  public void testWanIntegratedSecurityWithValidCredentials() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    logger.info("Created locator on local site");
+
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+    logger.info("Created locator on remote site");
+    // Scenario: a sender in vm2 replicates one put to a receiver in vm3; both caches are created by createSecuredCache
+    // with properties from buildSecurityProperties (admin/secret for vm2, guest/guest for vm3).
+    Properties props1 = buildSecurityProperties("admin", "secret");
+    Properties props2 = buildSecurityProperties("guest", "guest");
+
+    vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+      props1, null, lnPort ));
+    logger.info("Created secured cache in vm2");
+
+    vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+      props2, null, nyPort ));
+    logger.info("Created secured cache in vm3");
+
+    vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
+      false, 100, 10, false, false, null, true ));
+    logger.info("Created sender in vm2");
+
+    vm3.invoke(() -> createReceiverInSecuredCache());
+    logger.info("Created receiver in vm3");
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+      getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    logger.info("Created RR in vm2");
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+      getTestMethodName() + "_RR", null, isOffHeap()  ));
+    logger.info("Created RR in vm3");
+
+    vm2.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+    vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1));
+    vm3.invoke(() -> {
+      Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_RR");
+      Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> assertTrue(r.size() > 0));
+
+    });
+    logger.info("Done successfully.");
+  }
+
+  /**
+   * Test authentication with new WAN with invalid credentials (added on request from QA for defect
+   * 44650). Both members get invalid credentials from a DummyCredentialGenerator; starting the sender
+   * in vm2 must fail with an exception whose second-level cause is an AuthenticationFailedException.
+   */
+  @Test
+  public void testWanAuthInvalidCredentials() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    logger.info("Created locator on local site");
+
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+    logger.info("Created locator on remote site");
+
+    CredentialGenerator gen = new DummyCredentialGenerator();
+    logger.info("Picked up credential: " + gen);
+    Properties extraProps = gen.getSystemProperties();
+
+    String clientauthenticator = gen.getAuthenticator();
+    String clientauthInit = gen.getAuthInit();
+
+    Properties credentials1 = gen.getInvalidCredentials(1);
+    if (extraProps != null) {
+      credentials1.putAll(extraProps);
+    }
+    Properties javaProps1 = gen.getJavaProperties();
+    Properties credentials2 = gen.getInvalidCredentials(2);
+    if (extraProps != null) {
+      credentials2.putAll(extraProps);
+    }
+    Properties javaProps2 = gen.getJavaProperties();
+
+    Properties props1 = buildProperties(clientauthenticator, clientauthInit,
+      null, credentials1, null);
+    Properties props2 = buildProperties(clientauthenticator, clientauthInit,
+      null, credentials2, null);
+
+    logger.info("Done building auth properties");
+
+    vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+      props1, javaProps1, lnPort ));
+    logger.info("Created secured cache in vm2");
+
+    vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+      props2, javaProps2, nyPort ));
+    logger.info("Created secured cache in vm3");
+
+    vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
+      false, 100, 10, false, false, null, true ));
+    logger.info("Created sender in vm2");
+
+    vm3.invoke(() -> createReceiverInSecuredCache());
+    logger.info("Created receiver in vm3");
+
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+      getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    logger.info("Created RR in vm2");
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+      getTestMethodName() + "_RR", null, isOffHeap()  ));
+    logger.info("Created RR in vm3");
+
+    try {
+      vm2.invoke(() -> WANTestBase.startSender( "ln" ));
+      fail("Authentication Failed: While starting the sender, an exception should have been thrown");
+    } catch (Exception e) {
+      // Null-safe walk of the cause chain: a shorter chain must fail the test, not raise an NPE.
+      Throwable nested = (e.getCause() == null) ? null : e.getCause().getCause();
+      if (!(nested instanceof AuthenticationFailedException)) {
+        fail("Authentication is not working as expected", e);
+      }
+    }
+  }
+
+  /**
+   * Same expectation as testWanAuthInvalidCredentials, but the caches are secured with properties
+   * from buildSecurityProperties using the password "wrongPswd": starting the sender in vm2 must fail
+   * with an exception whose second-level cause is an AuthenticationFailedException.
+   */
+  @Test
+  public void testWanSecurityManagerWithInvalidCredentials() {
+    Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    logger.info("Created locator on local site");
+
+    Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+    logger.info("Created locator on remote site");
+
+    Properties props1 = buildSecurityProperties("admin", "wrongPswd");
+    Properties props2 = buildSecurityProperties("guest", "wrongPswd");
+    logger.info("Done building auth properties");
+
+    vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+      props1, null, lnPort ));
+    logger.info("Created secured cache in vm2");
+
+    vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+      props2, null, nyPort ));
+    logger.info("Created secured cache in vm3");
+
+    vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
+      false, 100, 10, false, false, null, true ));
+    logger.info("Created sender in vm2");
+
+    vm3.invoke(() -> createReceiverInSecuredCache());
+    logger.info("Created receiver in vm3");
+    vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+      getTestMethodName() + "_RR", "ln", isOffHeap()  ));
+    logger.info("Created RR in vm2");
+    vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+      getTestMethodName() + "_RR", null, isOffHeap()  ));
+    logger.info("Created RR in vm3");
+
+    try {
+      vm2.invoke(() -> WANTestBase.startSender( "ln" ));
+      fail("Authentication Failed: While starting the sender, an exception should have been thrown");
+    } catch (Exception e) {
+      // Null-safe walk of the cause chain: a shorter chain must fail the test, not raise an NPE.
+      Throwable nested = (e.getCause() == null) ? null : e.getCause().getCause();
+      if (!(nested instanceof AuthenticationFailedException)) {
+        fail("Authentication is not working as expected", e);
+      }
+    }
+  }
+
+  private static Properties buildProperties(String clientauthenticator,
+                                            String clientAuthInit, String accessor, Properties extraAuthProps,
+                                            Properties extraAuthzProps) {
+    Properties authProps = new Properties();
+    if (clientauthenticator != null) {
+      authProps.setProperty(
+        SECURITY_CLIENT_AUTHENTICATOR,
+        clientauthenticator);
+    }
+    if (accessor != null) {
+      authProps.setProperty(SECURITY_CLIENT_ACCESSOR,
+        accessor);
+    }
+    if (clientAuthInit != null) {
+      authProps.setProperty(SECURITY_CLIENT_AUTH_INIT, clientAuthInit);
+    }
+    if (extraAuthProps != null) {
+      authProps.putAll(extraAuthProps);
+    }
+    if (extraAuthzProps != null) {
+      authProps.putAll(extraAuthzProps);
+    }
+    return authProps;
+  }
+
+  private static Properties buildSecurityProperties(String username, String password){
+    Properties props = new Properties();
+    props.put(SECURITY_MANAGER, SampleSecurityManager.class.getName());
+    props.put("security-json", "org/apache/geode/security/templates/security.json");
+    props.put(SECURITY_CLIENT_AUTH_INIT, UserPasswdAI.class.getName());
+    props.put("security-username", username);
+    props.put("security-password", password);
+    return props;
+  }
+
+  /**
+   * Connects to a distributed system using the given security properties and
+   * creates the cache stored in the static {@code cache} field.
+   * <p>
+   * {@code authProps} is modified in place: {@code MCAST_PORT} is set to
+   * {@code "0"} and {@code LOCATORS} to {@code localhost[locPort]}.
+   *
+   * @param authProps security properties for the member; mutated by this call
+   * @param javaProps Java properties passed to the system creation; cast to
+   *        {@link Properties}, may be null
+   * @param locPort port of the locator to connect to
+   */
+  public static void createSecuredCache(Properties authProps, Object javaProps, Integer locPort) {
+    final String locatorSpec = "localhost[" + locPort + "]";
+    authProps.setProperty(MCAST_PORT, "0");
+    authProps.setProperty(LOCATORS, locatorSpec);
+
+    logger.info("Set the server properties to: " + authProps);
+    logger.info("Set the java properties to: " + javaProps);
+
+    SecurityTestUtils systemFactory = new SecurityTestUtils("temp");
+    Properties typedJavaProps = (Properties)javaProps;
+    DistributedSystem system = systemFactory.createSystem(authProps, typedJavaProps);
+    assertNotNull(system);
+    assertTrue(system.isConnected());
+
+    cache = CacheFactory.create(system);
+    assertNotNull(cache);
+  }
+
+  /**
+   * {@link UserPasswordAuthInit} variant that records, in the static flag
+   * {@code isDifferentServerInGetCredentialCall}, that credentials were
+   * requested for a member whose process id differs from the local member's.
+   */
+  public static class UserPasswdAI extends UserPasswordAuthInit {
+
+    /**
+     * Factory method returning a fresh instance; referenced by name as
+     * {@code <class name>.createAI} when configuring the auth-init property.
+     */
+    public static AuthInitialize createAI() {
+      return new UserPasswdAI();
+    }
+
+    /**
+     * Asserts that {@code server} runs in a different process than the local
+     * member, delegates to the superclass for the credentials, and flags the
+     * call before returning them.
+     */
+    @Override
+    public Properties getCredentials(Properties props,
+                                     DistributedMember server, boolean isPeer)
+      throws AuthenticationFailedException {
+      DistributedMember self = CacheFactory.getAnyInstance().getDistributedSystem().getDistributedMember();
+      boolean differentProcess = self.getProcessId() != server.getProcessId();
+      Assert.assertTrue(differentProcess, "getCredentials: Server should be different");
+
+      Properties credentials = super.getCredentials(props, server, isPeer);
+
+      String logPrefix;
+      if (!differentProcess) {
+        logPrefix = "setting22  isDifferentServerInGetCredentialCall ";
+      } else {
+        isDifferentServerInGetCredentialCall = true;
+        logPrefix = "setting  isDifferentServerInGetCredentialCall ";
+      }
+      CacheFactory.getAnyInstance().getLoggerI18n().convertToLogWriter().config(logPrefix + isDifferentServerInGetCredentialCall);
+      return credentials;
+    }
+  }
+
+  /**
+   * Asserts that the static flag {@code isDifferentServerInGetCredentialCall}
+   * is set, then clears it so a later verification starts from a clean state.
+   */
+  public static void verifyDifferentServerInGetCredentialCall(){
+    final boolean observed = isDifferentServerInGetCredentialCall;
+    Assert.assertTrue(observed, "verifyDifferentServerInGetCredentialCall: Server should be different");
+    isDifferentServerInGetCredentialCall = false;
+  }
+
+  /**
+   * Starts a gateway sender between two secured sites whose members are both
+   * configured with valid credentials from {@link DummyCredentialGenerator} and
+   * with {@link UserPasswdAI} as the auth-init.
+   * <p>
+   * After the sender reaches the running state, the test verifies in vm2 and in
+   * vm3 that {@code UserPasswdAI.getCredentials} was invoked for a member in a
+   * different process.
+   */
+  @Test
+  public void testWanAuthValidCredentialsWithServer() {
+    disconnectAllFromDS();
+    {
+      Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+      logger.info("Created locator on local site");
+
+      Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+      logger.info("Created locator on remote site");
+
+      DummyCredentialGenerator gen = new DummyCredentialGenerator();
+      gen.init();
+      Properties extraProps = gen.getSystemProperties();
+
+      String clientauthenticator = gen.getAuthenticator();
+      String clientauthInit = UserPasswdAI.class.getName() + ".createAI";
+
+      Properties credentials1 = gen.getValidCredentials(1);
+      if (extraProps != null) {
+        credentials1.putAll(extraProps);
+      }
+      Properties javaProps1 = gen.getJavaProperties();
+
+      // Both sites must carry valid credentials here; the invalid-credential
+      // scenario is covered by a separate test.
+      Properties credentials2 = gen.getValidCredentials(2);
+      if (extraProps != null) {
+        credentials2.putAll(extraProps);
+      }
+      Properties javaProps2 = gen.getJavaProperties();
+
+      Properties props1 = buildProperties(clientauthenticator, clientauthInit,
+        null, credentials1, null);
+      Properties props2 = buildProperties(clientauthenticator, clientauthInit,
+        null, credentials2, null);
+
+      vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+        props1, javaProps1, lnPort ));
+      logger.info("Created secured cache in vm2");
+
+      vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+        props2, javaProps2, nyPort ));
+      logger.info("Created secured cache in vm3");
+
+      vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
+        false, 100, 10, false, false, null, true ));
+      logger.info("Created sender in vm2");
+
+      vm3.invoke(() -> createReceiverInSecuredCache());
+      logger.info("Created receiver in vm3");
+
+      vm2.invoke(() -> WANTestBase.startSender( "ln" ));
+      vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+      // Each side should have been asked for credentials on behalf of the
+      // member in the other process.
+      vm2.invoke(() -> verifyDifferentServerInGetCredentialCall());
+      vm3.invoke(() -> verifyDifferentServerInGetCredentialCall());
+    }
+  }
+
+  /**
+   * Security-manager flavour of the valid-credentials scenario: the sender site
+   * logs in as {@code admin/secret}, the receiver site as {@code guest/guest}.
+   * <p>
+   * Once the sender is running, the test verifies in vm2 that
+   * {@code UserPasswdAI.getCredentials} was invoked for a member in a different
+   * process. The matching check in vm3 is intentionally disabled; see the
+   * comment at the end of the method.
+   */
+  @Test
+  public void testWanSecurityManagerAuthValidCredentialsWithServer() {
+    disconnectAllFromDS();
+
+    // Locators for the local ("ln") and the remote ("ny") site.
+    Integer localLocatorPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+    logger.info("Created locator on local site");
+
+    Integer remoteLocatorPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, localLocatorPort ));
+    logger.info("Created locator on remote site");
+
+    Properties senderProps = buildSecurityProperties("admin", "secret");
+    Properties receiverProps = buildSecurityProperties("guest", "guest");
+
+    // Secured caches: vm2 hosts the sender, vm3 hosts the receiver.
+    vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+      senderProps, null, localLocatorPort ));
+    logger.info("Created secured cache in vm2");
+
+    vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+      receiverProps, null, remoteLocatorPort ));
+    logger.info("Created secured cache in vm3");
+
+    vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
+      false, 100, 10, false, false, null, true ));
+    logger.info("Created sender in vm2");
+
+    vm3.invoke(() -> createReceiverInSecuredCache());
+    logger.info("Created receiver in vm3");
+
+    vm2.invoke(() -> WANTestBase.startSender( "ln" ));
+    vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+    vm2.invoke(() -> verifyDifferentServerInGetCredentialCall());
+
+    // The vm3 check below would fail for now: with integrated security the
+    // receiver's credentials are not sent back to the sender. In the old
+    // security implementation they are sent back, but the sender does not
+    // check them.
+    //vm3.invoke(() -> verifyDifferentServerInGetCredentialCall());
+  }
+}