You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:44:24 UTC
[36/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to
./geode-wan/src/test/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
deleted file mode 100644
index 9c6cbdd..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.misc;
-
-import org.junit.Ignore;
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.internal.cache.BucketRegion;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.RegionQueue;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
-import com.gemstone.gemfire.test.dunit.Assert;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.IgnoredException;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.Wait;
-import com.gemstone.gemfire.test.dunit.WaitCriterion;
-
-import java.util.Set;
-
-/**
- *
- */
-@Category(DistributedTest.class)
-public class CommonParallelGatewaySenderDUnitTest extends WANTestBase {
-
- @Test
- public void testSameSenderWithNonColocatedRegions() throws Exception {
- IgnoredException.addIgnoredException("cannot have the same parallel");
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
- try {
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
- fail("Expected IllegalStateException : cannot have the same parallel gateway sender");
- }
- catch (Exception e) {
- if (!(e.getCause() instanceof IllegalStateException)
- || !(e.getCause().getMessage()
- .contains("cannot have the same parallel gateway sender id"))) {
- Assert.fail("Expected IllegalStateException", e);
- }
- }
- }
-
- /**
- * Simple scenario. Two regions attach the same PGS
- * @throws Exception
- * Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0
- */
- @Test
- @Ignore("TODO")
- public void testParallelPropagation() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR1", null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR1", null, 1, 100, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR2", null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR2", null, 1, 100, isOffHeap() ));
- //before doing any puts, let the senders be running in order to ensure that
- //not a single event will be lost
- vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR1",
- 1000 ));
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR2",
- 1000 ));
-
- //verify all buckets drained on all sender nodes.
- vm4.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm5.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm6.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm7.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR1", 1000 ));
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR2", 1000 ));
- }
-
- /**
- * The PGS is persistence enabled but not the Regions
- * Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0
- */
- @Test
- @Ignore("TODO")
- public void testParallelPropagationPersistenceEnabled() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, true, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, true, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, true, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, true, null, true ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR1", "ln", 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR2", "ln", 1, 100, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR1", null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR1", null, 1, 100, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR2", null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR2", null, 1, 100, isOffHeap() ));
- //before doing any puts, let the senders be running in order to ensure that
- //not a single event will be lost
- vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR1",
- 1000 ));
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR2",
- 1000 ));
-
- //verify all buckets drained on all sender nodes.
- vm4.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm5.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm6.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
- vm7.invoke(() -> CommonParallelGatewaySenderDUnitTest.validateParallelSenderQueueAllBucketsDrained("ln"));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR1", 1000 ));
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR2", 1000 ));
- }
-
-
- /**
- * Enable persistence for GatewaySender.
- * Pause the sender and do some puts in local region.
- * Close the local site and rebuild the region and sender from disk store.
- * Dispatcher should not start dispatching events recovered from persistent sender.
- * Check if the remote site receives all the events.
- * Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0
- */
- @Test
- @Ignore("TODO")
- public void testPRWithGatewaySenderPersistenceEnabled_Restart() {
- //create locator on local site
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- //create locator on remote site
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- //create receiver on remote site
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- //create cache in local site
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- //create senders with disk store
- String diskStore1 = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
- String diskStore2 = (String) vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
- String diskStore3 = (String) vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
- String diskStore4 = (String) vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, null, true ));
-
- LogWriterUtils.getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4);
-
- //create PR on remote site
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR1", null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR1", null, 1, 100, isOffHeap() ));
-
- //create PR on remote site
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR2", null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR2", null, 1, 100, isOffHeap() ));
-
- //create PR on local site
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
-
- //create PR on local site
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
-
-
- //start the senders on local site
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- //wait for senders to become running
- vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
- //pause the senders
- vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
- vm5.invoke(() -> WANTestBase.pauseSender( "ln" ));
- vm6.invoke(() -> WANTestBase.pauseSender( "ln" ));
- vm7.invoke(() -> WANTestBase.pauseSender( "ln" ));
-
- //start puts in region on local site
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"PR1", 3000 ));
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName()+"PR2", 5000 ));
- LogWriterUtils.getLogWriter().info("Completed puts in the region");
-
- //--------------------close and rebuild local site -------------------------------------------------
- //kill the senders
- vm4.invoke(() -> WANTestBase.killSender());
- vm5.invoke(() -> WANTestBase.killSender());
- vm6.invoke(() -> WANTestBase.killSender());
- vm7.invoke(() -> WANTestBase.killSender());
-
- LogWriterUtils.getLogWriter().info("Killed all the senders.");
-
- //restart the vm
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
- vm5.invoke(() -> WANTestBase.createCache( lnPort ));
- vm6.invoke(() -> WANTestBase.createCache( lnPort ));
- vm7.invoke(() -> WANTestBase.createCache( lnPort ));
-
- LogWriterUtils.getLogWriter().info("Created back the cache");
-
- //create senders with disk store
- vm4.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore1, true ));
- vm5.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore2, true ));
- vm6.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore3, true ));
- vm7.invoke(() -> WANTestBase.createSenderWithDiskStore( "ln", 2, true, 100, 10, false, true, null, diskStore4, true ));
-
- LogWriterUtils.getLogWriter().info("Created the senders back from the disk store.");
- //create PR on local site
- AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
- AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
- AsyncInvocation inv3 = vm6.invokeAsync(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
- AsyncInvocation inv4 = vm7.invokeAsync(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR1", "ln", 1, 100, isOffHeap() ));
-
- try {
- inv1.join();
- inv2.join();
- inv3.join();
- inv4.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail();
- }
-
- inv1 = vm4.invokeAsync(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
- inv2 = vm5.invokeAsync(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
- inv3 = vm6.invokeAsync(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
- inv4 = vm7.invokeAsync(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName()+"PR2", "ln", 1, 100, isOffHeap() ));
-
- try {
- inv1.join();
- inv2.join();
- inv3.join();
- inv4.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail();
- }
-
- LogWriterUtils.getLogWriter().info("Created back the partitioned regions");
-
- //start the senders in async mode. This will ensure that the
- //node of shadow PR that went down last will come up first
- startSenderInVMsAsync("ln", vm4, vm5, vm6, vm7);
-
- LogWriterUtils.getLogWriter().info("Waiting for senders running.");
- //wait for senders running
- vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
- LogWriterUtils.getLogWriter().info("All the senders are now running...");
-
- //----------------------------------------------------------------------------------------------------
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName()+"PR1", 3000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName()+"PR1", 3000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName()+"PR2", 5000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName()+"PR2", 5000 ));
- }
-
- public static void validateParallelSenderQueueAllBucketsDrained(final String senderId) {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- ConcurrentParallelGatewaySenderQueue regionQueue = (ConcurrentParallelGatewaySenderQueue)((AbstractGatewaySender)sender).getQueues().toArray(new RegionQueue[1])[0];
-
- Set<PartitionedRegion> shadowPRs = (Set<PartitionedRegion>)regionQueue.getRegions();
-
- for(PartitionedRegion shadowPR: shadowPRs) {
- Set<BucketRegion> buckets = shadowPR.getDataStore().getAllLocalBucketRegions();
-
- for (final BucketRegion bucket : buckets) {
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (bucket.keySet().size() == 0) {
- LogWriterUtils.getLogWriter().info("Bucket " + bucket.getId() + " is empty");
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected bucket entries for bucket: " + bucket.getId() + " is: 0 but actual entries: "
- + bucket.keySet().size() + " This bucket isPrimary: " + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet();
- }
- };
- Wait.waitForCriterion(wc, 180000, 50, true);
-
- }//for loop ends
- }
-
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java
deleted file mode 100644
index 2fb77ff..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.misc;
-
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-@SuppressWarnings("serial")
-@Category(DistributedTest.class)
-public class CommonParallelGatewaySenderOffHeapDUnitTest extends
- CommonParallelGatewaySenderDUnitTest {
-
- public CommonParallelGatewaySenderOffHeapDUnitTest() {
- super();
- }
-
- @Override
- public boolean isOffHeap() {
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
deleted file mode 100644
index 8b8c624..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
+++ /dev/null
@@ -1,532 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.misc;
-
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
-import com.gemstone.gemfire.internal.cache.DistributedCacheOperation;
-import com.gemstone.gemfire.internal.cache.EntrySnapshot;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry;
-import com.gemstone.gemfire.internal.cache.RegionEntry;
-import com.gemstone.gemfire.internal.cache.Token.Tombstone;
-import com.gemstone.gemfire.internal.cache.versions.VersionTag;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.Wait;
-
-/**
- *
- * Test verifies that version tag for destroyed entry is propagated back to
- * origin distributed system if the version tag is applied and replaces old
- * version information in destination distributed system.
- *
- * Version tag information which is relevant between multiple distributed
- * systems consistency check is basically dsid and timestamp.
- */
-@Category(DistributedTest.class)
-public class NewWANConcurrencyCheckForDestroyDUnitTest extends WANTestBase {
-
- public NewWANConcurrencyCheckForDestroyDUnitTest() {
- super();
- }
-
- @Test
- public void testVersionTagTimestampForDestroy() {
-
-
- // create three distributed systems with each having a cache containing
- // a Replicated Region with one entry and concurrency checks enabled.
-
- // Site 2 and Site 3 only know about Site 1 but Site 1 knows about both
- // Site 2 and Site 3.
-
- // Site 1
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- createCacheInVMs(lnPort, vm1);
- Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
-
- //Site 2
- Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- createCacheInVMs(nyPort, vm3);
- Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
-
- //Site 3
- Integer tkPort = (Integer)vm4.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
- createCacheInVMs(tkPort, vm5);
- Integer tkRecPort = (Integer) vm5.invoke(() -> WANTestBase.createReceiver());
-
- LogWriterUtils.getLogWriter().info("Created locators and receivers in 3 distributed systems");
-
- //Site 1
- vm1.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 1, false, false, null, true ));
- vm1.invoke(() -> WANTestBase.createSender( "ln2", 3,
- true, 10, 1, false, false, null, true ));
-
- vm1.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ln1,ln2", 0, 1, isOffHeap() ));
- vm1.invoke(() -> WANTestBase.startSender( "ln1" ));
- vm1.invoke(() -> WANTestBase.startSender( "ln2" ));
- vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" ));
- vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln2" ));
-
- //Site 2
- vm3.invoke(() -> WANTestBase.createSender( "ny1", 1,
- true, 10, 1, false, false, null, true ));
-
- vm3.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ny1", 0, 1, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.startSender( "ny1" ));
- vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" ));
-
- //Site 3 which only knows about Site 1.
- vm5.invoke(() -> WANTestBase.createSender( "tk1", 1,
- true, 10, 1, false, false, null, true ));
-
- vm5.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "tk1", 0, 1, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.startSender( "tk1" ));
- vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "tk1" ));
-
- Wait.pause(2000);
-
- // Perform a put in vm1
- vm1.invoke(new CacheSerializableRunnable("Putting an entry in ds1") {
-
- @Override
- public void run2() throws CacheException {
- assertNotNull(cache);
-
- Region region = cache.getRegion("/repRegion");
- region.put("testKey", "testValue");
-
- assertEquals(1, region.size());
- }
- });
-
- //wait for vm1 to propagate put to vm3 and vm5
- Wait.pause(2000);
-
- long destroyTimeStamp = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterOp());
-
- //wait for vm1 to propagate destroyed entry's new version tag to vm5
- Wait.pause(2000);
-
- vm5.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.verifyTimestampAfterOp(destroyTimeStamp, 1 /* ds 3 receives gateway event only from ds 1*/));
- }
-
- /**
- * Test creates two sites and one Replicated Region on each with Serial
- * GatewaySender on each. Test checks for sequence of events being sent from
- * site1 to site2 for PUTALL and PUT and finally checks for final timestamp in
- * version for RegionEntry with key "testKey". If timestamp on both site is
- * same that means events were transferred in correct sequence.
- */
- @Test
- public void testPutAllEventSequenceOnSerialGatewaySenderWithRR() {
-
- // create two distributed systems with each having a cache containing
- // a Replicated Region with one entry and concurrency checks enabled.
-
- // Site 1
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- vm1.invoke(() -> WANTestBase.createCache(lnPort));
- Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
-
- //Site 2
- Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- vm3.invoke(() -> WANTestBase.createCache(nyPort));
- Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
-
- LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems");
-
- //Site 1
- vm1.invoke(() -> WANTestBase.createSender( "ln1", 2,
- false, 10, 1, false, false, null, true ));
-
- vm1.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ln1", isOffHeap() ));
- vm1.invoke(() -> WANTestBase.startSender( "ln1" ));
- vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" ));
-
- //Site 2
- vm3.invoke(() -> WANTestBase.createSender( "ny1", 1,
- false, 10, 1, false, false, null, true ));
-
- vm3.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() ));
- vm3.invoke(() -> WANTestBase.startSender( "ny1" ));
- vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" ));
-
- Wait.pause(2000);
-
- // Perform a put in vm1
- AsyncInvocation asynch1 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") {
-
- @Override
- public void run2() throws CacheException {
- assertNotNull(cache);
- // Test hook to make put wait after RE lock is released but before Gateway events are sent.
- DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 2000;
-
- Region region = cache.getRegion("/repRegion");
- Map testMap = new HashMap();
- testMap.put("testKey", "testValue1");
- region.putAll(testMap);
-
- assertEquals(1, region.size());
- assertEquals("testValue2", region.get("testKey"));
- }
- });
-
- //wait for vm1 to propagate put to vm3
- Wait.pause(1000);
-
- AsyncInvocation asynch2 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") {
-
- @Override
- public void run2() throws CacheException {
- assertNotNull(cache);
- Region region = cache.getRegion("/repRegion");
-
- while (!region.containsKey("testKey")) {
- Wait.pause(10);
- }
- // Test hook to make put wait after RE lock is released but before Gateway events are sent.
- DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0;
-
- region.put("testKey", "testValue2");
-
- assertEquals(1, region.size());
- assertEquals("testValue2", region.get("testKey"));
- }
- });
-
- try {
- asynch1.join(5000);
- asynch2.join(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- vm1.invoke(new CacheSerializableRunnable("Reset Test Hook") {
-
- @Override
- public void run2() throws CacheException {
- DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0;
- }
- });
- }
-
- //Wait for all Gateway events be received by vm3.
- Wait.pause(1000);
-
- long putAllTimeStampVm1 = (Long) vm1.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
-
- long putAllTimeStampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
-
- assertEquals(putAllTimeStampVm1, putAllTimeStampVm3);
- }
-
-/**
- * This is similar to above test but for PartitionedRegion.
- */
- @Test
- public void testPutAllEventSequenceOnSerialGatewaySenderWithPR() {
-
- // create two distributed systems with each having a cache containing
- // a Replicated Region with one entry and concurrency checks enabled.
-
- // Site 1
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- createCacheInVMs(lnPort, vm1);
- Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
-
- //Site 2
- Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- createCacheInVMs(nyPort, vm3);
- Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
-
- LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems");
-
- //Site 1
- vm1.invoke(() -> WANTestBase.createSender( "ln1", 2,
- false, 10, 1, false, false, null, true ));
-
- vm1.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ln1", 0, 1, isOffHeap() ));
- vm1.invoke(() -> WANTestBase.startSender( "ln1" ));
- vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" ));
-
- //Site 2
- vm3.invoke(() -> WANTestBase.createSender( "ny1", 1,
- false, 10, 1, false, false, null, true ));
-
- vm3.invoke(() -> WANTestBase.createPartitionedRegion("repRegion", "ny1", 0, 1, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.startSender( "ny1" ));
- vm3.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" ));
-
- Wait.pause(2000);
-
- // Perform a put in vm1
- AsyncInvocation asynch1 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") {
-
- @Override
- public void run2() throws CacheException {
- assertNotNull(cache);
- // Test hook to make put wait after RE lock is released but before Gateway events are sent.
- DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 2000;
-
- Region region = cache.getRegion("/repRegion");
- Map testMap = new HashMap();
- testMap.put("testKey", "testValue1");
- region.putAll(testMap);
-
- assertEquals(1, region.size());
- assertEquals("testValue2", region.get("testKey"));
- }
- });
-
- //wait for vm1 to propagate put to vm3
- Wait.pause(1000);
-
- AsyncInvocation asynch2 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") {
-
- @Override
- public void run2() throws CacheException {
- assertNotNull(cache);
- Region region = cache.getRegion("/repRegion");
-
- while (!region.containsKey("testKey")) {
- Wait.pause(10);
- }
- // Test hook to make put wait after RE lock is released but before Gateway events are sent.
- DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0;
-
- region.put("testKey", "testValue2");
-
- assertEquals(1, region.size());
- assertEquals("testValue2", region.get("testKey"));
- }
- });
-
- try {
- asynch1.join(5000);
- asynch2.join(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- vm1.invoke(new CacheSerializableRunnable("Reset Test Hook") {
-
- @Override
- public void run2() throws CacheException {
- DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0;
- }
- });
- }
-
- //Wait for all Gateway events be received by vm3.
- Wait.pause(1000);
-
- long putAllTimeStampVm1 = (Long) vm1.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
-
- long putAllTimeStampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
-
- assertEquals(putAllTimeStampVm1, putAllTimeStampVm3);
- }
-
- /**
- * Tests if conflict checks are happening based on DSID and timestamp even if
- * version tag is generated in local distributed system.
- */
- @Test
- public void testConflictChecksBasedOnDsidAndTimeStamp() {
-
-
- // create two distributed systems with each having a cache containing
- // a Replicated Region with one entry and concurrency checks enabled.
-
- // Site 1
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- createCacheInVMs(lnPort, vm1);
- Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
-
- //Site 2
- Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- createCacheInVMs(nyPort, vm3);
- Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
-
- LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems");
-
- //Site 1
- vm1.invoke(() -> WANTestBase.createSender( "ln1", 2,
- false, 10, 1, false, false, null, true ));
-
- vm1.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ln1", isOffHeap() ));
- vm1.invoke(() -> WANTestBase.startSender( "ln1" ));
- vm1.invoke(() -> WANTestBase.waitForSenderRunningState( "ln1" ));
-
- //Site 2
- vm3.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createCache( nyPort ));
- vm4.invoke(() -> WANTestBase.createSender( "ny1", 1,
- false, 10, 1, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion("repRegion", "ny1", isOffHeap() ));
- vm4.invoke(() -> WANTestBase.startSender( "ny1" ));
- vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ny1" ));
-
- Wait.pause(2000);
-
- // Perform a put in vm1
- vm1.invoke(new CacheSerializableRunnable("Putting an entry in ds1") {
-
- @Override
- public void run2() throws CacheException {
- assertNotNull(cache);
-
- Region region = cache.getRegion("/repRegion");
- region.put("testKey", "testValue1");
-
- assertEquals(1, region.size());
- }
- });
-
- //wait for vm4 to have later timestamp before sending operation to vm1
- Wait.pause(300);
-
- AsyncInvocation asynch = vm4.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds2 in vm4") {
-
- @Override
- public void run2() throws CacheException {
- assertNotNull(cache);
- Region region = cache.getRegion("/repRegion");
-
- region.put("testKey", "testValue2");
-
- assertEquals(1, region.size());
- assertEquals("testValue2", region.get("testKey"));
- }
- });
-
- try {
- asynch.join(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- //Wait for all local ds events be received by vm3.
- Wait.pause(1000);
-
- vm3.invoke(new CacheSerializableRunnable("Check dsid") {
-
- @Override
- public void run2() throws CacheException {
- Region region = cache.getRegion("repRegion");
-
- Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/
- true); //commented while merging revision 43582
- RegionEntry re = null;
- if (entry instanceof EntrySnapshot) {
- re = ((EntrySnapshot)entry).getRegionEntry();
- } else if (entry instanceof NonTXEntry) {
- re = ((NonTXEntry)entry).getRegionEntry();
- }
- VersionTag tag = re.getVersionStamp().asVersionTag();
- assertEquals(2, tag.getDistributedSystemId());
- }
- });
-
- // Check vm3 has latest timestamp from vm4.
- long putAllTimeStampVm1 = (Long) vm4.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
-
- long putAllTimeStampVm3 = (Long) vm3.invoke(() -> NewWANConcurrencyCheckForDestroyDUnitTest.getVersionTimestampAfterPutAllOp());
-
- assertEquals(putAllTimeStampVm1, putAllTimeStampVm3);
- }
-
- /*
- * For VM1 in ds 1. Used in testPutAllEventSequenceOnSerialGatewaySender.
- */
- public static long getVersionTimestampAfterPutAllOp() {
- Region region = cache.getRegion("repRegion");
-
- while (!(region.containsKey("testKey") /*&& region.get("testKey").equals("testValue2") */)) {
- Wait.pause(10);
- }
- assertEquals(1, region.size());
-
- Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true);
- RegionEntry re = null;
- if (entry instanceof EntrySnapshot) {
- re = ((EntrySnapshot)entry).getRegionEntry();
- } else if (entry instanceof NonTXEntry) {
- re = ((NonTXEntry)entry).getRegionEntry();
- }
- if (re != null) {
- LogWriterUtils.getLogWriter().fine("RegionEntry for testKey: " + re.getKey() + " " + re.getValueInVM((LocalRegion) region));
-
- VersionTag tag = re.getVersionStamp().asVersionTag();
- return tag.getVersionTimeStamp();
- } else {
- return -1;
- }
- }
-
- /*
- * For VM3 in ds 2.
- */
- public static long getVersionTimestampAfterOp() {
- Region region = cache.getRegion("repRegion");
- assertEquals(1, region.size());
-
- region.destroy("testKey");
-
- Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true);
- RegionEntry re = ((EntrySnapshot)entry).getRegionEntry();
- LogWriterUtils.getLogWriter().fine("RegionEntry for testKey: " + re.getKey() + " " + re.getValueInVM((LocalRegion) region));
- assertTrue(re.getValueInVM((LocalRegion) region) instanceof Tombstone);
-
- VersionTag tag = re.getVersionStamp().asVersionTag();
- return tag.getVersionTimeStamp();
- }
-
- /*
- * For VM 5 in ds 3.
- */
- public static void verifyTimestampAfterOp(long timestamp, int memberid) {
- Region region = cache.getRegion("repRegion");
- assertEquals(0, region.size());
-
- Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true);
- RegionEntry re = ((EntrySnapshot)entry).getRegionEntry();
- assertTrue(re.getValueInVM((LocalRegion) region) instanceof Tombstone);
-
- VersionTag tag = re.getVersionStamp().asVersionTag();
- assertEquals(timestamp, tag.getVersionTimeStamp());
- assertEquals(memberid, tag.getDistributedSystemId());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
deleted file mode 100644
index 76940ea..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java
+++ /dev/null
@@ -1,469 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.misc;
-
-import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
-import static com.gemstone.gemfire.test.dunit.Assert.*;
-
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import com.jayway.awaitility.Awaitility;
-import org.apache.geode.security.templates.SampleSecurityManager;
-import org.apache.logging.log4j.Logger;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.distributed.DistributedMember;
-import com.gemstone.gemfire.distributed.DistributedSystem;
-import com.gemstone.gemfire.internal.Assert;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.internal.logging.LogService;
-import com.gemstone.gemfire.security.AuthInitialize;
-import com.gemstone.gemfire.security.AuthenticationFailedException;
-import com.gemstone.gemfire.security.SecurityTestUtils;
-import com.gemstone.gemfire.security.generator.CredentialGenerator;
-import com.gemstone.gemfire.security.generator.DummyCredentialGenerator;
-import com.gemstone.gemfire.security.templates.UserPasswordAuthInit;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-@Category(DistributedTest.class)
-public class NewWanAuthenticationDUnitTest extends WANTestBase {
-
- public static final Logger logger = LogService.getLogger();
-
- public static boolean isDifferentServerInGetCredentialCall = false;
-
- /**
- * Authentication test for new WAN with valid credentials. Although, nothing
- * related to authentication has been changed in new WAN, this test case is
- * added on request from QA for defect 44650.
- */
- @Test
- public void testWanAuthValidCredentials() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- logger.info("Created locator on local site");
-
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- logger.info("Created locator on remote site");
-
-
- CredentialGenerator gen = new DummyCredentialGenerator();
- Properties extraProps = gen.getSystemProperties();
-
- String clientauthenticator = gen.getAuthenticator();
- String clientauthInit = gen.getAuthInit();
-
- Properties credentials1 = gen.getValidCredentials(1);
- if (extraProps != null) {
- credentials1.putAll(extraProps);
- }
- Properties javaProps1 = gen.getJavaProperties();
-
- // vm3's invalid credentials
- Properties credentials2 = gen.getInvalidCredentials(1);
- if (extraProps != null) {
- credentials2.putAll(extraProps);
- }
- Properties javaProps2 = gen.getJavaProperties();
-
- Properties props1 = buildProperties(clientauthenticator, clientauthInit,
- null, credentials1, null);
-
- // have vm 3 start a cache with invalid credentails
- Properties props2 = buildProperties(clientauthenticator, clientauthInit,
- null, credentials2, null);
-
- vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
- props1, javaProps1, lnPort ));
- logger.info("Created secured cache in vm2");
-
- vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
- props2, javaProps2, nyPort ));
- logger.info("Created secured cache in vm3");
-
- vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
- logger.info("Created sender in vm2");
-
- vm3.invoke(() -> createReceiverInSecuredCache());
- logger.info("Created receiver in vm3");
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- logger.info("Created RR in vm2");
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- logger.info("Created RR in vm3");
-
- // this tests verifies that even though vm3 has invalid credentials, vm2 can still send data to vm3 because
- // vm2 has valid credentials
- vm2.invoke(() -> WANTestBase.startSender( "ln" ));
- vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
- vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1));
- vm3.invoke(() -> {
- Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_RR");
- Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> assertTrue(r.size() > 0));
- });
- logger.info("Done successfully.");
- }
-
- @Test
- public void testWanIntegratedSecurityWithValidCredentials() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- logger.info("Created locator on local site");
-
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- logger.info("Created locator on remote site");
-
-
- Properties props1 = buildSecurityProperties("admin", "secret");
- Properties props2 = buildSecurityProperties("guest", "guest");
-
- vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
- props1, null, lnPort ));
- logger.info("Created secured cache in vm2");
-
- vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
- props2, null, nyPort ));
- logger.info("Created secured cache in vm3");
-
- vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
- logger.info("Created sender in vm2");
-
- vm3.invoke(() -> createReceiverInSecuredCache());
- logger.info("Created receiver in vm3");
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- logger.info("Created RR in vm2");
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- logger.info("Created RR in vm3");
-
- vm2.invoke(() -> WANTestBase.startSender( "ln" ));
- vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1));
- vm3.invoke(() -> {
- Region r = cache.getRegion(Region.SEPARATOR + getTestMethodName() + "_RR");
- Awaitility.waitAtMost(20, TimeUnit.SECONDS).until(() -> assertTrue(r.size() > 0));
-
- });
- logger.info("Done successfully.");
- }
-
- /**
- * Test authentication with new WAN with invalid credentials. Although,
- * nothing related to authentication has been changed in new WAN, this test
- * case is added on request from QA for defect 44650.
- */
- @Test
- public void testWanAuthInvalidCredentials() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- logger.info("Created locator on local site");
-
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- logger.info("Created locator on remote site");
-
-
- CredentialGenerator gen = new DummyCredentialGenerator();
- logger.info("Picked up credential: " + gen);
-
- Properties extraProps = gen.getSystemProperties();
-
- String clientauthenticator = gen.getAuthenticator();
- String clientauthInit = gen.getAuthInit();
-
- Properties credentials1 = gen.getInvalidCredentials(1);
- if (extraProps != null) {
- credentials1.putAll(extraProps);
- }
- Properties javaProps1 = gen.getJavaProperties();
- Properties credentials2 = gen.getInvalidCredentials(2);
- if (extraProps != null) {
- credentials2.putAll(extraProps);
- }
- Properties javaProps2 = gen.getJavaProperties();
-
- Properties props1 = buildProperties(clientauthenticator, clientauthInit,
- null, credentials1, null);
- Properties props2 = buildProperties(clientauthenticator, clientauthInit,
- null, credentials2, null);
-
- logger.info("Done building auth properties");
-
- vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
- props1, javaProps1, lnPort ));
- logger.info("Created secured cache in vm2");
-
- vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
- props2, javaProps2, nyPort ));
- logger.info("Created secured cache in vm3");
-
- vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
- logger.info("Created sender in vm2");
-
- vm3.invoke(() -> createReceiverInSecuredCache());
- logger.info("Created receiver in vm3");
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- logger.info("Created RR in vm2");
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- logger.info("Created RR in vm3");
-
- try {
- vm2.invoke(() -> WANTestBase.startSender( "ln" ));
- fail("Authentication Failed: While starting the sender, an exception should have been thrown");
- } catch (Exception e) {
- if (!(e.getCause().getCause() instanceof AuthenticationFailedException)) {
- fail("Authentication is not working as expected", e);
- }
- }
- }
-
- /**
- * Test authentication with new WAN with invalid credentials. Although,
- * nothing related to authentication has been changed in new WAN, this test
- * case is added on request from QA for defect 44650.
- */
- @Test
- public void testWanSecurityManagerWithInvalidCredentials() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- logger.info("Created locator on local site");
-
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- logger.info("Created locator on remote site");
-
- Properties props1 = buildSecurityProperties("admin", "wrongPswd");
- Properties props2 = buildSecurityProperties("guest", "wrongPswd");
-
- logger.info("Done building auth properties");
-
- vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
- props1, null, lnPort ));
- logger.info("Created secured cache in vm2");
-
- vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
- props2, null, nyPort ));
- logger.info("Created secured cache in vm3");
-
- vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
- logger.info("Created sender in vm2");
-
- vm3.invoke(() -> createReceiverInSecuredCache());
- logger.info("Created receiver in vm3");
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- logger.info("Created RR in vm2");
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- logger.info("Created RR in vm3");
-
- try {
- vm2.invoke(() -> WANTestBase.startSender( "ln" ));
- fail("Authentication Failed: While starting the sender, an exception should have been thrown");
- } catch (Exception e) {
- if (!(e.getCause().getCause() instanceof AuthenticationFailedException)) {
- fail("Authentication is not working as expected", e);
- }
- }
- }
-
- private static Properties buildProperties(String clientauthenticator,
- String clientAuthInit, String accessor, Properties extraAuthProps,
- Properties extraAuthzProps) {
- Properties authProps = new Properties();
- if (clientauthenticator != null) {
- authProps.setProperty(
- SECURITY_CLIENT_AUTHENTICATOR,
- clientauthenticator);
- }
- if (accessor != null) {
- authProps.setProperty(SECURITY_CLIENT_ACCESSOR,
- accessor);
- }
- if (clientAuthInit != null) {
- authProps.setProperty(SECURITY_CLIENT_AUTH_INIT, clientAuthInit);
- }
- if (extraAuthProps != null) {
- authProps.putAll(extraAuthProps);
- }
- if (extraAuthzProps != null) {
- authProps.putAll(extraAuthzProps);
- }
- return authProps;
- }
-
- private static Properties buildSecurityProperties(String username, String password){
- Properties props = new Properties();
- props.put(SECURITY_MANAGER, SampleSecurityManager.class.getName());
- props.put("security-json", "org/apache/geode/security/templates/security.json");
- props.put(SECURITY_CLIENT_AUTH_INIT, UserPasswdAI.class.getName());
- props.put("security-username", username);
- props.put("security-password", password);
- return props;
- }
-
- public static void createSecuredCache(Properties authProps, Object javaProps, Integer locPort) {
- authProps.setProperty(MCAST_PORT, "0");
- authProps.setProperty(LOCATORS, "localhost[" + locPort + "]");
-
- logger.info("Set the server properties to: " + authProps);
- logger.info("Set the java properties to: " + javaProps);
-
- SecurityTestUtils tmpInstance = new SecurityTestUtils("temp");
- DistributedSystem ds = tmpInstance.createSystem(authProps, (Properties)javaProps);
- assertNotNull(ds);
- assertTrue(ds.isConnected());
- cache = CacheFactory.create(ds);
- assertNotNull(cache);
- }
-
- public static class UserPasswdAI extends UserPasswordAuthInit {
-
- public static AuthInitialize createAI() {
- return new UserPasswdAI();
- }
-
- @Override
- public Properties getCredentials(Properties props,
- DistributedMember server, boolean isPeer)
- throws AuthenticationFailedException {
- boolean val = ( CacheFactory.getAnyInstance().getDistributedSystem().getDistributedMember().getProcessId() != server.getProcessId());
- Assert.assertTrue(val, "getCredentials: Server should be different");
- Properties p = super.getCredentials(props, server, isPeer);
- if(val) {
- isDifferentServerInGetCredentialCall = true;
- CacheFactory.getAnyInstance().getLoggerI18n().convertToLogWriter().config("setting isDifferentServerInGetCredentialCall " + isDifferentServerInGetCredentialCall);
- } else {
- CacheFactory.getAnyInstance().getLoggerI18n().convertToLogWriter().config("setting22 isDifferentServerInGetCredentialCall " + isDifferentServerInGetCredentialCall);
- }
- return p;
- }
- }
-
- public static void verifyDifferentServerInGetCredentialCall(){
- Assert.assertTrue(isDifferentServerInGetCredentialCall, "verifyDifferentServerInGetCredentialCall: Server should be different");
- isDifferentServerInGetCredentialCall = false;
- }
-
- @Test
- public void testWanAuthValidCredentialsWithServer() {
- disconnectAllFromDS();
- {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- logger.info("Created locator on local site");
-
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- logger.info("Created locator on remote site");
-
- DummyCredentialGenerator gen = new DummyCredentialGenerator();
- gen.init();
- Properties extraProps = gen.getSystemProperties();
-
- String clientauthenticator = gen.getAuthenticator();
- String clientauthInit = UserPasswdAI.class.getName() + ".createAI";
-
- Properties credentials1 = gen.getValidCredentials(1);
- if (extraProps != null) {
- credentials1.putAll(extraProps);
- }
- Properties javaProps1 = gen.getJavaProperties();
-
- Properties credentials2 = gen.getInvalidCredentials(2);
- if (extraProps != null) {
- credentials2.putAll(extraProps);
- }
- Properties javaProps2 = gen.getJavaProperties();
-
- Properties props1 = buildProperties(clientauthenticator, clientauthInit,
- null, credentials1, null);
- Properties props2 = buildProperties(clientauthenticator, clientauthInit,
- null, credentials2, null);
-
- vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
- props1, javaProps1, lnPort ));
- logger.info("Created secured cache in vm2");
-
- vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
- props2, javaProps2, nyPort ));
- logger.info("Created secured cache in vm3");
-
- vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
- logger.info("Created sender in vm2");
-
- vm3.invoke(() -> createReceiverInSecuredCache());
- logger.info("Created receiver in vm3");
-
- vm2.invoke(() -> WANTestBase.startSender( "ln" ));
- vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
- vm2.invoke(() -> verifyDifferentServerInGetCredentialCall());
- vm3.invoke(() -> verifyDifferentServerInGetCredentialCall());
- }
- }
-
- @Test
- public void testWanSecurityManagerAuthValidCredentialsWithServer() {
- disconnectAllFromDS();
- {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- logger.info("Created locator on local site");
-
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- logger.info("Created locator on remote site");
-
- Properties props1 = buildSecurityProperties("admin", "secret");
- Properties props2 = buildSecurityProperties("guest", "guest");
-
- vm2.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
- props1, null, lnPort ));
- logger.info("Created secured cache in vm2");
-
- vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(
- props2, null, nyPort ));
- logger.info("Created secured cache in vm3");
-
- vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
- logger.info("Created sender in vm2");
-
- vm3.invoke(() -> createReceiverInSecuredCache());
- logger.info("Created receiver in vm3");
-
- vm2.invoke(() -> WANTestBase.startSender( "ln" ));
- vm2.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
- vm2.invoke(() -> verifyDifferentServerInGetCredentialCall());
-
- // this would fail for now because for integrated security, we are not sending the receiver's credentials back
- // to the sender. Because in the old security implementation, even though the receiver's credentials are sent back to the sender
- // the sender is not checking it.
- //vm3.invoke(() -> verifyDifferentServerInGetCredentialCall());
- }
- }
-}