You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:44:04 UTC
[16/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to
./geode-wan/src/test/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
new file mode 100644
index 0000000..9c6cbdd
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import com.gemstone.gemfire.test.dunit.Assert;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+
+import java.util.Set;
+
+/**
+ *
+ */
+@Category(DistributedTest.class)
+public class CommonParallelGatewaySenderDUnitTest extends WANTestBase {
+
+ @Test
+ public void testSameSenderWithNonColocatedRegions() throws Exception {
+ IgnoredException.addIgnoredException("cannot have the same parallel");
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
+ try {
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
+ fail("Expected IllegalStateException : cannot have the same parallel gateway sender");
+ }
+ catch (Exception e) {
+ if (!(e.getCause() instanceof IllegalStateException)
+ || !(e.getCause().getMessage()
+ .contains("cannot have the same parallel gateway sender id"))) {
+ Assert.fail("Expected IllegalStateException", e);
+ }
+ }
+ }
+
+ /**
+ * Simple scenario. Two regions attach the same PGS
+ * @throws Exception
+ * Below test is disabled intentionally
+ 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+ 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+ ParallelGatewaySenderQueue#convertPathToName
+ 3> We have to enabled it in next release
+ 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
+ and version prior to 8.0
+ */
+ @Test
+ @Ignore("TODO")
+ public void testParallelPropagation() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, false, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR1", null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR1", null, 1, 100, isOffHeap() ));
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR2", null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR2", null, 1, 100, isOffHeap() ));
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR1",
+ 1000 ));
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR2",
+ 1000 ));
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm5.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm6.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm7.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR1", 1000 ));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR2", 1000 ));
+ }
+
+ /**
+ * The PGS is persistence enabled but not the Regions
+ * Below test is disabled intentionally
+ 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+ 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+ ParallelGatewaySenderQueue#convertPathToName
+ 3> We have to enabled it in next release
+ 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
+ and version prior to 8.0
+ */
+ @Test
+ @Ignore("TODO")
+ public void testParallelPropagationPersistenceEnabled() throws Exception {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+ vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
+ true, 100, 10, false, true, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
+
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR1", null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR1", null, 1, 100, isOffHeap() ));
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR2", null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName() + "_PR2", null, 1, 100, isOffHeap() ));
+ //before doing any puts, let the senders be running in order to ensure that
+ //not a single event will be lost
+ vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR1",
+ 1000 ));
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR2",
+ 1000 ));
+
+ //verify all buckets drained on all sender nodes.
+ vm4.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm5.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm6.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+ vm7.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR1", 1000 ));
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName() + "_PR2", 1000 ));
+ }
+
+
+ /**
+ * Enable persistence for GatewaySender.
+ * Pause the sender and do some puts in local region.
+ * Close the local site and rebuild the region and sender from disk store.
+ * Dispatcher should not start dispatching events recovered from persistent sender.
+ * Check if the remote site receives all the events.
+ * Below test is disabled intentionally
+ 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
+ 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
+ ParallelGatewaySenderQueue#convertPathToName
+ 3> We have to enabled it in next release
+ 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
+ and version prior to 8.0
+ */
+ @Test
+ @Ignore("TODO")
+ public void testPRWithGatewaySenderPersistenceEnabled_Restart() {
+ //create locator on local site
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ //create locator on remote site
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+
+ //create receiver on remote site
+ createCacheInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
+
+ //create cache in local site
+ createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
+
+ //create senders with disk store
+ String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+ String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
+
+ LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
+
+ //create PR on remote site
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR1", null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR1", null, 1, 100, isOffHeap() ));
+
+ //create PR on remote site
+ vm2.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR2", null, 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR2", null, 1, 100, isOffHeap() ));
+
+ //create PR on local site
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+
+ //create PR on local site
+ vm4.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+
+
+ //start the senders on local site
+ startSenderInVMs("ln", vm4, vm5, vm6, vm7);
+
+ //wait for senders to become running
+ vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+ //pause the senders
+ vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
+ vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
+
+ //start puts in region on local site
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"PR1", 3000 ));
+ vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"PR2", 5000 ));
+ LogWriterUtils.getLogWriter().info("Completed puts in the region");
+
+ //--------------------close and rebuild local site -------------------------------------------------
+ //kill the senders
+ vm4.invoke(() -> WANTestBase.killSender());
+ vm5.invoke(() -> WANTestBase.killSender());
+ vm6.invoke(() -> WANTestBase.killSender());
+ vm7.invoke(() -> WANTestBase.killSender());
+
+ LogWriterUtils.getLogWriter().info("Killed all the senders.");
+
+ //restart the vm
+ vm4.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm5.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm6.invoke(() -> WANTestBase.createCache( lnPort ));
+ vm7.invoke(() -> WANTestBase.createCache( lnPort ));
+
+ LogWriterUtils.getLogWriter().info("Created back the cache");
+
+ //create senders with disk store
+ vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
+ vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
+ vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
+ vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
+
+ LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
+ //create PR on local site
+ AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+ AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
+
+ try {
+ inv1.join();
+ inv2.join();
+ inv3.join();
+ inv4.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ inv1 = vm4.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+ inv2 = vm5.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+ inv3 = vm6.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+ inv4 = vm7.invokeAsync(() -> WANTestBase.createPartitionedRegion(
+ getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
+
+ try {
+ inv1.join();
+ inv2.join();
+ inv3.join();
+ inv4.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ fail();
+ }
+
+ LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
+
+ //start the senders in async mode. This will ensure that the
+ //node of shadow PR that went down last will come up first
+ startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
+
+ LogWriterUtils.getLogWriter().info("Waiting for senders running.");
+ //wait for senders running
+ vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+ LogWriterUtils.getLogWriter().info("All the senders are now running...");
+
+ //----------------------------------------------------------------------------------------------------
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName()+"PR1", 3000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName()+"PR1", 3000 ));
+
+ vm2.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName()+"PR2", 5000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(
+ getTestMethodName()+"PR2", 5000 ));
+ }
+
+ public static void validateParallelSenderQueueAllBucketsDrained(final String senderId) {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ ConcurrentParallelGatewaySenderQueue regionQueue = (ConcurrentParallelGatewaySenderQueue)((AbstractGatewaySender)sender).getQueues().toArray(new RegionQueue[1])[0];
+
+ Set<PartitionedRegion> shadowPRs = (Set<PartitionedRegion>)regionQueue.getRegions();
+
+ for(PartitionedRegion shadowPR: shadowPRs) {
+ Set<BucketRegion> buckets = shadowPR.getDataStore().getAllLocalBucketRegions();
+
+ for (final BucketRegion bucket : buckets) {
+ WaitCriterion wc = new WaitCriterion() {
+ public boolean done() {
+ if (bucket.keySet().size() == 0) {
+ LogWriterUtils.getLogWriter().info("Bucket " + bucket.getId() + " is empty");
+ return true;
+ }
+ return false;
+ }
+
+ public String description() {
+ return "Expected bucket entries for bucket: " + bucket.getId() + " is: 0 but actual entries: "
+ + bucket.keySet().size() + " This bucket isPrimary: " + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet();
+ }
+ };
+ Wait.waitForCriterion(wc, 180000, 50, true);
+
+ }//for loop ends
+ }
+
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java
new file mode 100644
index 0000000..2fb77ff
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+@SuppressWarnings("serial")
+@Category(DistributedTest.class)
+public class CommonParallelGatewaySenderOffHeapDUnitTest extends
+ CommonParallelGatewaySenderDUnitTest {
+
+ public CommonParallelGatewaySenderOffHeapDUnitTest() {
+ super();
+ }
+
+ @Override
+ public boolean isOffHeap() {
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
new file mode 100644
index 0000000..8b8c624
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
@@ -0,0 +1,532 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import org.junit.experimental.categories.Category;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.internal.cache.DistributedCacheOperation;
+import com.gemstone.gemfire.internal.cache.EntrySnapshot;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry;
+import com.gemstone.gemfire.internal.cache.RegionEntry;
+import com.gemstone.gemfire.internal.cache.Token.Tombstone;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.Wait;
+
+/**
+ *
+ * Test verifies that version tag for destroyed entry is propagated back to
+ * origin distributed system if the version tag is applied and replaces old
+ * version information in destination distributed system.
+ *
+ * Version tag information which is relevant between multiple distributed
+ * systems consistency check is basically dsid and timestamp.
+ */
+@Category(DistributedTest.class)
+public class NewWANConcurrencyCheckForDestroyDUnitTest extends WANTestBase {
+
+ public NewWANConcurrencyCheckForDestroyDUnitTest() {
+ super();
+ }
+
+ @Test
+ public void testVersionTagTimestampForDestroy() {
+
+
+ // create three distributed systems with each having a cache containing
+ // a Replicated Region with one entry and concurrency checks enabled.
+
+ // Site 2 and Site 3 only know about Site 1 but Site 1 knows about both
+ // Site 2 and Site 3.
+
+ // Site 1
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ createCacheInVMs(lnPort, vm1);
+ Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
+
+ //Site 2
+ Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ createCacheInVMs(nyPort, vm3);
+ Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
+
+ //Site 3
+ Integer tkPort = (Integer)vm4.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
+ createCacheInVMs(tkPort, vm5);
+ Integer tkRecPort = (Integer) vm5.invoke(() -> WANTestBase.createReceiver());
+
+ LogWriterUtils.getLogWriter().info("Created locators and receivers in 3 distributed systems");
+
+ //Site 1
+ vm1.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ true, 10, 1, false, false, null, true ));
+ vm1.invoke(() -> WANTestBase.createSender( "ln2", 3,
+ true, 10, 1, false, false, null, true ));
+
+ vm1.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ln1,ln2", 0, 1, isOffHeap() ));
+ vm1.invoke(() -> WANTestBase.startSender( "ln1" ));
+ vm1.invoke(() -> WANTestBase.startSender( "ln2" ));
+ vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" ));
+ vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln2" ));
+
+ //Site 2
+ vm3.invoke(() -> WANTestBase.createSender( "ny1", 1,
+ true, 10, 1, false, false, null, true ));
+
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ny1", 0, 1, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.startSender( "ny1" ));
+ vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" ));
+
+ //Site 3 which only knows about Site 1.
+ vm5.invoke(() -> WANTestBase.createSender( "tk1", 1,
+ true, 10, 1, false, false, null, true ));
+
+ vm5.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "tk1", 0, 1, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.startSender( "tk1" ));
+ vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "tk1" ));
+
+ Wait.pause(2000);
+
+ // Perform a put in vm1
+ vm1.invoke(new CacheSerializableRunnable("Putting an entry in ds1") {
+
+ @Override
+ public void run2() throws CacheException {
+ assertNotNull(cache);
+
+ Region region = cache.getRegion("/repRegion");
+ region.put("testKey", "testValue");
+
+ assertEquals(1, region.size());
+ }
+ });
+
+ //wait for vm1 to propagate put to vm3 and vm5
+ Wait.pause(2000);
+
+ long destroyTimeStamp = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterOp());
+
+ //wait for vm1 to propagate destroyed entry's new version tag to vm5
+ Wait.pause(2000);
+
+ vm5.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.verifyTimestampAfterOp(destroyTimeStamp, 1 /* ds 3 receives gateway event only from ds 1*/));
+ }
+
+ /**
+ * Test creates two sites and one Replicated Region on each with Serial
+ * GatewaySender on each. Test checks for sequence of events being sent from
+ * site1 to site2 for PUTALL and PUT and finally checks for final timestamp in
+ * version for RegionEntry with key "testKey". If timestamp on both site is
+ * same that means events were transferred in correct sequence.
+ */
+ @Test
+ public void testPutAllEventSequenceOnSerialGatewaySenderWithRR() {
+
+ // create two distributed systems with each having a cache containing
+ // a Replicated Region with one entry and concurrency checks enabled.
+
+ // Site 1
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ vm1.invoke(() -> WANTestBase.createCache(lnPort));
+ Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
+
+ //Site 2
+ Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ vm3.invoke(() -> WANTestBase.createCache(nyPort));
+ Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
+
+ LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems");
+
+ //Site 1
+ vm1.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ false, 10, 1, false, false, null, true ));
+
+ vm1.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ln1", isOffHeap() ));
+ vm1.invoke(() -> WANTestBase.startSender( "ln1" ));
+ vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" ));
+
+ //Site 2
+ vm3.invoke(() -> WANTestBase.createSender( "ny1", 1,
+ false, 10, 1, false, false, null, true ));
+
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.startSender( "ny1" ));
+ vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" ));
+
+ Wait.pause(2000);
+
+ // Perform a put in vm1
+ AsyncInvocation asynch1 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") {
+
+ @Override
+ public void run2() throws CacheException {
+ assertNotNull(cache);
+ // Test hook to make put wait after RE lock is released but before Gateway events are sent.
+ DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 2000;
+
+ Region region = cache.getRegion("/repRegion");
+ Map testMap = new HashMap();
+ testMap.put("testKey", "testValue1");
+ region.putAll(testMap);
+
+ assertEquals(1, region.size());
+ assertEquals("testValue2", region.get("testKey"));
+ }
+ });
+
+ //wait for vm1 to propagate put to vm3
+ Wait.pause(1000);
+
+ AsyncInvocation asynch2 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") {
+
+ @Override
+ public void run2() throws CacheException {
+ assertNotNull(cache);
+ Region region = cache.getRegion("/repRegion");
+
+ while (!region.containsKey("testKey")) {
+ Wait.pause(10);
+ }
+ // Test hook to make put wait after RE lock is released but before Gateway events are sent.
+ DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0;
+
+ region.put("testKey", "testValue2");
+
+ assertEquals(1, region.size());
+ assertEquals("testValue2", region.get("testKey"));
+ }
+ });
+
+ try {
+ asynch1.join(5000);
+ asynch2.join(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ vm1.invoke(new CacheSerializableRunnable("Reset Test Hook") {
+
+ @Override
+ public void run2() throws CacheException {
+ DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0;
+ }
+ });
+ }
+
+ //Wait for all Gateway events be received by vm3.
+ Wait.pause(1000);
+
+ long putAllTimeStampVm1 = (Long) vm1.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
+
+ long putAllTimeStampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
+
+ assertEquals(putAllTimeStampVm1, putAllTimeStampVm3);
+ }
+
+/**
+ * This is similar to above test but for PartitionedRegion.
+ */
+ @Test
+ public void testPutAllEventSequenceOnSerialGatewaySenderWithPR() {
+
+ // create two distributed systems with each having a cache containing
+ // a Replicated Region with one entry and concurrency checks enabled.
+
+ // Site 1
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ createCacheInVMs(lnPort, vm1);
+ Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
+
+ //Site 2
+ Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ createCacheInVMs(nyPort, vm3);
+ Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
+
+ LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems");
+
+ //Site 1
+ vm1.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ false, 10, 1, false, false, null, true ));
+
+ vm1.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ln1", 0, 1, isOffHeap() ));
+ vm1.invoke(() -> WANTestBase.startSender( "ln1" ));
+ vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" ));
+
+ //Site 2
+ vm3.invoke(() -> WANTestBase.createSender( "ny1", 1,
+ false, 10, 1, false, false, null, true ));
+
+ vm3.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ny1", 0, 1, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.startSender( "ny1" ));
+ vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" ));
+
+ Wait.pause(2000);
+
+ // Perform a put in vm1
+ AsyncInvocation asynch1 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") {
+
+ @Override
+ public void run2() throws CacheException {
+ assertNotNull(cache);
+ // Test hook to make put wait after RE lock is released but before Gateway events are sent.
+ DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 2000;
+
+ Region region = cache.getRegion("/repRegion");
+ Map testMap = new HashMap();
+ testMap.put("testKey", "testValue1");
+ region.putAll(testMap);
+
+ assertEquals(1, region.size());
+ assertEquals("testValue2", region.get("testKey"));
+ }
+ });
+
+ //wait for vm1 to propagate put to vm3
+ Wait.pause(1000);
+
+ AsyncInvocation asynch2 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") {
+
+ @Override
+ public void run2() throws CacheException {
+ assertNotNull(cache);
+ Region region = cache.getRegion("/repRegion");
+
+ while (!region.containsKey("testKey")) {
+ Wait.pause(10);
+ }
+ // Test hook to make put wait after RE lock is released but before Gateway events are sent.
+ DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0;
+
+ region.put("testKey", "testValue2");
+
+ assertEquals(1, region.size());
+ assertEquals("testValue2", region.get("testKey"));
+ }
+ });
+
+ try {
+ asynch1.join(5000);
+ asynch2.join(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ vm1.invoke(new CacheSerializableRunnable("Reset Test Hook") {
+
+ @Override
+ public void run2() throws CacheException {
+ DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0;
+ }
+ });
+ }
+
+ //Wait for all Gateway events be received by vm3.
+ Wait.pause(1000);
+
+ long putAllTimeStampVm1 = (Long) vm1.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
+
+ long putAllTimeStampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
+
+ assertEquals(putAllTimeStampVm1, putAllTimeStampVm3);
+ }
+
+ /**
+ * Tests if conflict checks are happening based on DSID and timestamp even if
+ * version tag is generated in local distributed system.
+ */
+ @Test
+ public void testConflictChecksBasedOnDsidAndTimeStamp() {
+
+
+ // create two distributed systems with each having a cache containing
+ // a Replicated Region with one entry and concurrency checks enabled.
+
+ // Site 1
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ createCacheInVMs(lnPort, vm1);
+ Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
+
+ //Site 2
+ Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ createCacheInVMs(nyPort, vm3);
+ Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
+
+ LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems");
+
+ //Site 1
+ vm1.invoke(() -> WANTestBase.createSender( "ln1", 2,
+ false, 10, 1, false, false, null, true ));
+
+ vm1.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ln1", isOffHeap() ));
+ vm1.invoke(() -> WANTestBase.startSender( "ln1" ));
+ vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" ));
+
+ //Site 2
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() ));
+
+ vm4.invoke(() -> WANTestBase.createCache( nyPort ));
+ vm4.invoke(() -> WANTestBase.createSender( "ny1", 1,
+ false, 10, 1, false, false, null, true ));
+
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() ));
+ vm4.invoke(() -> WANTestBase.startSender( "ny1" ));
+ vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" ));
+
+ Wait.pause(2000);
+
+ // Perform a put in vm1
+ vm1.invoke(new CacheSerializableRunnable("Putting an entry in ds1") {
+
+ @Override
+ public void run2() throws CacheException {
+ assertNotNull(cache);
+
+ Region region = cache.getRegion("/repRegion");
+ region.put("testKey", "testValue1");
+
+ assertEquals(1, region.size());
+ }
+ });
+
+ //wait for vm4 to have later timestamp before sending operation to vm1
+ Wait.pause(300);
+
+ AsyncInvocation asynch = vm4.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds2 in vm4") {
+
+ @Override
+ public void run2() throws CacheException {
+ assertNotNull(cache);
+ Region region = cache.getRegion("/repRegion");
+
+ region.put("testKey", "testValue2");
+
+ assertEquals(1, region.size());
+ assertEquals("testValue2", region.get("testKey"));
+ }
+ });
+
+ try {
+ asynch.join(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ //Wait for all local ds events be received by vm3.
+ Wait.pause(1000);
+
+ vm3.invoke(new CacheSerializableRunnable("Check dsid") {
+
+ @Override
+ public void run2() throws CacheException {
+ Region region = cache.getRegion("repRegion");
+
+ Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/
+ true); //commented while merging revision 43582
+ RegionEntry re = null;
+ if (entry instanceof EntrySnapshot) {
+ re = ((EntrySnapshot)entry).getRegionEntry();
+ } else if (entry instanceof NonTXEntry) {
+ re = ((NonTXEntry)entry).getRegionEntry();
+ }
+ VersionTag tag = re.getVersionStamp().asVersionTag();
+ assertEquals(2, tag.getDistributedSystemId());
+ }
+ });
+
+ // Check vm3 has latest timestamp from vm4.
+ long putAllTimeStampVm1 = (Long) vm4.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
+
+ long putAllTimeStampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
+
+ assertEquals(putAllTimeStampVm1, putAllTimeStampVm3);
+ }
+
+ /*
+ * For VM1 in ds 1. Used in testPutAllEventSequenceOnSerialGatewaySender.
+ */
+ public static long getVersionTimestampAfterPutAllOp() {
+ Region region = cache.getRegion("repRegion");
+
+ while (!(region.containsKey("testKey") /*&& region.get("testKey").equals("testValue2") */)) {
+ Wait.pause(10);
+ }
+ assertEquals(1, region.size());
+
+ Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true);
+ RegionEntry re = null;
+ if (entry instanceof EntrySnapshot) {
+ re = ((EntrySnapshot)entry).getRegionEntry();
+ } else if (entry instanceof NonTXEntry) {
+ re = ((NonTXEntry)entry).getRegionEntry();
+ }
+ if (re != null) {
+ LogWriterUtils.getLogWriter().fine("RegionEntry for testKey: " + re.getKey() + " " + re.getValueInVM((LocalRegion) region));
+
+ VersionTag tag = re.getVersionStamp().asVersionTag();
+ return tag.getVersionTimeStamp();
+ } else {
+ return -1;
+ }
+ }
+
+ /*
+ * For VM3 in ds 2.
+ */
+ public static long getVersionTimestampAfterOp() {
+ Region region = cache.getRegion("repRegion");
+ assertEquals(1, region.size());
+
+ region.destroy("testKey");
+
+ Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true);
+ RegionEntry re = ((EntrySnapshot)entry).getRegionEntry();
+ LogWriterUtils.getLogWriter().fine("RegionEntry for testKey: " + re.getKey() + " " + re.getValueInVM((LocalRegion) region));
+ assertTrue(re.getValueInVM((LocalRegion) region) instanceof Tombstone);
+
+ VersionTag tag = re.getVersionStamp().asVersionTag();
+ return tag.getVersionTimeStamp();
+ }
+
+ /*
+ * For VM 5 in ds 3.
+ */
+ public static void verifyTimestampAfterOp(long timestamp, int memberid) {
+ Region region = cache.getRegion("repRegion");
+ assertEquals(0, region.size());
+
+ Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true);
+ RegionEntry re = ((EntrySnapshot)entry).getRegionEntry();
+ assertTrue(re.getValueInVM((LocalRegion) region) instanceof Tombstone);
+
+ VersionTag tag = re.getVersionStamp().asVersionTag();
+ assertEquals(timestamp, tag.getVersionTimeStamp());
+ assertEquals(memberid, tag.getDistributedSystemId());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
new file mode 100644
index 0000000..76940ea
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+import static com.gemstone.gemfire.test.dunit.Assert.*;
+
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import com.jayway.awaitility.Awaitility;
+import org.apache.geode.security.templates.SampleSecurityManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.security.AuthInitialize;
+import com.gemstone.gemfire.security.AuthenticationFailedException;
+import com.gemstone.gemfire.security.SecurityTestUtils;
+import com.gemstone.gemfire.security.generator.CredentialGenerator;
+import com.gemstone.gemfire.security.generator.DummyCredentialGenerator;
+import com.gemstone.gemfire.security.templates.UserPasswordAuthInit;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class NewWanAuthenticationDUnitTest extends WANTestBase {
+
+ public static final Logger logger = LogService.getLogger();
+
+ public static boolean isDifferentServerInGetCredentialCall = false;
+
+ /**
+ * Authentication test for new WAN with valid credentials. Although, nothing
+ * related to authentication has been changed in new WAN, this test case is
+ * added on request from QA for defect 44650.
+ */
+ @Test
+ public void testWanAuthValidCredentials() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ logger.info("Created locator on local site");
+
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ logger.info("Created locator on remote site");
+
+
+ CredentialGenerator gen = new DummyCredentialGenerator();
+ Properties extraProps = gen.getSystemProperties();
+
+ String clientauthenticator = gen.getAuthenticator();
+ String clientauthInit = gen.getAuthInit();
+
+ Properties credentials1 = gen.getValidCredentials(1);
+ if (extraProps != null) {
+ credentials1.putAll(extraProps);
+ }
+ Properties javaProps1 = gen.getJavaProperties();
+
+ // vm3's invalid credentials
+ Properties credentials2 = gen.getInvalidCredentials(1);
+ if (extraProps != null) {
+ credentials2.putAll(extraProps);
+ }
+ Properties javaProps2 = gen.getJavaProperties();
+
+ Properties props1 = buildProperties(clientauthenticator, clientauthInit,
+ null, credentials1, null);
+
+ // have vm 3 start a cache with invalid credentails
+ Properties props2 = buildProperties(clientauthenticator, clientauthInit,
+ null, credentials2, null);
+
+ vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+ props1, javaProps1, lnPort ));
+ logger.info("Created secured cache in vm2");
+
+ vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+ props2, javaProps2, nyPort ));
+ logger.info("Created secured cache in vm3");
+
+ vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 10, false, false, null, true ));
+ logger.info("Created sender in vm2");
+
+ vm3.invoke(() -> createReceiverInSecuredCache());
+ logger.info("Created receiver in vm3");
+
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ logger.info("Created RR in vm2");
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ logger.info("Created RR in vm3");
+
+ // this tests verifies that even though vm3 has invalid credentials, vm2 can still send data to vm3 because
+ // vm2 has valid credentials
+ vm2.invoke(() -> WANTestBase.startSender( "ln" ));
+ vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+ vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1));
+ vm3.invoke(() -> {
+ Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_RR");
+ Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> assertTrue(r.size() > 0));
+ });
+ logger.info("Done successfully.");
+ }
+
+ @Test
+ public void testWanIntegratedSecurityWithValidCredentials() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ logger.info("Created locator on local site");
+
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ logger.info("Created locator on remote site");
+
+
+ Properties props1 = buildSecurityProperties("admin", "secret");
+ Properties props2 = buildSecurityProperties("guest", "guest");
+
+ vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+ props1, null, lnPort ));
+ logger.info("Created secured cache in vm2");
+
+ vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+ props2, null, nyPort ));
+ logger.info("Created secured cache in vm3");
+
+ vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 10, false, false, null, true ));
+ logger.info("Created sender in vm2");
+
+ vm3.invoke(() -> createReceiverInSecuredCache());
+ logger.info("Created receiver in vm3");
+
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ logger.info("Created RR in vm2");
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ logger.info("Created RR in vm3");
+
+ vm2.invoke(() -> WANTestBase.startSender( "ln" ));
+ vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+ vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1));
+ vm3.invoke(() -> {
+ Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_RR");
+ Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> assertTrue(r.size() > 0));
+
+ });
+ logger.info("Done successfully.");
+ }
+
+ /**
+ * Test authentication with new WAN with invalid credentials. Although,
+ * nothing related to authentication has been changed in new WAN, this test
+ * case is added on request from QA for defect 44650.
+ */
+ @Test
+ public void testWanAuthInvalidCredentials() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ logger.info("Created locator on local site");
+
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ logger.info("Created locator on remote site");
+
+
+ CredentialGenerator gen = new DummyCredentialGenerator();
+ logger.info("Picked up credential: " + gen);
+
+ Properties extraProps = gen.getSystemProperties();
+
+ String clientauthenticator = gen.getAuthenticator();
+ String clientauthInit = gen.getAuthInit();
+
+ Properties credentials1 = gen.getInvalidCredentials(1);
+ if (extraProps != null) {
+ credentials1.putAll(extraProps);
+ }
+ Properties javaProps1 = gen.getJavaProperties();
+ Properties credentials2 = gen.getInvalidCredentials(2);
+ if (extraProps != null) {
+ credentials2.putAll(extraProps);
+ }
+ Properties javaProps2 = gen.getJavaProperties();
+
+ Properties props1 = buildProperties(clientauthenticator, clientauthInit,
+ null, credentials1, null);
+ Properties props2 = buildProperties(clientauthenticator, clientauthInit,
+ null, credentials2, null);
+
+ logger.info("Done building auth properties");
+
+ vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+ props1, javaProps1, lnPort ));
+ logger.info("Created secured cache in vm2");
+
+ vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+ props2, javaProps2, nyPort ));
+ logger.info("Created secured cache in vm3");
+
+ vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 10, false, false, null, true ));
+ logger.info("Created sender in vm2");
+
+ vm3.invoke(() -> createReceiverInSecuredCache());
+ logger.info("Created receiver in vm3");
+
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ logger.info("Created RR in vm2");
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ logger.info("Created RR in vm3");
+
+ try {
+ vm2.invoke(() -> WANTestBase.startSender( "ln" ));
+ fail("Authentication Failed: While starting the sender, an exception should have been thrown");
+ } catch (Exception e) {
+ if (!(e.getCause().getCause() instanceof AuthenticationFailedException)) {
+ fail("Authentication is not working as expected", e);
+ }
+ }
+ }
+
+ /**
+ * Test authentication with new WAN with invalid credentials. Although,
+ * nothing related to authentication has been changed in new WAN, this test
+ * case is added on request from QA for defect 44650.
+ */
+ @Test
+ public void testWanSecurityManagerWithInvalidCredentials() {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ logger.info("Created locator on local site");
+
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ logger.info("Created locator on remote site");
+
+ Properties props1 = buildSecurityProperties("admin", "wrongPswd");
+ Properties props2 = buildSecurityProperties("guest", "wrongPswd");
+
+ logger.info("Done building auth properties");
+
+ vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+ props1, null, lnPort ));
+ logger.info("Created secured cache in vm2");
+
+ vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+ props2, null, nyPort ));
+ logger.info("Created secured cache in vm3");
+
+ vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 10, false, false, null, true ));
+ logger.info("Created sender in vm2");
+
+ vm3.invoke(() -> createReceiverInSecuredCache());
+ logger.info("Created receiver in vm3");
+
+ vm2.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", "ln", isOffHeap() ));
+ logger.info("Created RR in vm2");
+ vm3.invoke(() -> WANTestBase.createReplicatedRegion(
+ getTestMethodName() + "_RR", null, isOffHeap() ));
+ logger.info("Created RR in vm3");
+
+ try {
+ vm2.invoke(() -> WANTestBase.startSender( "ln" ));
+ fail("Authentication Failed: While starting the sender, an exception should have been thrown");
+ } catch (Exception e) {
+ if (!(e.getCause().getCause() instanceof AuthenticationFailedException)) {
+ fail("Authentication is not working as expected", e);
+ }
+ }
+ }
+
+ private static Properties buildProperties(String clientauthenticator,
+ String clientAuthInit, String accessor, Properties extraAuthProps,
+ Properties extraAuthzProps) {
+ Properties authProps = new Properties();
+ if (clientauthenticator != null) {
+ authProps.setProperty(
+ SECURITY_CLIENT_AUTHENTICATOR,
+ clientauthenticator);
+ }
+ if (accessor != null) {
+ authProps.setProperty(SECURITY_CLIENT_ACCESSOR,
+ accessor);
+ }
+ if (clientAuthInit != null) {
+ authProps.setProperty(SECURITY_CLIENT_AUTH_INIT, clientAuthInit);
+ }
+ if (extraAuthProps != null) {
+ authProps.putAll(extraAuthProps);
+ }
+ if (extraAuthzProps != null) {
+ authProps.putAll(extraAuthzProps);
+ }
+ return authProps;
+ }
+
+ private static Properties buildSecurityProperties(String username, String password){
+ Properties props = new Properties();
+ props.put(SECURITY_MANAGER, SampleSecurityManager.class.getName());
+ props.put("security-json", "org/apache/geode/security/templates/security.json");
+ props.put(SECURITY_CLIENT_AUTH_INIT, UserPasswdAI.class.getName());
+ props.put("security-username", username);
+ props.put("security-password", password);
+ return props;
+ }
+
+ public static void createSecuredCache(Properties authProps, Object javaProps, Integer locPort) {
+ authProps.setProperty(MCAST_PORT, "0");
+ authProps.setProperty(LOCATORS, "localhost[" + locPort + "]");
+
+ logger.info("Set the server properties to: " + authProps);
+ logger.info("Set the java properties to: " + javaProps);
+
+ SecurityTestUtils tmpInstance = new SecurityTestUtils("temp");
+ DistributedSystem ds = tmpInstance.createSystem(authProps, (Properties)javaProps);
+ assertNotNull(ds);
+ assertTrue(ds.isConnected());
+ cache = CacheFactory.create(ds);
+ assertNotNull(cache);
+ }
+
+ public static class UserPasswdAI extends UserPasswordAuthInit {
+
+ public static AuthInitialize createAI() {
+ return new UserPasswdAI();
+ }
+
+ @Override
+ public Properties getCredentials(Properties props,
+ DistributedMember server, boolean isPeer)
+ throws AuthenticationFailedException {
+ boolean val = ( CacheFactory.getAnyInstance().getDistributedSystem().getDistributedMember().getProcessId() != server.getProcessId());
+ Assert.assertTrue(val, "getCredentials: Server should be different");
+ Properties p = super.getCredentials(props, server, isPeer);
+ if(val) {
+ isDifferentServerInGetCredentialCall = true;
+ CacheFactory.getAnyInstance().getLoggerI18n().convertToLogWriter().config("setting isDifferentServerInGetCredentialCall " + isDifferentServerInGetCredentialCall);
+ } else {
+ CacheFactory.getAnyInstance().getLoggerI18n().convertToLogWriter().config("setting22 isDifferentServerInGetCredentialCall " + isDifferentServerInGetCredentialCall);
+ }
+ return p;
+ }
+ }
+
+ public static void verifyDifferentServerInGetCredentialCall(){
+ Assert.assertTrue(isDifferentServerInGetCredentialCall, "verifyDifferentServerInGetCredentialCall: Server should be different");
+ isDifferentServerInGetCredentialCall = false;
+ }
+
+ @Test
+ public void testWanAuthValidCredentialsWithServer() {
+ disconnectAllFromDS();
+ {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ logger.info("Created locator on local site");
+
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ logger.info("Created locator on remote site");
+
+ DummyCredentialGenerator gen = new DummyCredentialGenerator();
+ gen.init();
+ Properties extraProps = gen.getSystemProperties();
+
+ String clientauthenticator = gen.getAuthenticator();
+ String clientauthInit = UserPasswdAI.class.getName() + ".createAI";
+
+ Properties credentials1 = gen.getValidCredentials(1);
+ if (extraProps != null) {
+ credentials1.putAll(extraProps);
+ }
+ Properties javaProps1 = gen.getJavaProperties();
+
+ Properties credentials2 = gen.getInvalidCredentials(2);
+ if (extraProps != null) {
+ credentials2.putAll(extraProps);
+ }
+ Properties javaProps2 = gen.getJavaProperties();
+
+ Properties props1 = buildProperties(clientauthenticator, clientauthInit,
+ null, credentials1, null);
+ Properties props2 = buildProperties(clientauthenticator, clientauthInit,
+ null, credentials2, null);
+
+ vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+ props1, javaProps1, lnPort ));
+ logger.info("Created secured cache in vm2");
+
+ vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+ props2, javaProps2, nyPort ));
+ logger.info("Created secured cache in vm3");
+
+ vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 10, false, false, null, true ));
+ logger.info("Created sender in vm2");
+
+ vm3.invoke(() -> createReceiverInSecuredCache());
+ logger.info("Created receiver in vm3");
+
+ vm2.invoke(() -> WANTestBase.startSender( "ln" ));
+ vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+ vm2.invoke(() -> verifyDifferentServerInGetCredentialCall());
+ vm3.invoke(() -> verifyDifferentServerInGetCredentialCall());
+ }
+ }
+
+ @Test
+ public void testWanSecurityManagerAuthValidCredentialsWithServer() {
+ disconnectAllFromDS();
+ {
+ Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
+ logger.info("Created locator on local site");
+
+ Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
+ logger.info("Created locator on remote site");
+
+ Properties props1 = buildSecurityProperties("admin", "secret");
+ Properties props2 = buildSecurityProperties("guest", "guest");
+
+ vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+ props1, null, lnPort ));
+ logger.info("Created secured cache in vm2");
+
+ vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
+ props2, null, nyPort ));
+ logger.info("Created secured cache in vm3");
+
+ vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
+ false, 100, 10, false, false, null, true ));
+ logger.info("Created sender in vm2");
+
+ vm3.invoke(() -> createReceiverInSecuredCache());
+ logger.info("Created receiver in vm3");
+
+ vm2.invoke(() -> WANTestBase.startSender( "ln" ));
+ vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
+
+ vm2.invoke(() -> verifyDifferentServerInGetCredentialCall());
+
+ // this would fail for now because for integrated security, we are not sending the receiver's credentials back
+ // to the sender. Because in the old security implementation, even though the receiver's credentials are sent back to the sender
+ // the sender is not checking it.
+ //vm3.invoke(() -> verifyDifferentServerInGetCredentialCall());
+ }
+ }
+}