You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:44:13 UTC
[25/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-wan/src/test/java/com/gemstone/gemfire) to org.apache.geode (for
./geode-wan/src/test/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
deleted file mode 100644
index 47dee96..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagation_PartitionedRegionDUnitTest.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.serial;
-
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.CancelException;
-import com.gemstone.gemfire.cache.CacheClosedException;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.IgnoredException;
-import com.gemstone.gemfire.test.junit.categories.FlakyTest;
-
-@Category(DistributedTest.class)
-public class SerialWANPropagation_PartitionedRegionDUnitTest extends WANTestBase {
-
- private static final long serialVersionUID = 1L;
-
- public SerialWANPropagation_PartitionedRegionDUnitTest() {
- super();
- }
-
- @Test
- public void testPartitionedSerialPropagation() throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
- //vm5.invoke(() -> WANTestBase.startSender( "ln" ));
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- }
-
- @Test
- public void testBothReplicatedAndPartitionedSerialPropagation()
- throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 1000 ));
- vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- }
-
- @Test
- public void testSerialReplicatedAndPartitionedPropagation() throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "lnSerial",
- 2, false, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "lnSerial",
- 2, false, 100, 10, false, false, null, true ));
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "lnSerial", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnSerial", 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.startSender( "lnSerial" ));
- vm5.invoke(() -> WANTestBase.startSender( "lnSerial" ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 1000 ));
- vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- }
-
- @Test
- public void testSerialReplicatedAndSerialPartitionedPropagation()
- throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "lnSerial1",
- 2, false, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "lnSerial1",
- 2, false, 100, 10, false, false, null, true ));
-
- vm5.invoke(() -> WANTestBase.createSender( "lnSerial2",
- 2, false, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "lnSerial2",
- 2, false, 100, 10, false, false, null, true ));
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "lnSerial1", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "lnSerial1", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "lnSerial1", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "lnSerial1", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnSerial2", 1, 100, isOffHeap() ));
-
- startSenderInVMs("lnSerial1", vm4, vm5);
-
- startSenderInVMs("lnSerial2", vm5, vm6);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 1000 ));
- vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- }
-
- @Test
- public void testPartitionedSerialPropagationToTwoWanSites() throws Exception {
-
- Integer lnPort = createFirstLocatorWithDSId(1);
- Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort ));
- Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3,lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver());
- createCacheInVMs(tkPort, vm3);
- vm3.invoke(() -> WANTestBase.createReceiver());
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "lnSerial1",
- 2, false, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "lnSerial1",
- 2, false, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createSender( "lnSerial2",
- 3, false, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "lnSerial2",
- 3, false, 100, 10, false, false, null, true ));
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
- startSenderInVMs("lnSerial1", vm4, vm5);
- startSenderInVMs("lnSerial2", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "lnSerial1,lnSerial2", 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR",
- 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- }
-
- @Test
- public void testPartitionedSerialPropagationHA() throws Exception {
- IgnoredException.addIgnoredException("Broken pipe");
- IgnoredException.addIgnoredException("Connection reset");
- IgnoredException.addIgnoredException("Unexpected IOException");
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
- //do initial 100 puts to create all the buckets
- vm5.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 ));
-
- IgnoredException.addIgnoredException(CancelException.class.getName());
- IgnoredException.addIgnoredException(CacheClosedException.class.getName());
- IgnoredException.addIgnoredException(ForceReattemptException.class.getName());
- //start async puts
- AsyncInvocation inv = vm5.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 1000 ));
- //close the cache on vm4 in between the puts
- vm4.invoke(() -> WANTestBase.killSender());
-
- inv.join();
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- }
-
- @Test
- public void testPartitionedSerialPropagationWithParallelThreads() throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
- createReceiverInVMs(vm2, vm3);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
-
- startSenderInVMs("ln", vm4, vm5);
-
-
-
- vm4.invoke(() -> WANTestBase.doMultiThreadedPuts( // TODO: eats exceptions
- getTestMethodName() + "_PR", 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java
deleted file mode 100644
index f3b6765..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.serial;
-
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
-import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-
-
-@Category(DistributedTest.class)
-public class SerialWANPropagationsFeatureDUnitTest extends WANTestBase{
-
- private static final long serialVersionUID = 1L;
-
- public SerialWANPropagationsFeatureDUnitTest() {
- super();
- }
-
- @Test
- public void testSerialReplicatedWanWithOverflow() {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 10, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 10, 10, false, false, null, true ));
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
- vm2.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR"));
- vm3.invoke(() -> addListenerToSleepAfterCreateEvent(1000, getTestMethodName() + "_RR"));
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doHeavyPuts(
- getTestMethodName() + "_RR", 15 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 15, 240000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 15, 240000 ));
- }
-
- @Test
- public void testSerialReplicatedWanWithPersistence() {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, true, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, true, null, true ));
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
-
- }
-
- @Test
- public void testReplicatedSerialPropagationWithConflation() throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 1000, true, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 1000, true, false, null, true ));
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- }
-
- @Test
- public void testReplicatedSerialPropagationWithParallelThreads()
- throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doMultiThreadedPuts(
- getTestMethodName() + "_RR", 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- }
-
- @Test
- public void testSerialPropagationWithFilter() throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false,
- new MyGatewayEventFilter(), true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false,
- new MyGatewayEventFilter(), true ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), "ln", 1, 100, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName(), null, 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 800 ));
- }
-
- @Test
- public void testReplicatedSerialPropagationWithFilter() throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false,
- new MyGatewayEventFilter(), true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false,
- new MyGatewayEventFilter(), true ));
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName(), null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName(), null, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName(), "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName(), "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName(), "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName(), "ln", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 800 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 800 ));
- }
-
- @Test
- public void testReplicatedSerialPropagationWithFilter_AfterAck()
- throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm6, vm7);
- createReceiverInVMs(vm6, vm7);
-
- createCacheInVMs(lnPort, vm2, vm3, vm4, vm5);
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false,
- new MyGatewayEventFilter_AfterAck(), true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false,
- new MyGatewayEventFilter_AfterAck(), true ));
-
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName(), null, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName(), null, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName(), "ln", isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName(), "ln", isOffHeap() ));
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName(), "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName(), "ln", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName(), 1000 ));
-
- vm4.invoke(() -> WANTestBase.validateQueueContents( "ln",
- 0 ));
- vm5.invoke(() -> WANTestBase.validateQueueContents( "ln",
- 0 ));
-
- Integer vm4Acks = (Integer)vm4.invoke(() -> WANTestBase.validateAfterAck( "ln"));
- Integer vm5Acks = (Integer)vm5.invoke(() -> WANTestBase.validateAfterAck( "ln"));
-
- assertEquals(2000, (vm4Acks + vm5Acks));
-
- vm6.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 1000 ));
- vm7.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName(), 1000 ));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
deleted file mode 100644
index 2019cac..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
+++ /dev/null
@@ -1,588 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.serial;
-
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import static com.gemstone.gemfire.test.dunit.Wait.*;
-import static com.gemstone.gemfire.test.dunit.IgnoredException.*;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.jayway.awaitility.Awaitility;
-
-@Category(DistributedTest.class)
-public class SerialWANStatsDUnitTest extends WANTestBase {
-
- private static final long serialVersionUID = 1L;
-
- private String testName;
-
- public SerialWANStatsDUnitTest() {
- super();
- }
-
- @Override
- protected final void postSetUpWANTestBase() throws Exception {
- this.testName = getTestMethodName();
- addIgnoredException("java.net.ConnectException");
- addIgnoredException("java.net.SocketException");
- addIgnoredException("Unexpected IOException");
- }
-
- @Test
- public void testReplicatedSerialPropagation() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver());
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", null, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "ln", isOffHeap() ));
-
- vm5.invoke(() -> WANTestBase.doPuts( testName + "_RR",
- 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName + "_RR", 1000 ));
-
- pause(2000);
- vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000 ));
-
- vm4.invoke(() -> WANTestBase.checkQueueStats("ln",
- 0, 1000, 1000, 1000));
- vm4.invoke(() -> WANTestBase.checkBatchStats("ln",
- 100));
-
- vm5.invoke(() -> WANTestBase.checkQueueStats("ln",
- 0, 1000, 0, 0));
- vm5.invoke(() -> WANTestBase.checkBatchStats("ln",
- 0));
-
- }
-
- @Test
- public void testReplicatedSerialPropagationWithMultipleDispatchers() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver());
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2,
- false, 100, 10, false, false, null, true, 2, OrderPolicy.KEY ));
- vm5.invoke(() -> WANTestBase.createSenderWithMultipleDispatchers( "ln", 2,
- false, 100, 10, false, false, null, true, 2, OrderPolicy.KEY ));
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", null, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "ln", isOffHeap() ));
-
- vm5.invoke(() -> WANTestBase.doPuts( testName + "_RR",
- 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName + "_RR", 1000 ));
-
- pause(2000);
- vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000 ));
-
- vm4.invoke(() -> WANTestBase.checkQueueStats("ln",
- 0, 1000, 1000, 1000));
- vm4.invoke(() -> WANTestBase.checkBatchStats("ln",
- 100));
-
- vm5.invoke(() -> WANTestBase.checkQueueStats("ln",
- 0, 1000, 0, 0));
- vm5.invoke(() -> WANTestBase.checkBatchStats("ln",
- 0));
-
- }
-
- @Test
- public void testWANStatsTwoWanSites() throws Exception {
-
- Integer lnPort = createFirstLocatorWithDSId(1);
- Integer nyPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
- Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver());
- createCacheInVMs(tkPort, vm3);
- vm3.invoke(() -> WANTestBase.createReceiver());
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "lnSerial1",
- 2, false, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "lnSerial1",
- 2, false, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createSender( "lnSerial2",
- 3, false, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "lnSerial2",
- 3, false, 100, 10, false, false, null, true ));
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", null, isOffHeap() ));
-
- startSenderInVMs("lnSerial1", vm4, vm5);
- startSenderInVMs("lnSerial2", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doPuts( testName + "_RR",
- 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName + "_RR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- testName + "_RR", 1000 ));
-
- pause(2000);
- vm2.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000 ));
- vm3.invoke(() -> WANTestBase.checkGatewayReceiverStats(100, 1000, 1000 ));
-
- vm4.invoke(() -> WANTestBase.checkQueueStats("lnSerial1",
- 0, 1000, 1000, 1000));
- vm4.invoke(() -> WANTestBase.checkBatchStats("lnSerial1",
- 100));
- vm4.invoke(() -> WANTestBase.checkQueueStats("lnSerial2",
- 0, 1000, 1000, 1000));
- vm4.invoke(() -> WANTestBase.checkBatchStats("lnSerial2",
- 100));
- vm5.invoke(() -> WANTestBase.checkQueueStats("lnSerial1",
- 0, 1000, 0, 0));
- vm5.invoke(() -> WANTestBase.checkBatchStats("lnSerial1",
- 0));
- vm5.invoke(() -> WANTestBase.checkQueueStats("lnSerial2",
- 0, 1000, 0, 0));
- vm5.invoke(() -> WANTestBase.checkBatchStats("lnSerial2",
- 0));
-
- }
-
- @Test
- public void testReplicatedSerialPropagationHA() throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- vm2.invoke(() -> WANTestBase.createCache( nyPort ));
- vm2.invoke(() -> WANTestBase.createReceiver());
-
- vm4.invoke(() -> WANTestBase.createCache(lnPort ));
- vm5.invoke(() -> WANTestBase.createCache(lnPort ));
- vm6.invoke(() -> WANTestBase.createCache(lnPort ));
- vm7.invoke(() -> WANTestBase.createCache(lnPort ));
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false, null, true ));
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", null, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR", "ln", isOffHeap() ));
-
- AsyncInvocation inv1 = vm5.invokeAsync(() -> WANTestBase.doPuts( testName + "_RR", 10000 ));
- pause(2000);
- AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender( "ln" ));
- Boolean isKilled = Boolean.FALSE;
- try {
- isKilled = (Boolean)inv2.getResult();
- }
- catch (Throwable e) {
- fail("Unexpected exception while killing a sender");
- }
- AsyncInvocation inv3 = null;
- if(!isKilled){
- inv3 = vm5.invokeAsync(() -> WANTestBase.killSender( "ln" ));
- inv3.join();
- }
- inv1.join();
- inv2.join();
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName + "_RR", 10000 ));
-
- vm2.invoke(() -> WANTestBase.checkGatewayReceiverStatsHA(1000, 10000, 10000 ));
-
- vm5.invoke(() -> WANTestBase.checkStats_Failover("ln", 10000));
- }
-
- @Test
- public void testReplicatedSerialPropagationUnprocessedEvents() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- //these are part of remote site
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- //these are part of local site
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- //senders are created on local site
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 20, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 20, false, false, null, true ));
-
- //create one RR (RR_1) on remote site
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_1", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_1", null, isOffHeap() ));
-
- //create another RR (RR_2) on remote site
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_2", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_2", null, isOffHeap() ));
-
- //start the senders on local site
- startSenderInVMs("ln", vm4, vm5);
-
- //create one RR (RR_1) on local site
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_1", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_1", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_1", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_1", "ln", isOffHeap() ));
-
- //create another RR (RR_2) on local site
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_2", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_2", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_2", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_2", "ln", isOffHeap() ));
-
- //start puts in RR_1 in another thread
- vm4.invoke(() -> WANTestBase.doPuts( testName + "_RR_1", 1000 ));
- //do puts in RR_2 in main thread
- vm4.invoke(() -> WANTestBase.doPuts( testName + "_RR_2", 500 ));
- //sleep for some time to let all the events propagate to remote site
- Thread.sleep(20);
- //vm4.invoke(() -> WANTestBase.verifyQueueSize( "ln", 0 ));
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName + "_RR_1", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- testName + "_RR_2", 500 ));
-
- pause(2000);
- vm4.invoke(() -> WANTestBase.checkQueueStats("ln",
- 0, 1500, 1500, 1500));
- vm4.invoke(() -> WANTestBase.checkBatchStats("ln",
- 75));
- vm4.invoke(() -> WANTestBase.checkUnProcessedStats("ln", 0));
-
-
- vm5.invoke(() -> WANTestBase.checkQueueStats("ln",
- 0, 1500, 0, 0));
- vm5.invoke(() -> WANTestBase.checkBatchStats("ln",
- 0));
- vm5.invoke(() -> WANTestBase.checkUnProcessedStats("ln", 1500));
- }
-
- /**
- *
- * Not Disabled - see ticket #52118
- *
- * NOTE: The test failure is avoided by having a larger number of puts operation so
- * that WANTestBase.verifyRegionQueueNotEmpty("ln" )) is successful as there is a
- * significant delay during the high number of puts.
- *
- * In future if this failure reappears, the put operations must be increase or a better fix must be found.
- *
- * 1 region and sender configured on local site and 1 region and a
- * receiver configured on remote site. Puts to the local region are in progress.
- * Remote region is destroyed in the middle.
- *
- * @throws Exception
- */
- @Test
- public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws Exception {
- int numEntries = 20000;
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- //these are part of remote site
- vm2.invoke(() -> WANTestBase.createCache( nyPort ));
- vm2.invoke(() -> WANTestBase.createReceiver());
-
- //these are part of local site
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
- vm5.invoke(() -> WANTestBase.createCache( lnPort ));
- vm6.invoke(() -> WANTestBase.createCache( lnPort ));
- vm7.invoke(() -> WANTestBase.createCache( lnPort ));
-
- //senders are created on local site
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 100, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 100, false, false, null, true ));
-
- //create one RR (RR_1) on remote site
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_1", null, isOffHeap() ));
- //This is to cause a scenario where we have received at least X events and want to slow the receiver
- vm2.invoke(() -> WANTestBase.longPauseAfterNumEvents(500, 200));
- //start the senders on local site
- startSenderInVMs("ln", vm4, vm5);
-
- //create one RR (RR_1) on local site
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_1", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_1", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_1", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- testName + "_RR_1", "ln", isOffHeap() ));
-
- //start puts in RR_1 in another thread
- AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( testName + "_RR_1", numEntries ));
- //destroy RR_1 in remote site
- vm2.invoke(() -> WANTestBase.destroyRegion( testName + "_RR_1", 500));
-
- try {
- inv1.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail();
- }
-
- //assuming some events might have been dispatched before the remote region was destroyed,
- //sender's region queue will have events less than 1000 but the queue will not be empty.
- //NOTE: this much verification might be sufficient in DUnit. Hydra will take care of
- //more in depth validations.
- vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty("ln" ));
-
- //verify that all is well in local site. All the events should be present in local region
- vm4.invoke(() -> WANTestBase.validateRegionSize(
- testName + "_RR_1", numEntries ));
-
- //like a latch to guarantee at least one exception returned
- vm4.invoke(() -> Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> WANTestBase.verifyQueueSize("ln", 0)));
-
- vm4.invoke(() -> WANTestBase.checkBatchStats("ln", true, true));
-
- vm5.invoke(() -> WANTestBase.checkUnProcessedStats("ln", numEntries));
-
- vm2.invoke(() -> WANTestBase.checkExceptionStats(1));
-
- }
-
- @Test
- public void testSerialPropagationWithFilter() throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false,
- new MyGatewayEventFilter(), true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, false, false,
- new MyGatewayEventFilter(), true ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, "ln", 1, 100, isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, null, 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doPuts( testName, 1000 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName, 800 ));
-
- pause(2000);
- vm4.invoke(() -> WANTestBase.checkQueueStats("ln",
- 0, 1000, 900, 800));
- vm4.invoke(() -> WANTestBase.checkEventFilteredStats("ln",
- 200));
- vm4.invoke(() -> WANTestBase.checkBatchStats("ln",
- 80));
- vm4.invoke(() -> WANTestBase.checkUnProcessedStats("ln", 0));
-
-
- vm5.invoke(() -> WANTestBase.checkQueueStats("ln",
- 0, 1000, 0, 0));
- vm5.invoke(() -> WANTestBase.checkBatchStats("ln",
- 0));
- vm5.invoke(() -> WANTestBase.checkUnProcessedStats("ln",900));
- }
-
- @Test
- public void testSerialPropagationConflation() throws Exception {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 10, true, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, "ln", 0, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, "ln", 0, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, "ln", 0, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, "ln", 0, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-
- vm4.invoke(() -> WANTestBase.pauseSender( "ln" ));
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, null,1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- testName, null,1, 100, isOffHeap() ));
-
- final Map keyValues = new HashMap();
- final Map updateKeyValues = new HashMap();
- for(int i=0; i< 1000; i++) {
- keyValues.put(i, i);
- }
-
- vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, keyValues ));
-
- vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() ));
- for(int i=0;i<500;i++) {
- updateKeyValues.put(i, i+"_updated");
- }
-
- vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, updateKeyValues ));
-
- vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName, 0 ));
-
- vm4.invoke(() -> WANTestBase.putGivenKeyValue( testName, updateKeyValues ));
-
- vm4.invoke(() -> WANTestBase.checkQueueSize( "ln", keyValues.size() + updateKeyValues.size() ));
-
- vm4.invoke(() -> WANTestBase.resumeSender( "ln" ));
-
- keyValues.putAll(updateKeyValues);
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- testName, keyValues.size() ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- testName, keyValues.size() ));
-
- vm2.invoke(() -> WANTestBase.validateRegionContents(
- testName, keyValues ));
- vm3.invoke(() -> WANTestBase.validateRegionContents(
- testName, keyValues ));
-
- pause(2000);
- vm4.invoke(() -> WANTestBase.checkQueueStats("ln",
- 0, 2000, 2000, 1500));
- vm4.invoke(() -> WANTestBase.checkConflatedStats("ln",
- 500));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/wancommand/WANCommandTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/wancommand/WANCommandTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/wancommand/WANCommandTestBase.java
deleted file mode 100644
index f9b46ef..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/wancommand/WANCommandTestBase.java
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.wancommand;
-
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.DiskStore;
-import com.gemstone.gemfire.cache.DiskStoreFactory;
-import com.gemstone.gemfire.cache.wan.*;
-import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
-import com.gemstone.gemfire.distributed.DistributedMember;
-import com.gemstone.gemfire.distributed.Locator;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.AvailablePort;
-import com.gemstone.gemfire.internal.AvailablePortHelper;
-import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
-import com.gemstone.gemfire.management.ManagementService;
-import com.gemstone.gemfire.management.internal.cli.commands.CliCommandTestBase;
-import com.gemstone.gemfire.test.dunit.*;
-
-import javax.management.remote.JMXConnectorServer;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
-import static com.gemstone.gemfire.test.dunit.Assert.assertEquals;
-import static com.gemstone.gemfire.test.dunit.Assert.fail;
-
-public abstract class WANCommandTestBase extends CliCommandTestBase {
-
- static Cache cache;
- private JMXConnectorServer jmxConnectorServer;
- private ManagementService managementService;
-// public String jmxHost;
-// public int jmxPort;
-
- static VM vm0;
- static VM vm1;
- static VM vm2;
- static VM vm3;
- static VM vm4;
- static VM vm5;
- static VM vm6;
- static VM vm7;
-
- @Override
- public final void postSetUpCliCommandTestBase() throws Exception {
- final Host host = Host.getHost(0);
- vm0 = host.getVM(0);
- vm1 = host.getVM(1);
- vm2 = host.getVM(2);
- vm3 = host.getVM(3);
- vm4 = host.getVM(4);
- vm5 = host.getVM(5);
- vm6 = host.getVM(6);
- vm7 = host.getVM(7);
- enableManagement();
- }
-
- public Integer createFirstLocatorWithDSId(int dsId) {
- int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- Properties props = getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
- props.setProperty(LOCATORS, "localhost[" + port + "]");
- props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
- InternalDistributedSystem ds = getSystem(props);
- cache = CacheFactory.create(ds);
- return port;
- }
-
- public Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
- int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- Properties props = getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
- props.setProperty(LOCATORS, "localhost[" + port + "]");
- props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
- props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
- getSystem(props);
- return port;
- }
-
- public void createCache(Integer locPort){
- Properties props = getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(LOCATORS, "localhost[" + locPort + "]");
- InternalDistributedSystem ds = getSystem(props);
- cache = CacheFactory.create(ds);
- }
-
- public void createCacheWithGroups(Integer locPort, String groups){
- Properties props = getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(LOCATORS, "localhost[" + locPort + "]");
- props.setProperty(GROUPS, groups);
- InternalDistributedSystem ds = getSystem(props);
- cache = CacheFactory.create(ds);
- }
-
- public void createSender(String dsName, int remoteDsId,
- boolean isParallel, Integer maxMemory,
- Integer batchSize, boolean isConflation, boolean isPersistent,
- GatewayEventFilter filter, boolean isManualStart) {
- File persistentDirectory = new File(dsName +"_disk_"+System.currentTimeMillis()+"_" + VM.getCurrentVMNum());
- persistentDirectory.mkdir();
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- File [] dirs1 = new File[] {persistentDirectory};
- if(isParallel) {
- GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
- gateway.setParallel(true);
- gateway.setMaximumQueueMemory(maxMemory);
- gateway.setBatchSize(batchSize);
- gateway.setManualStart(isManualStart);
- if (filter != null) {
- gateway.addGatewayEventFilter(filter);
- }
- if(isPersistent) {
- gateway.setPersistenceEnabled(true);
- gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
- }
- else {
- DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
- gateway.setDiskStoreName(store.getName());
- }
- gateway.setBatchConflationEnabled(isConflation);
- gateway.create(dsName, remoteDsId);
-
- }else {
- GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
- gateway.setMaximumQueueMemory(maxMemory);
- gateway.setBatchSize(batchSize);
- gateway.setManualStart(isManualStart);
- if (filter != null) {
- gateway.addGatewayEventFilter(filter);
- }
- gateway.setBatchConflationEnabled(isConflation);
- if(isPersistent) {
- gateway.setPersistenceEnabled(true);
- gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
- }
- else {
- DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
- gateway.setDiskStoreName(store.getName());
- }
- gateway.create(dsName, remoteDsId);
- }
- }
-
- public void startSender(String senderId){
- final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
- try {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- sender.start();
- } finally {
- exln.remove();
- }
- }
-
- public void pauseSender(String senderId){
- final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
- try {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- sender.pause();
- } finally {
- exln.remove();
- }
- }
-
- public int createAndStartReceiver(int locPort) {
- Properties props = getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(LOCATORS, "localhost[" + locPort
- + "]");
-
- InternalDistributedSystem ds = getSystem(props);
- cache = CacheFactory.create(ds);
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- fact.setStartPort(port);
- fact.setEndPort(port);
- fact.setManualStart(true);
- GatewayReceiver receiver = fact.create();
- try {
- receiver.start();
- } catch (IOException e) {
- e.printStackTrace();
- fail("Test " + getName() + " failed to start GatewayReceiver");
- }
- return port;
- }
-
- public int createReceiver(int locPort) {
- Properties props = getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(LOCATORS, "localhost[" + locPort
- + "]");
-
- InternalDistributedSystem ds = getSystem(props);
- cache = CacheFactory.create(ds);
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- fact.setStartPort(AvailablePort.AVAILABLE_PORTS_LOWER_BOUND);
- fact.setEndPort(AvailablePort.AVAILABLE_PORTS_UPPER_BOUND);
- fact.setManualStart(true);
- GatewayReceiver receiver = fact.create();
- return receiver.getPort();
- }
-
- public int createReceiverWithGroup(int locPort, String groups) {
- Properties props = getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(LOCATORS, "localhost[" + locPort
- + "]");
- props.setProperty(GROUPS, groups);
-
- InternalDistributedSystem ds = getSystem(props);
- cache = CacheFactory.create(ds);
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- fact.setStartPort(AvailablePort.AVAILABLE_PORTS_LOWER_BOUND);
- fact.setEndPort(AvailablePort.AVAILABLE_PORTS_UPPER_BOUND);
- fact.setManualStart(true);
- GatewayReceiver receiver = fact.create();
- return receiver.getPort();
-
- }
-
- public void startReceiver() {
- try {
- Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
- for (GatewayReceiver receiver : receivers) {
- receiver.start();
- }
- } catch (IOException e) {
- e.printStackTrace();
- fail("Test " + getName() + " failed to start GatewayReceiver");
- }
- }
-
- public void stopReceiver() {
- Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
- for (GatewayReceiver receiver : receivers) {
- receiver.stop();
- }
- }
-
- /**
-  * Creates the cache against the locator at {@code localhost[locPort]} with the given
-  * {@code GROUPS} value, defines a {@link GatewayReceiver} pinned to a single port and
-  * starts it. The port comes from
-  * {@code AvailablePortHelper.getRandomAvailablePortForDUnitSite()} and is used as both
-  * the start and the end of the receiver's port range.
-  *
-  * @param locPort locator port placed into the {@code LOCATORS} property
-  * @param groups value stored under the {@code GROUPS} property
-  * @return the port chosen for the receiver
-  */
- public int createAndStartReceiverWithGroup(int locPort, String groups) {
-   Properties props = getDistributedSystemProperties();
-   props.setProperty(MCAST_PORT, "0");
-   props.setProperty(LOCATORS, "localhost[" + locPort + "]");
-   props.setProperty(GROUPS, groups);
-
-   InternalDistributedSystem ds = getSystem(props);
-   cache = CacheFactory.create(ds);
-   GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
-   int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
-   fact.setStartPort(port);
-   fact.setEndPort(port);
-   fact.setManualStart(true);
-   GatewayReceiver receiver = fact.create();
-   try {
-     receiver.start();
-   } catch (IOException e) {
-     // fail(String) cannot carry a cause, so keep the full stack trace on stderr too.
-     e.printStackTrace();
-     fail("Test " + getName() + " failed to start GatewayReceiver on port " + port + ": " + e);
-   }
-   return port;
- }
-
- /**
-  * @return the {@link DistributedMember} reported by the cache's distributed system
-  */
- public DistributedMember getMember() {
-   final DistributedMember self = cache.getDistributedSystem().getDistributedMember();
-   return self;
- }
-
-
- /**
-  * @return the port of the first entry in {@code Locator.getLocators()}
-  */
- public int getLocatorPort() {
-   final Locator firstLocator = Locator.getLocators().get(0);
-   return firstLocator.getPort();
- }
-
- /**
-  * Sets the system property named by
-  * {@code InternalDistributedSystem.DISABLE_MANAGEMENT_PROPERTY} to {@code "false"}
-  * in every VM by way of {@code Invoke.invokeInEveryVM}.
-  */
- public void enableManagement() {
-   final SerializableRunnable enableTask = new SerializableRunnable("Enable Management") {
-     @Override
-     public void run() {
-       System.setProperty(InternalDistributedSystem.DISABLE_MANAGEMENT_PROPERTY, "false");
-     }
-   };
-   Invoke.invokeInEveryVM(enableTask);
- }
-
- /**
-  * Asserts the running and paused flags of the gateway sender with the given id.
-  * "Could not connect" is registered as an ignored exception for the duration of the
-  * check and removed again afterwards.
-  *
-  * @param senderId id of the sender to look up among {@code cache.getGatewaySenders()}
-  * @param isRunning expected value of {@code sender.isRunning()}
-  * @param isPaused expected value of {@code sender.isPaused()}
-  */
- public void verifySenderState(String senderId, boolean isRunning, boolean isPaused) {
-   final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
-   try {
-     AbstractGatewaySender sender = null;
-     for (GatewaySender s : cache.getGatewaySenders()) {
-       if (s.getId().equals(senderId)) {
-         sender = (AbstractGatewaySender) s;
-         break;
-       }
-     }
-
-     // Report a missing sender explicitly instead of a bare NullPointerException below.
-     if (sender == null) {
-       fail("GatewaySender " + senderId + " was not found in the cache");
-     }
-
-     assertEquals(isRunning, sender.isRunning());
-     assertEquals(isPaused, sender.isPaused());
-   } finally {
-     exln.remove();
-   }
- }
-
- /**
-  * Asserts that the gateway sender with the given id carries the expected attribute
-  * values. Each attribute is compared with {@code assertEquals}, labelled with the
-  * attribute name.
-  *
-  * <p>For each filter list that is non-null, the number of filters on the sender must
-  * equal the list size and every listed class name must match the class name of one of
-  * the sender's filters. A null list skips that check.
-  *
-  * @param senderId id of the sender to look up among {@code cache.getGatewaySenders()}
-  * @param expectedGatewayEventFilters expected event filter class names, or null to skip
-  * @param expectedGatewayTransportFilters expected transport filter class names, or null
-  *        to skip
-  */
- public void verifySenderAttributes(String senderId, int remoteDsID,
-     boolean isParallel, boolean manualStart, int socketBufferSize,
-     int socketReadTimeout, boolean enableBatchConflation, int batchSize,
-     int batchTimeInterval, boolean enablePersistence,
-     boolean diskSynchronous, int maxQueueMemory, int alertThreshold,
-     int dispatcherThreads, OrderPolicy orderPolicy,
-     List<String> expectedGatewayEventFilters,
-     List<String> expectedGatewayTransportFilters) {
-
-   AbstractGatewaySender sender = null;
-   for (GatewaySender s : cache.getGatewaySenders()) {
-     if (s.getId().equals(senderId)) {
-       sender = (AbstractGatewaySender) s;
-       break;
-     }
-   }
-
-   // Report a missing sender explicitly instead of a bare NullPointerException below.
-   if (sender == null) {
-     fail("GatewaySender " + senderId + " was not found in the cache");
-   }
-
-   assertEquals("remoteDistributedSystemId", remoteDsID, sender.getRemoteDSId());
-   assertEquals("isParallel", isParallel, sender.isParallel());
-   assertEquals("manualStart", manualStart, sender.isManualStart());
-   assertEquals("socketBufferSize", socketBufferSize, sender.getSocketBufferSize());
-   assertEquals("socketReadTimeout", socketReadTimeout, sender.getSocketReadTimeout());
-   assertEquals("enableBatchConflation", enableBatchConflation,
-       sender.isBatchConflationEnabled());
-   assertEquals("batchSize", batchSize, sender.getBatchSize());
-   assertEquals("batchTimeInterval", batchTimeInterval, sender.getBatchTimeInterval());
-   assertEquals("enablePersistence", enablePersistence, sender.isPersistenceEnabled());
-   assertEquals("diskSynchronous", diskSynchronous, sender.isDiskSynchronous());
-   assertEquals("maxQueueMemory", maxQueueMemory, sender.getMaximumQueueMemory());
-   assertEquals("alertThreshold", alertThreshold, sender.getAlertThreshold());
-   assertEquals("dispatcherThreads", dispatcherThreads, sender.getDispatcherThreads());
-   assertEquals("orderPolicy", orderPolicy, sender.getOrderPolicy());
-
-   // verify GatewayEventFilters
-   if (expectedGatewayEventFilters != null) {
-     assertEquals("gatewayEventFilters", expectedGatewayEventFilters.size(),
-         sender.getGatewayEventFilters().size());
-
-     List<GatewayEventFilter> actualGatewayEventFilters = sender.getGatewayEventFilters();
-     List<String> actualEventFilterClassNames =
-         new ArrayList<String>(actualGatewayEventFilters.size());
-     for (GatewayEventFilter filter : actualGatewayEventFilters) {
-       actualEventFilterClassNames.add(filter.getClass().getName());
-     }
-
-     for (String expectedGatewayEventFilter : expectedGatewayEventFilters) {
-       if (!actualEventFilterClassNames.contains(expectedGatewayEventFilter)) {
-         fail("GatewayEventFilter " + expectedGatewayEventFilter
-             + " is not added to the GatewaySender");
-       }
-     }
-   }
-
-   // verify GatewayTransportFilters
-   if (expectedGatewayTransportFilters != null) {
-     assertEquals("gatewayTransportFilters", expectedGatewayTransportFilters.size(),
-         sender.getGatewayTransportFilters().size());
-
-     List<GatewayTransportFilter> actualGatewayTransportFilters =
-         sender.getGatewayTransportFilters();
-     List<String> actualTransportFilterClassNames =
-         new ArrayList<String>(actualGatewayTransportFilters.size());
-     for (GatewayTransportFilter filter : actualGatewayTransportFilters) {
-       actualTransportFilterClassNames.add(filter.getClass().getName());
-     }
-
-     for (String expectedGatewayTransportFilter : expectedGatewayTransportFilters) {
-       if (!actualTransportFilterClassNames.contains(expectedGatewayTransportFilter)) {
-         fail("GatewayTransportFilter " + expectedGatewayTransportFilter
-             + " is not added to the GatewaySender.");
-       }
-     }
-   }
- }
-
- /**
-  * Asserts that every {@link GatewayReceiver} defined on the cache reports the given
-  * running state.
-  *
-  * @param isRunning expected value of {@code receiver.isRunning()}
-  */
- public void verifyReceiverState(boolean isRunning) {
-   cache.getGatewayReceivers()
-       .forEach(receiver -> assertEquals(isRunning, receiver.isRunning()));
- }
-
- /**
-  * Asserts that the cache defines exactly one {@link GatewayReceiver} and that it
-  * carries the expected attribute values.
-  *
-  * <p>When {@code expectedGatewayTransportFilters} is non-null, the receiver must hold
-  * the same number of transport filters and every listed class name must match the
-  * class name of one of them. A null list skips that check.
-  *
-  * @param expectedGatewayTransportFilters expected transport filter class names, or
-  *        null to skip
-  */
- public void verifyReceiverCreationWithAttributes(boolean isRunning,
-     int startPort, int endPort, String bindAddress, int maxTimeBetweenPings,
-     int socketBufferSize, List<String> expectedGatewayTransportFilters) {
-
-   final Set<GatewayReceiver> defined = cache.getGatewayReceivers();
-   assertEquals("Number of receivers is incorrect", 1, defined.size());
-
-   // Exactly one element is guaranteed by the assertion above.
-   final GatewayReceiver only = defined.iterator().next();
-   assertEquals("isRunning", isRunning, only.isRunning());
-   assertEquals("startPort", startPort, only.getStartPort());
-   assertEquals("endPort", endPort, only.getEndPort());
-   assertEquals("bindAddress", bindAddress, only.getBindAddress());
-   assertEquals("maximumTimeBetweenPings", maxTimeBetweenPings,
-       only.getMaximumTimeBetweenPings());
-   assertEquals("socketBufferSize", socketBufferSize, only.getSocketBufferSize());
-
-   if (expectedGatewayTransportFilters == null) {
-     return;
-   }
-
-   // verify GatewayTransportFilters
-   assertEquals("gatewayTransportFilters", expectedGatewayTransportFilters.size(),
-       only.getGatewayTransportFilters().size());
-
-   final List<GatewayTransportFilter> installed = only.getGatewayTransportFilters();
-   final List<String> installedNames = new ArrayList<String>(installed.size());
-   for (GatewayTransportFilter installedFilter : installed) {
-     installedNames.add(installedFilter.getClass().getName());
-   }
-
-   for (String wantedName : expectedGatewayTransportFilters) {
-     if (installedNames.contains(wantedName)) {
-       continue;
-     }
-     fail("GatewayTransportFilter " + wantedName
-         + " is not added to the GatewayReceiver.");
-   }
- }
-
- /**
-  * Closes the cache and disconnects in this VM first, then does the same in
-  * {@code vm0} through {@code vm7}, in that order.
-  * NOTE(review): presumably a tear-down hook called by the base test case; the
-  * overridden declaration is outside this view.
-  */
- @Override
- public final void postTearDownCacheTestCase() throws Exception {
-   closeCacheAndDisconnect();
-   vm0.invoke(WANCommandTestBase::closeCacheAndDisconnect);
-   vm1.invoke(WANCommandTestBase::closeCacheAndDisconnect);
-   vm2.invoke(WANCommandTestBase::closeCacheAndDisconnect);
-   vm3.invoke(WANCommandTestBase::closeCacheAndDisconnect);
-   vm4.invoke(WANCommandTestBase::closeCacheAndDisconnect);
-   vm5.invoke(WANCommandTestBase::closeCacheAndDisconnect);
-   vm6.invoke(WANCommandTestBase::closeCacheAndDisconnect);
-   vm7.invoke(WANCommandTestBase::closeCacheAndDisconnect);
- }
-
- /**
-  * Closes the static cache and disconnects its distributed system.
-  * Does nothing when the cache was never created or is already closed.
-  */
- public static void closeCacheAndDisconnect() {
-   if (cache == null || cache.isClosed()) {
-     return;
-   }
-   cache.close();
-   cache.getDistributedSystem().disconnect();
- }
-}