You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:44:22 UTC
[34/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to
./geode-wan/src/test/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java
deleted file mode 100644
index 3ae9a01..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropagationDUnitTest.java
+++ /dev/null
@@ -1,1063 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.misc;
-
-import org.junit.Ignore;
-import org.junit.experimental.categories.Category;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-import com.gemstone.gemfire.cache.CacheClosedException;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.Assert;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.IgnoredException;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.Wait;
-
-@Category(DistributedTest.class)
-public class ReplicatedRegion_ParallelWANPropagationDUnitTest extends WANTestBase{
-
- public ReplicatedRegion_ParallelWANPropagationDUnitTest() {
- super();
- // TODO Auto-generated constructor stub
- }
-
- final String expectedExceptions = null;
-
-
- /**
- *
- */
- private static final long serialVersionUID = 1L;
-
- @Test
- public void test_DR_PGS_1Nodes_Put_Receiver() throws Exception {
- try {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver());
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- createCacheInVMs(lnPort, vm4);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.startSender( "ln1" ));
- fail("Expected GatewaySenderConfigException where parallel gateway sender can not be used with replicated region");
- }
- catch (Exception e) {
- if (!e.getCause().getMessage()
- .contains("can not be used with replicated region")) {
- fail("Expected GatewaySenderConfigException where parallel gateway sender can not be used with replicated region");
- }
- }
- }
-
- /*1. Validate that parallelGatewaySenderId can be added to distributed region
- *Region distributed ack/noack + PGS
- *1. Find out the restrictions on totalNumBuckets on shadowPR
- *2. Find out the restrictions on redundancy on shadowPR
- *3. Find out the restrictions on localMaxMemory on shadowPR
- *4. Find out the best way user will specify PR attributes to PGS
- *5. Find out the restrictions on ordering.
- *6. put on region populates the queue
- *7. put on region reaches to remote site. Dispatcher works as expected
- *8. m1 and m2 has DR(ack/noack). put on DR from m1 populates queue on both m1 and m2. Validate that remote site got all the events
- *9. m1 and m2 has DR(ack/noack). create/put/destroy/operations populates the queue. Validate that remote site got correct events
- *10. m1 and m2 has DR(ack/noack). localDestroy is called on m1's DR. This locally destroys M1's shadowPr
- *11. m1 and m2 has DR(ack/noack). destroy is called on m1's DR. This destroys entire shadowPr on m1 and m2
- *12. m1 and m2 has DR(ack/noack). close Region is called on m1's DR. This locally destroys shadowPr on m1
- *13. m1 and m2 has DR(ack/noack). cache.close on m1'. This locally destroys shadowPr on m1
- *14. Validate HA scenario does not cause any event loss
- *15. PDX events of DR are propagated to remote sites
- *16. validate stats
- *17: PR and DR regions with same name.. Can this be created. If yes then how to differentiate these 2 different shadowPR.
- *18. test for redundancy. FOR SPR's redundancy will be equal to the number of nodes where DR is present. Max is 3. I know this needs to be figure it out at runtime.
- *19. test without providing diskStoreName..I suspect some problem with this code. diskStoreName=null looks like this is not handled very well. need to verify
- *20. ParallelGatewaySenderQueue#addPR method has multiple check for inPersistenceEnabled. Can's we do it with only one check.
- */
-
- /**
- * Test to validate that created parallel gatewaySenders id can be added to
- * distributed region
- * Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0
- */
- @Ignore
- @Test
- public void test_PGS_Started_DR_CREATED_NO_RECEIVER() throws Exception {
- try {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class
- .getName(), vm4);
- ExpectedException exp2 = addExpectedException(InterruptedException.class
- .getName(), vm4);
- try {
-*/ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, false ));
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", isOffHeap() ));
- vm4.invoke(() -> WANTestBase.doPuts(
- getTestMethodName() + "_RR", 1000 ));
- vm4.invoke(() -> WANTestBase.validateQueueContents(
- "ln1", 1000 ));
-
-/* }
- finally {
- exp1.remove();
- exp2.remove();
- }
-*/ }
- catch (Exception e) {
- Assert.fail("Unexpected exception", e);
- }
- }
-
- /**
- * Test to validate that distributed region with given parallelGatewaySender id
- * is created first and then a same parallelGatewaySender is created
- * a single put in DR is enqueued in parallelQueue
- * Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0
- */
- @Ignore
- @Test
- public void test_DR_CREATED_PGS_STARTED_NO_RECEIVER() throws Exception {
- try {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", isOffHeap() ));
-/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class
- .getName(), vm4);
- ExpectedException exp2 = addExpectedException(InterruptedException.class
- .getName(), vm4);
- try {*/
- vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, false ));
- vm4.invoke(() -> WANTestBase.doPuts(
- getTestMethodName() + "_RR", 1000 ));
- vm4.invoke(() -> WANTestBase.validateQueueContents(
- "ln1", 1000 ));
-/* }
- finally {
- exp1.remove();
- exp2.remove();
- }
-*/ }
- catch (Exception e) {
- Assert.fail("Unexpected exception", e);
- }
- }
-
- /**Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0*/
- @Ignore
- @Test
- public void test_DR_PGS_1Node_Put_ValidateQueue_No_Receiver() throws Exception {
- try {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", isOffHeap() ));
-
-/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class
- .getName(), vm4);
- ExpectedException exp2 = addExpectedException(InterruptedException.class
- .getName(), vm4);
- try {*/
- vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, true ));
- vm4.invoke(() -> WANTestBase.startSender( "ln1" ));
-
- vm4.invoke(() -> WANTestBase.doPuts(
- getTestMethodName() + "_RR", 10000 ));
-
- vm4.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 10000 ));
- vm4.invoke(() -> WANTestBase.validateQueueContents(
- "ln1", 10000 ));
-/* }
- finally {
- exp1.remove();
- exp2.remove();
- }
- */
- }
- catch (Exception e) {
- Assert.fail("Unexpected exception", e);
- }
- }
-
- /**Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0*/
- @Ignore
- @Test
- public void test_DR_PGS_2Nodes_Put_ValidateQueue_No_Receiver() throws Exception {
- try {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
- vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", isOffHeap() ));
-
-/* ExpectedException exp1 = addExpectedException(
- GatewaySenderException.class.getName());
- ExpectedException exp2 = addExpectedException(
- InterruptedException.class.getName());
- ExpectedException exp3 = addExpectedException(
- CacheClosedException.class.getName());
- try {
-*/ vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, true ));
-
- startSenderInVMs("ln1", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.doPuts(
- getTestMethodName() + "_RR", 1000 ));
-
- vm4.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- vm5.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
-
- vm4.invoke(() -> WANTestBase.validateQueueContents(
- "ln1", 1000 ));
- vm5.invoke(() -> WANTestBase.validateQueueContents(
- "ln1", 1000 ));
-
-/* }
- finally {
- exp1.remove();
- exp2.remove();
- exp3.remove();
- }
-*/
- }
- catch (Exception e) {
- Assert.fail("Unexpected exception", e);
- }
- }
-
-// public void test_DR_PGS_ORDERPOLICY_PARTITION_EXPECTException(){
-//
-// }
-// public void test_DR_PGS_DISKSTORE_NAME_PROVIDED_VALIDATE_DISK(){
-//
-// }
-// public void test_DR_PGS_DISKSTORE_NAME_NOT_PROVIDED_VALIDATE_DISK(){
-//
-// }
-//
-// public void test_DR_PGS_START_STOP_START(){
-//
-// }
-//
-// public void test_DR_PGS_PERSISTENCE_START_STOP_START(){
-//
-// }
-//
-// public void test_DR_PGS_START_PAUSE_STOP(){
-//
-// }
-//
-// public void test_DR_PGS_START_PAUSE_RESUME_VALIDATE_RECEIVER(){
-//
-// }
-
- /**Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0*/
- @Ignore
- @Test
- public void test_DR_PGS_1Nodes_Put_Receiver_2() throws Exception {
- try {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver());
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- createCacheInVMs(lnPort, vm4);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, true));
-
- vm4.invoke(() -> WANTestBase.startSender( "ln1"));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 1000));
-
- vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
- 1000));
-
- vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
- 0 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000));
- }
- catch (Exception e) {
- Assert.fail("Unexpected exception", e);
- }
- }
-
- /**Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0*/
- @Ignore
- @Test
- public void test_DR_PGS_2Nodes_Put_Receiver() throws Exception {
- try {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver());
-
- createCacheInVMs(lnPort, vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, true));
- vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, true));
-
- startSenderInVMs("ln1", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 1000 ));
-
- vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
- 1000 ));
- vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
- 1000 ));
-
- vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
- 0 ));
- vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1",
- 0 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- }
- catch (Exception e) {
- Assert.fail("Unexpected exception", e);
- }
- }
-
- /**Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0*/
- @Ignore
- @Test
- public void test_DR_PGS_2Nodes_EMPTY_Put_Receiver() throws Exception {
- try {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- vm2.invoke(() -> WANTestBase.createCache( nyPort ));
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver());
-
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
- vm5.invoke(() -> WANTestBase.createCache( lnPort ));
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", Scope.DISTRIBUTED_ACK, DataPolicy.EMPTY, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, true));
- vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, true));
-
- startSenderInVMs("ln1", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 1000 ));
-
-// vm4.invoke(() -> WANTestBase.validateRegionSize( testName + "_RR",
-// 1000 ));
- vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
- 1000 ));
-
- vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
- 0 ));
- vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1",
- 0 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- }
- catch (Exception e) {
- Assert.fail("Unexpected exception", e);
- }
- }
-
- /**Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0*/
- @Ignore
- @Test
- public void test_DR_PR_PGS_4Nodes_Put_Receiver_2Nodes() throws Exception {
- try {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- vm2.invoke(() -> WANTestBase.createReceiver());
- vm3.invoke(() -> WANTestBase.createReceiver());
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 10, 100, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 10, 100, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 10, 100, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 10, 100, false, false, null, true ));
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 1000 ));
- vm5.invoke(() -> WANTestBase.doNextPuts( getTestMethodName() + "_PR",
- 1000, 2000 ));
-
- vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_PR",
- 1000 ));
- vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
- 1000 ));
-
- vm4.invoke(() -> WANTestBase.validateQueueContents( "ln",
- 0 ));
- vm5.invoke(() -> WANTestBase.validateQueueContents( "ln",
- 0 ));
-
-/* ExpectedException exp1 = addExpectedException(CacheClosedException.class
- .getName());
- try {*/
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 1000 ));
-/* }
- finally {
- exp1.remove();
- }
-*/ }
- catch (Exception e) {
- Assert.fail("Unexpected exception", e);
- }
- }
-
- /**Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0*/
- @Ignore
- @Test
- public void test_DR_PGS_NOMANUALSTART_4Nodes_Put_ValidateReceiver() throws Exception {
- try {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver());
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, false ));
- vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, false ));
- vm6.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, false ));
- vm7.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, false ));
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln1", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 1000 ));
-
- vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
- 1000 ));
- vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
- 1000 ));
- vm6.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
- 1000 ));
- vm7.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
- 1000 ));
-
-
- vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
- 0 ));
- vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1",
- 0 ));
- vm6.invoke(() -> WANTestBase.validateQueueContents( "ln1",
- 0 ));
- vm7.invoke(() -> WANTestBase.validateQueueContents( "ln1",
- 0 ));
-
-/* ExpectedException exp1 = addExpectedException(CacheClosedException.class
- .getName());
- try {*/
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
-/* }
- finally {
- exp1.remove();
- }*/
- }
- catch (Exception e) {
- Assert.fail("Unexpected exception", e);
- }
- }
-
- /**Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0*/
- @Ignore
- @Test
- public void test_DR_PGS_4Nodes_Put_CLOSE4NODESCACHE_RECREATE_PUT_ValidateReceiver()
- throws Exception {
- try {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- // before doing any puts, let the senders be running in order to ensure
- // that
- // not a single event will be lost
- vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 1000 ));
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
-
-/* ExpectedException exp1 = addExpectedException(CacheClosedException.class
- .getName());
- try {*/
- vm4.invoke(() -> WANTestBase.killSender());
- vm5.invoke(() -> WANTestBase.killSender());
- vm6.invoke(() -> WANTestBase.killSender());
- vm7.invoke(() -> WANTestBase.killSender());
-/* }
- finally {
- exp1.remove();
- }*/
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2,
- true, 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- // ------------------------------------------------------------------------------------
-
- vm4.invoke(() -> WANTestBase.doNextPuts(
- getTestMethodName() + "_RR", 1000, 2000 ));
-
- // verify all buckets drained on all sender nodes.
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
- vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
-
-/* exp1 = addExpectedException(CacheClosedException.class.getName());
- try {*/
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 2000 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 2000 ));
-/* }
- finally {
- exp1.remove();
- }*/
- }
- catch (Exception e) {
- Assert.fail("Unexpected exception", e);
- }
-
- }
-
- /**Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0*/
- @Ignore
- @Test
- public void test_DR_NO_ACK_PGS_2Nodes_Put_ValidateQueue_Receiver() throws Exception {
- try {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver());
-
- createCacheInVMs(lnPort, vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln1",
- Scope.DISTRIBUTED_NO_ACK, DataPolicy.REPLICATE, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion( getTestMethodName() + "_RR", "ln1",
- Scope.DISTRIBUTED_NO_ACK, DataPolicy.REPLICATE, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, true));
- vm5.invoke(() -> WANTestBase.createSender( "ln1", 2,
- true, 10, 100, false, false, null, true));
-
- startSenderInVMs("ln1", vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR",
- 1000 ));
-
- vm4.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
- 1000 ));
- vm5.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName() + "_RR",
- 1000 ));
-
- vm4.invoke(() -> WANTestBase.validateQueueContents( "ln1",
- 0 ));
- vm5.invoke(() -> WANTestBase.validateQueueContents( "ln1",
- 0 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- }
- catch (Exception e) {
- Assert.fail("Unexpected exception", e);
- }
- }
-
- /**Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0*/
- @Ignore
- @Test
- public void test_DR_PGS_2NODES_1NODESDOWN_Validate_Receiver() throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true,
- 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true,
- 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5);
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
- Thread.sleep(60000);;
-
-/* ExpectedException exp1 = addExpectedException(CacheClosedException.class
- .getName());
- try {*/
- AsyncInvocation inv1 = vm4.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts0(
- getTestMethodName() + "_RR", 1000 ));
- Wait.pause(1000);
- AsyncInvocation inv2 = vm5.invokeAsync(() -> WANTestBase.killSender());
- try {
- inv1.join();
- inv2.join();
- }
- catch (Exception e) {
- Assert.fail("UnExpected Exception", e);
- }
-/* }
- finally {
- exp1.remove();
- }*/
-
- Integer size = (Integer)vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" ));
- LogWriterUtils.getLogWriter().info("The size of the queue is in vm4 " + size);
-
-
- vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
-
- size = (Integer)vm4.invoke(() -> WANTestBase.getQueueContentSize( "ln" ));
- LogWriterUtils.getLogWriter().info("The size of the queue is in vm4 " + size);
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 1000 ));
- }
-
- /**Below test is disabled intentionally
- 1> In this release 8.0, for rolling upgrade support queue name is changed to old style
- 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about
- ParallelGatewaySenderQueue#convertPathToName
- 3> We have to enabled it in next release
- 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0
- and version prior to 8.0*/
- @Ignore
- @Test
- public void test_DR_PGS_4NODES_2NODESDOWN_Validate_Receiver() throws Exception {
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(vm2, vm3);
-
- createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2, true,
- 100, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2, true,
- 100, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSender( "ln", 2, true,
- 100, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSender( "ln", 2, true,
- 100, 10, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- startSenderInVMs("ln", vm4, vm5, vm6, vm7);
-
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
-
- Thread.sleep(60000);
-/* ExpectedException exp1 = addExpectedException(CacheClosedException.class
- .getName());
- try */{
- AsyncInvocation inv1 = vm7.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts0(
- getTestMethodName() + "_RR", 10000 ));
- Thread.sleep(1000);
- AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
- Thread.sleep(2000);
- AsyncInvocation inv3 = vm6.invokeAsync(() -> ReplicatedRegion_ParallelWANPropagationDUnitTest.doPuts1(
- getTestMethodName() + "_RR", 10000 ));
- Thread.sleep(1500);
- AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
- try {
- inv1.join();
- inv2.join();
- inv3.join();
- inv4.join();
- }
- catch (Exception e) {
- Assert.fail("UnExpected Exception", e);
- }
- }/*
- finally {
- exp1.remove();
- }*/
-
- vm6.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
- vm7.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained( "ln" ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 10000 ));
-
- }
-
- public static void doPuts0(String regionName, int numPuts) {
- IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
- .getName());
- IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class
- .getName());
- try {
-
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- for (long i = 0; i < numPuts; i++) {
- LogWriterUtils.getLogWriter().info("Put : key : " + i);
- r.put(i, "0_" + i);
- }
- } finally {
- exp.remove();
- exp1.remove();
- }
- }
-
- public static void doPuts1(String regionName, int numPuts){
- IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
- .getName());
- IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class
- .getName());
- try {
-
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- for (long i = 0; i < numPuts; i++) {
- LogWriterUtils.getLogWriter().info("Put : key : " + i);
- r.put(i, "1_" + i);
- }
- } finally {
- exp.remove();
- exp1.remove();
- }
- }
-
- public static void doPuts2(String regionName, int numPuts){
- IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
- .getName());
- IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class
- .getName());
- try {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- for (long i = 0; i < numPuts; i++) {
- LogWriterUtils.getLogWriter().info("Put : key : " + i);
- r.put(i, "2_" + i);
- }
- } finally {
- exp.remove();
- exp1.remove();
- }
- }
-
- /**
- * Test to validate that put on DR with no ack on multiple nodes are propagated to parallelQueue on multiple nodes
- */
-
- /**
- * Test to validate that the single put in DR is propagated to remote site through parallelGatewaySender
- */
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java
deleted file mode 100644
index 0ee78c5..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.misc;
-
-import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
-import static com.gemstone.gemfire.test.dunit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Properties;
-import java.util.zip.Adler32;
-import java.util.zip.CheckedInputStream;
-import java.util.zip.CheckedOutputStream;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.DiskStore;
-import com.gemstone.gemfire.cache.DiskStoreFactory;
-import com.gemstone.gemfire.cache.wan.GatewayReceiver;
-import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
-import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
-import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.AvailablePortHelper;
-import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.VM;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-@Category(DistributedTest.class)
-public class SenderWithTransportFilterDUnitTest extends WANTestBase {
-
- @Test
- public void testSerialSenderWithTransportFilter() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
-
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- vm2.invoke(() -> SenderWithTransportFilterDUnitTest.createReceiverWithTransportFilters( nyPort ));
- vm2.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", null, isOffHeap() ));
-
- vm3.invoke(() -> WANTestBase.createCache( lnPort ));
-
- vm3.invoke(() -> SenderWithTransportFilterDUnitTest.createSenderWithTransportFilter( "ln", 2, false, 100,
- 1, false, false, true ));
-
- vm3.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR", "ln", isOffHeap() ));
-
- vm3.invoke(() -> WANTestBase.startSender( "ln" ));
-
- vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_RR", 100 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 100 ));
- }
-
- @Test
- public void testParallelSenderWithTransportFilter() {
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
-
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- vm2.invoke(() -> SenderWithTransportFilterDUnitTest.createReceiverWithTransportFilters( nyPort ));
- vm2.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", null, 0, 10, isOffHeap() ));
-
- vm3.invoke(() -> WANTestBase.createCache( lnPort ));
-
- vm3.invoke(() -> SenderWithTransportFilterDUnitTest.createSenderWithTransportFilter( "ln", 2, true, 100,
- 1, false, false, true ));
-
- vm3.invoke(() -> WANTestBase.createPartitionedRegion(
- getTestMethodName() + "_PR", "ln", 0, 10, isOffHeap() ));
-
- vm3.invoke(() -> WANTestBase.startSender( "ln" ));
-
- vm3.invoke(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", 100 ));
-
- vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_PR", 100 ));
- }
-
- public static int createReceiverWithTransportFilters(int locPort) {
- WANTestBase test = new WANTestBase();
- Properties props = test.getDistributedSystemProperties();
- props.setProperty(MCAST_PORT, "0");
- props.setProperty(LOCATORS, "localhost[" + locPort
- + "]");
-
- InternalDistributedSystem ds = test.getSystem(props);
- cache = CacheFactory.create(ds);
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- fact.setStartPort(port);
- fact.setEndPort(port);
- ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>();
- transportFilters.add(new CheckSumTransportFilter("CheckSumTransportFilter"));
- if (!transportFilters.isEmpty()) {
- for (GatewayTransportFilter filter : transportFilters) {
- fact.addGatewayTransportFilter(filter);
- }
- }
- GatewayReceiver receiver = fact.create();
- try {
- receiver.start();
- }
- catch (IOException e) {
- fail("Test " + test.getName() + " failed to start GatewayReceiver on port " + port, e);
- }
- return port;
- }
-
- public static void createSenderWithTransportFilter(String dsName,
- int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize,
- boolean isConflation, boolean isPersistent, boolean isManualStart) {
- File persistentDirectory = new File(dsName + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- persistentDirectory.mkdir();
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- File[] dirs1 = new File[] { persistentDirectory };
-
- if (isParallel) {
- GatewaySenderFactory gateway = cache
- .createGatewaySenderFactory();
- gateway.setParallel(true);
- gateway.setMaximumQueueMemory(maxMemory);
- gateway.setBatchSize(batchSize);
- ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
- ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>();
- transportFilters.add(new CheckSumTransportFilter("CheckSumTransportFilter"));
- if (!transportFilters.isEmpty()) {
- for (GatewayTransportFilter filter : transportFilters) {
- gateway.addGatewayTransportFilter(filter);
- }
- }
- if (isPersistent) {
- gateway.setPersistenceEnabled(true);
- gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
- .getName());
- }
- else {
- DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
- gateway.setDiskStoreName(store.getName());
- }
- gateway.setBatchConflationEnabled(isConflation);
- gateway.create(dsName, remoteDsId);
-
- }
- else {
- GatewaySenderFactory gateway = cache
- .createGatewaySenderFactory();
- gateway.setMaximumQueueMemory(maxMemory);
- gateway.setBatchSize(batchSize);
- gateway.setManualStart(isManualStart);
- ((InternalGatewaySenderFactory)gateway)
- .setLocatorDiscoveryCallback(new MyLocatorCallback());
- ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>();
- transportFilters.add(new CheckSumTransportFilter("CheckSumTransportFilter"));
- if (!transportFilters.isEmpty()) {
- for (GatewayTransportFilter filter : transportFilters) {
- gateway.addGatewayTransportFilter(filter);
- }
- }
- gateway.setBatchConflationEnabled(isConflation);
- if (isPersistent) {
- gateway.setPersistenceEnabled(true);
- gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
- .getName());
- }
- else {
- DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
- gateway.setDiskStoreName(store.getName());
- }
- gateway.create(dsName, remoteDsId);
- }
- }
-
- static class CheckSumTransportFilter implements GatewayTransportFilter {
-
- Adler32 checker = new Adler32();
-
- private String name;
-
- public CheckSumTransportFilter(String name){
- this.name = name;
- }
-
- @Override
- public String toString(){
- return this.name;
- }
-
- @Override
- public InputStream getInputStream(InputStream stream) {
- return new CheckedInputStream(stream, checker);
- }
-
- @Override
- public OutputStream getOutputStream(OutputStream stream) {
- return new CheckedOutputStream(stream, checker);
- }
-
- @Override
- public void close() {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
deleted file mode 100644
index 6e2581e..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.misc;
-
-import static org.junit.Assert.*;
-
-import java.util.Set;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.admin.AdminDistributedSystemFactory;
-import com.gemstone.gemfire.admin.AdminException;
-import com.gemstone.gemfire.admin.DistributedSystemConfig;
-import com.gemstone.gemfire.admin.internal.AdminDistributedSystemImpl;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.internal.cache.CacheObserverAdapter;
-import com.gemstone.gemfire.internal.cache.CacheObserverHolder;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.IgnoredException;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.SerializableRunnable;
-import com.gemstone.gemfire.test.dunit.VM;
-import com.gemstone.gemfire.test.dunit.Wait;
-import com.gemstone.gemfire.test.dunit.WaitCriterion;
-import com.gemstone.gemfire.test.junit.categories.DistributedTest;
-
-@Category(DistributedTest.class)
-public class ShutdownAllPersistentGatewaySenderDUnitTest extends WANTestBase {
-
- private static final long MAX_WAIT = 70000;
-
- private static final int NUM_KEYS = 1000;
-
- public ShutdownAllPersistentGatewaySenderDUnitTest() {
- super();
- }
-
- @Override
- protected final void postSetUpWANTestBase() throws Exception {
- IgnoredException.addIgnoredException("Cache is being closed by ShutdownAll");
- }
-
- private static final long serialVersionUID = 1L;
-
- @Test
- public void testGatewaySender() throws Exception {
- IgnoredException.addIgnoredException("Cache is shutting down");
-
- Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
-
- Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
-
- vm2.invoke(() -> WANTestBase.createCache( nyPort ));
- vm3.invoke(() -> WANTestBase.createCache( nyPort ));
- vm2.invoke(() -> WANTestBase.createReceiver());
-
- vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
- vm4.invoke(() -> WANTestBase.createCache( lnPort ));
-
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 400, false, false, null, true ));
-
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
-
- vm4.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
-
- // set the CacheObserver to block the ShutdownAll
- SerializableRunnable waitAtShutdownAll = new SerializableRunnable() {
- @Override
- public void run() {
- LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
- CacheObserverHolder.setInstance(new CacheObserverAdapter() {
- @Override
- public void beforeShutdownAll() {
- final Region region = cache.getRegion(getTestMethodName() + "_PR");
- Wait.waitForCriterion(new WaitCriterion() {
- @Override
- public boolean done() {
- return region.size() >= 2;
- }
-
- @Override
- public String description() {
- return "Wait for wan to have processed several events";
- }
- }, 30000, 100, true);
- }
- });
- }
- };
- vm2.invoke(waitAtShutdownAll);
- vm3.invoke(waitAtShutdownAll);
-
- AsyncInvocation vm4_future = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_PR", NUM_KEYS ));
-
- // ShutdownAll will be suspended at observer, so puts will continue
- AsyncInvocation future = shutDownAllMembers(vm2, 2, MAX_WAIT);
- future.join(MAX_WAIT);
-
- // now restart vm1 with gatewayHub
- LogWriterUtils.getLogWriter().info("restart in VM2");
- vm2.invoke(() -> WANTestBase.createCache( nyPort ));
- vm3.invoke(() -> WANTestBase.createCache( nyPort ));
- AsyncInvocation vm3_future = vm3.invokeAsync(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR",
- "ln", 1, 100, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
- vm3_future.join(MAX_WAIT);
-
- vm3.invoke(new SerializableRunnable() {
- public void run() {
- final Region region = cache.getRegion(getTestMethodName() + "_PR");
- cache.getLogger().info(
- "vm1's region size before restart gatewayHub is " + region.size());
- }
- });
- vm2.invoke(() -> WANTestBase.createReceiver());
-
- // wait for vm0 to finish its work
- vm4_future.join(MAX_WAIT);
- vm4.invoke(new SerializableRunnable() {
- public void run() {
- Region region = cache.getRegion(getTestMethodName() + "_PR");
- assertEquals(NUM_KEYS, region.size());
- }
- });
-
- // verify the other side (vm1)'s entries received from gateway
- vm2.invoke(new SerializableRunnable() {
- public void run() {
- final Region region = cache.getRegion(getTestMethodName() + "_PR");
-
- cache.getLogger().info(
- "vm1's region size after restart gatewayHub is " + region.size());
- Wait.waitForCriterion(new WaitCriterion() {
- public boolean done() {
- Object lastValue = region.get(NUM_KEYS - 1);
- if (lastValue != null && lastValue.equals(NUM_KEYS - 1)) {
- region.getCache().getLogger().info(
- "Last key has arrived, its value is " + lastValue
- + ", end of wait.");
- return true;
- }
- else
- return (region.size() == NUM_KEYS);
- }
-
- public String description() {
- return "Waiting for destination region to reach size: " + NUM_KEYS
- + ", current is " + region.size();
- }
- }, MAX_WAIT, 100, true);
- assertEquals(NUM_KEYS, region.size());
- }
- });
-
- }
-
- private AsyncInvocation shutDownAllMembers(VM vm, final int expectedNumber, final long timeout) {
- AsyncInvocation future = vm.invokeAsync(new SerializableRunnable("Shutdown all the members") {
-
- public void run() {
- DistributedSystemConfig config;
- AdminDistributedSystemImpl adminDS = null;
- try {
- config = AdminDistributedSystemFactory.defineDistributedSystem(cache
- .getDistributedSystem(), "");
- adminDS = (AdminDistributedSystemImpl)AdminDistributedSystemFactory
- .getDistributedSystem(config);
- adminDS.connect();
- Set members = adminDS.shutDownAllMembers(timeout);
- int num = members == null ? 0 : members.size();
- assertEquals(expectedNumber, num);
- }
- catch (AdminException e) {
- throw new RuntimeException(e);
- }
- finally {
- if (adminDS != null) {
- adminDS.disconnect();
- }
- }
- }
- });
- return future;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WANConfigurationJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WANConfigurationJUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WANConfigurationJUnitTest.java
deleted file mode 100644
index b504f87..0000000
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WANConfigurationJUnitTest.java
+++ /dev/null
@@ -1,601 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.internal.cache.wan.misc;
-
-import com.gemstone.gemfire.cache.*;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
-import com.gemstone.gemfire.cache.wan.*;
-import com.gemstone.gemfire.cache30.MyGatewayEventFilter1;
-import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1;
-import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2;
-import com.gemstone.gemfire.internal.AvailablePortHelper;
-import com.gemstone.gemfire.internal.cache.LocalRegion;
-import com.gemstone.gemfire.internal.cache.wan.GatewayReceiverException;
-import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
-import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory;
-import com.gemstone.gemfire.internal.cache.wan.MyGatewaySenderEventListener;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
-import org.junit.After;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import static com.gemstone.gemfire.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.junit.Assert.*;
-
-@Category(IntegrationTest.class)
-public class WANConfigurationJUnitTest {
-
- private Cache cache;
-
- /**
- * Test to validate that the sender can not be started without configuring
- * locator
- * @throws IOException
- *
- * @throws IOException
- */
- @Test
- public void test_GatewaySender_without_Locator() throws IOException {
- try {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- GatewaySenderFactory fact = cache.createGatewaySenderFactory();
- fact.setParallel(true);
- GatewaySender sender1 = fact.create("NYSender", 2);
- sender1.start();
- fail("Expected IllegalStateException but not thrown");
- }
- catch (Exception e) {
- if ((e instanceof IllegalStateException && e
- .getMessage()
- .startsWith(
- LocalizedStrings.AbstractGatewaySender_LOCATOR_SHOULD_BE_CONFIGURED_BEFORE_STARTING_GATEWAY_SENDER
- .toLocalizedString()))) {
- }
- else {
- fail("Expected IllegalStateException but received :" + e);
- }
- }
- }
-
- /**
- * Test to validate that sender with same Id can not be added to cache.
- */
- @Test
- public void test_SameGatewaySenderCreatedTwice() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- try {
- GatewaySenderFactory fact = cache.createGatewaySenderFactory();
- fact.setParallel(true);
- fact.setManualStart(true);
- fact.create("NYSender", 2);
- fact.create("NYSender", 2);
- fail("Expected IllegalStateException but not thrown");
- }
- catch (Exception e) {
- if (e instanceof IllegalStateException
- && e.getMessage().contains("A GatewaySender with id")) {
-
- }
- else {
- fail("Expected IllegalStateException but received :" + e);
- }
- }
- }
-
- /**
- * Test to validate that same gatewaySender Id can not be added to the region attributes.
- */
- @Test
- public void test_SameGatewaySenderIdAddedTwice() {
- try {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- GatewaySenderFactory fact = cache.createGatewaySenderFactory();
- fact.setParallel(true);
- fact.setManualStart(true);
- GatewaySender sender1 = fact.create("NYSender", 2);
- AttributesFactory factory = new AttributesFactory();
- factory.addGatewaySenderId(sender1.getId());
- factory.addGatewaySenderId(sender1.getId());
- fail("Expected IllegalArgumentException but not thrown");
- }
- catch (Exception e) {
- if (e instanceof IllegalArgumentException
- && e.getMessage().contains("is already added")) {
-
- }
- else {
- fail("Expected IllegalStateException but received :" + e);
- }
- }
- }
-
- @Test
- public void test_GatewaySenderIdAndAsyncEventId() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- AttributesFactory factory = new AttributesFactory();
- factory.addGatewaySenderId("ln");
- factory.addGatewaySenderId("ny");
- factory.addAsyncEventQueueId("Async_LN");
- RegionAttributes attrs = factory.create();
-
- Set<String> senderIds = new HashSet<String>();
- senderIds.add("ln");
- senderIds.add("ny");
- Set<String> attrsSenderIds = attrs.getGatewaySenderIds();
- assertEquals(senderIds, attrsSenderIds);
- Region r = cache.createRegion("Customer", attrs);
- assertEquals(senderIds, ((LocalRegion)r).getGatewaySenderIds());
- }
-
- /**
- * Test to validate that distributed region can not have the gateway sender
- * with parallel distribution policy
- *
- */
- @Ignore("Bug51491")
- @Test
- public void test_GatewaySender_Parallel_DistributedRegion() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- GatewaySenderFactory fact = cache.createGatewaySenderFactory();
- fact.setParallel(true);
- fact.setManualStart(true);
- GatewaySender sender1 = fact.create("NYSender", 2);
- AttributesFactory factory = new AttributesFactory();
- factory.addGatewaySenderId(sender1.getId());
- factory.setScope(Scope.DISTRIBUTED_ACK);
- factory.setDataPolicy(DataPolicy.REPLICATE);
- try {
- RegionFactory regionFactory = cache.createRegionFactory(factory.create());
- Region region = regionFactory
- .create("test_GatewaySender_Parallel_DistributedRegion");
- }
- catch (Exception e) {
- fail("Unexpected Exception :" + e);
- }
- }
-
- @Test
- public void test_GatewaySender_Parallel_MultipleDispatcherThread() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- GatewaySenderFactory fact = cache.createGatewaySenderFactory();
- fact.setParallel(true);
- fact.setManualStart(true);
- fact.setDispatcherThreads(4);
- try {
- GatewaySender sender1 = fact.create("NYSender", 2);
- }
- catch (GatewaySenderException e) {
- fail("UnExpected Exception " + e);
- }
- }
-
- @Test
- public void test_GatewaySender_Serial_ZERO_DispatcherThread() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- GatewaySenderFactory fact = cache.createGatewaySenderFactory();
- fact.setManualStart(true);
- fact.setDispatcherThreads(0);
- try {
- GatewaySender sender1 = fact.create("NYSender", 2);
- fail("Expected GatewaySenderException but not thrown");
- }
- catch (GatewaySenderException e) {
- if (e.getMessage().contains("can not be created with dispatcher threads less than 1")) {
- }
- else {
- fail("Expected IllegalStateException but received :" + e);
- }
- }
- }
-
- /**
- * Test to validate the gateway receiver attributes are correctly set
- */
- @Test
- public void test_ValidateGatewayReceiverAttributes() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- int[] randomAvailableTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2);
- int port1 = randomAvailableTCPPorts[0];
- int port2 = randomAvailableTCPPorts[1];
-
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- if(port1 < port2){
- fact.setStartPort(port1);
- fact.setEndPort(port2);
- }else{
- fact.setStartPort(port2);
- fact.setEndPort(port1);
- }
-
- fact.setMaximumTimeBetweenPings(2000);
- fact.setSocketBufferSize(200);
- GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
- GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
- fact.addGatewayTransportFilter(myStreamFilter2);
- fact.addGatewayTransportFilter(myStreamFilter1);
- GatewayReceiver receiver1 = fact.create();
-
-
- Region region = cache.createRegionFactory().create(
- "test_ValidateGatewayReceiverAttributes");
- Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
- GatewayReceiver rec = receivers.iterator().next();
- assertEquals(receiver1.getHost(), rec.getHost());
- assertEquals(receiver1.getStartPort(), rec.getStartPort());
- assertEquals(receiver1.getEndPort(), rec.getEndPort());
- assertEquals(receiver1.getBindAddress(), rec.getBindAddress());
- assertEquals(receiver1.getMaximumTimeBetweenPings(), rec
- .getMaximumTimeBetweenPings());
- assertEquals(receiver1.getSocketBufferSize(), rec
- .getSocketBufferSize());
- assertEquals(receiver1.getGatewayTransportFilters().size(), rec
- .getGatewayTransportFilters().size());
-
- }
-
- @Test
- public void test_ValidateGatewayReceiverStatus() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- int[] randomAvailableTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2);
- int port1 = randomAvailableTCPPorts[0];
- int port2 = randomAvailableTCPPorts[1];
-
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- if(port1 < port2){
- fact.setStartPort(port1);
- fact.setEndPort(port2);
- }else{
- fact.setStartPort(port2);
- fact.setEndPort(port1);
- }
-
- fact.setMaximumTimeBetweenPings(2000);
- fact.setSocketBufferSize(200);
- GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
- GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
- fact.addGatewayTransportFilter(myStreamFilter2);
- fact.addGatewayTransportFilter(myStreamFilter1);
- GatewayReceiver receiver1 = fact.create();
- assertTrue(receiver1.isRunning());
- }
-
- /**
- * Test to validate that serial gateway sender attributes are correctly set
- */
- @Test
- public void test_ValidateSerialGatewaySenderAttributes() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- GatewaySenderFactory fact = cache.createGatewaySenderFactory();
- fact.setManualStart(true);
- fact.setBatchConflationEnabled(true);
- fact.setBatchSize(200);
- fact.setBatchTimeInterval(300);
- fact.setPersistenceEnabled(false);
- fact.setDiskStoreName("FORNY");
- fact.setMaximumQueueMemory(200);
- fact.setAlertThreshold(1200);
- GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
- fact.addGatewayEventFilter(myEventFilter1);
- GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
- fact.addGatewayTransportFilter(myStreamFilter1);
- GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
- fact.addGatewayTransportFilter(myStreamFilter2);
- GatewaySender sender1 = fact.create("TKSender", 2);
-
-
- AttributesFactory factory = new AttributesFactory();
- factory.addGatewaySenderId(sender1.getId());
- factory.setDataPolicy(DataPolicy.PARTITION);
- Region region = cache.createRegionFactory(factory.create()).create(
- "test_ValidateGatewaySenderAttributes");
- Set<GatewaySender> senders = cache.getGatewaySenders();
- assertEquals(senders.size(), 1);
- GatewaySender gatewaySender = senders.iterator().next();
- assertEquals(sender1.getRemoteDSId(), gatewaySender
- .getRemoteDSId());
- assertEquals(sender1.isManualStart(), gatewaySender.isManualStart());
- assertEquals(sender1.isBatchConflationEnabled(), gatewaySender
- .isBatchConflationEnabled());
- assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize());
- assertEquals(sender1.getBatchTimeInterval(), gatewaySender
- .getBatchTimeInterval());
- assertEquals(sender1.isPersistenceEnabled(), gatewaySender
- .isPersistenceEnabled());
- assertEquals(sender1.getDiskStoreName(), gatewaySender.getDiskStoreName());
- assertEquals(sender1.getMaximumQueueMemory(), gatewaySender
- .getMaximumQueueMemory());
- assertEquals(sender1.getAlertThreshold(), gatewaySender
- .getAlertThreshold());
- assertEquals(sender1.getGatewayEventFilters().size(), gatewaySender
- .getGatewayEventFilters().size());
- assertEquals(sender1.getGatewayTransportFilters().size(), gatewaySender
- .getGatewayTransportFilters().size());
-
- }
-
- /**
- * Test to validate that parallel gateway sender attributes are correctly set
- */
- @Test
- public void test_ValidateParallelGatewaySenderAttributes() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- GatewaySenderFactory fact = cache.createGatewaySenderFactory();
- fact.setParallel(true);
- fact.setManualStart(true);
- fact.setBatchConflationEnabled(true);
- fact.setBatchSize(200);
- fact.setBatchTimeInterval(300);
- fact.setPersistenceEnabled(false);
- fact.setDiskStoreName("FORNY");
- fact.setMaximumQueueMemory(200);
- fact.setAlertThreshold(1200);
- GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
- fact.addGatewayEventFilter(myEventFilter1);
- GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
- fact.addGatewayTransportFilter(myStreamFilter1);
- GatewayTransportFilter myStreamFilter2 = new MyGatewayTransportFilter2();
- fact.addGatewayTransportFilter(myStreamFilter2);
- GatewaySender sender1 = fact.create("TKSender", 2);
-
-
- AttributesFactory factory = new AttributesFactory();
- factory.addGatewaySenderId(sender1.getId());
- factory.setDataPolicy(DataPolicy.PARTITION);
- Region region = cache.createRegionFactory(factory.create()).create(
- "test_ValidateGatewaySenderAttributes");
- Set<GatewaySender> senders = cache.getGatewaySenders();
- assertEquals(1, senders.size());
- GatewaySender gatewaySender = senders.iterator().next();
- assertEquals(sender1.getRemoteDSId(), gatewaySender
- .getRemoteDSId());
- assertEquals(sender1.isManualStart(), gatewaySender.isManualStart());
- assertEquals(sender1.isBatchConflationEnabled(), gatewaySender
- .isBatchConflationEnabled());
- assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize());
- assertEquals(sender1.getBatchTimeInterval(), gatewaySender
- .getBatchTimeInterval());
- assertEquals(sender1.isPersistenceEnabled(), gatewaySender
- .isPersistenceEnabled());
- assertEquals(sender1.getDiskStoreName(), gatewaySender.getDiskStoreName());
- assertEquals(sender1.getMaximumQueueMemory(), gatewaySender
- .getMaximumQueueMemory());
- assertEquals(sender1.getAlertThreshold(), gatewaySender
- .getAlertThreshold());
- assertEquals(sender1.getGatewayEventFilters().size(), gatewaySender
- .getGatewayEventFilters().size());
- assertEquals(sender1.getGatewayTransportFilters().size(), gatewaySender
- .getGatewayTransportFilters().size());
-
- }
-
- @Test
- public void test_GatewaySenderWithGatewaySenderEventListener1() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- InternalGatewaySenderFactory fact = (InternalGatewaySenderFactory)cache.createGatewaySenderFactory();
- AsyncEventListener listener = new MyGatewaySenderEventListener();
- ((InternalGatewaySenderFactory)fact).addAsyncEventListener(listener);
- try {
- fact.create("ln", 2);
- fail("Expected GatewaySenderException. When a sender is added , remoteDSId should not be provided.");
- } catch (Exception e) {
- if (e instanceof GatewaySenderException
- && e.getMessage()
- .contains(
- "cannot define a remote site because at least AsyncEventListener is already added.")) {
-
- } else {
- fail("Expected GatewaySenderException but received :" + e);
- }
- }
- }
-
- @Test
- public void test_GatewaySenderWithGatewaySenderEventListener2() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- GatewaySenderFactory fact = cache.createGatewaySenderFactory();
- AsyncEventListener listener = new MyGatewaySenderEventListener();
- ((InternalGatewaySenderFactory)fact).addAsyncEventListener(listener);
- try {
- ((InternalGatewaySenderFactory)fact).create("ln");
- } catch (Exception e) {
- fail("Received Exception :" + e);
- }
- }
-
- @Test
- public void test_ValidateGatewayReceiverAttributes_2() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- fact.setStartPort(50504);
- fact.setMaximumTimeBetweenPings(1000);
- fact.setSocketBufferSize(4000);
- fact.setEndPort(70707);
- fact.setManualStart(true);
- GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
- fact.addGatewayTransportFilter(myStreamFilter1);
-
- GatewayReceiver receiver = fact.create();
- try {
- receiver.start();
- }
- catch (IOException e) {
- fail("The test failed with IOException");
- }
-
- assertEquals(50504, receiver.getStartPort());
- assertEquals(1000, receiver.getMaximumTimeBetweenPings());
- assertEquals(4000,receiver.getSocketBufferSize());
- assertEquals(70707, receiver.getEndPort());
- }
-
- /**
- * This test takes a minimum of 120s to execute. It is known to hang on Mac OS
- * X Yosemite do to changes in the the message string checked in
- * GatewayReceiverImpl around line 167. Expects
- * "Cannot assign requested address" but gets
- * "Can't assign requested address". Timeout after 150s to safeguard against
- * hanging on other platforms that may differ.
- */
- @Test(timeout = 150000)
- public void test_ValidateGatewayReceiverAttributes_WrongBindAddress() {
- if (System.getProperty("os.name").equals("Mac OS X")) {
- fail("Failing to avoid known hang on Mac OS X.");
- }
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- fact.setStartPort(50504);
- fact.setMaximumTimeBetweenPings(1000);
- fact.setSocketBufferSize(4000);
- fact.setEndPort(70707);
- fact.setManualStart(true);
- fact.setBindAddress("200.112.204.10");
- GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
- fact.addGatewayTransportFilter(myStreamFilter1);
-
- GatewayReceiver receiver = fact.create();
- try {
- receiver.start();
- fail("Expected GatewayReceiverException");
- }
- catch (GatewayReceiverException gRE){
- assertTrue(gRE.getMessage().contains("Failed to create server socket on"));
- }
- catch (IOException e) {
- e.printStackTrace();
- fail("The test failed with IOException");
- }
- }
-
- @Test
- public void test_ValidateGatewayReceiverDefaultStartPortAndDefaultEndPort() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- fact.setMaximumTimeBetweenPings(1000);
- fact.setSocketBufferSize(4000);
- fact.setManualStart(true);
- GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
- fact.addGatewayTransportFilter(myStreamFilter1);
-
- GatewayReceiver receiver = fact.create();
- try {
- receiver.start();
- }
- catch (IOException e) {
- fail("The test failed with IOException");
- }
- int port = receiver.getPort();
- if((port < 5000) || (port > 5500)) {
- fail("GatewayReceiver started on out of range port");
- }
- }
-
- @Test
- public void test_ValidateGatewayReceiverDefaultStartPortAndEndPortProvided() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- fact.setMaximumTimeBetweenPings(1000);
- fact.setSocketBufferSize(4000);
- fact.setEndPort(50707);
- fact.setManualStart(true);
- GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
- fact.addGatewayTransportFilter(myStreamFilter1);
-
- GatewayReceiver receiver = fact.create();
- try {
- receiver.start();
- }
- catch (IOException e) {
- fail("The test failed with IOException");
- }
- int port = receiver.getPort();
- if((port < GatewayReceiver.DEFAULT_START_PORT) || (port > 50707)) {
- fail("GatewayReceiver started on out of range port");
- }
- }
-
- @Test
- public void test_ValidateGatewayReceiverWithManualStartFALSE() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- fact.setMaximumTimeBetweenPings(1000);
- fact.setSocketBufferSize(4000);
- fact.setStartPort(5303);
- fact.setManualStart(false);
- GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
- fact.addGatewayTransportFilter(myStreamFilter1);
- GatewayReceiver receiver = fact.create();
- int port = receiver.getPort();
- if ((port < 5303) || (port > GatewayReceiver.DEFAULT_END_PORT)) {
- fail("GatewayReceiver started on out of range port");
- }
- }
-
- @Test
- public void test_ValidateGatewayReceiverWithStartPortAndDefaultEndPort() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- fact.setMaximumTimeBetweenPings(1000);
- fact.setSocketBufferSize(4000);
- fact.setStartPort(5303);
- fact.setManualStart(true);
- GatewayTransportFilter myStreamFilter1 = new MyGatewayTransportFilter1();
- fact.addGatewayTransportFilter(myStreamFilter1);
-
- GatewayReceiver receiver = fact.create();
- try {
- receiver.start();
- }
- catch (IOException e) {
- fail("The test failed with IOException");
- }
- int port = receiver.getPort();
- if ((port < 5303) || (port > GatewayReceiver.DEFAULT_END_PORT)) {
- fail("GatewayReceiver started on out of range port");
- }
- }
-
- @Test
- public void test_ValidateGatewayReceiverWithWrongEndPortProvided() {
- cache = new CacheFactory().set(MCAST_PORT, "0").create();
- try {
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- fact.setMaximumTimeBetweenPings(1000);
- fact.setSocketBufferSize(4000);
- fact.setEndPort(4999);
- GatewayReceiver receiver = fact.create();
- fail("wrong end port set in the GatewayReceiver");
- } catch (IllegalStateException expected) {
- if(!expected.getMessage().contains("Please specify either start port a value which is less than end port.")){
- fail("Caught IllegalStateException");
- expected.printStackTrace();
- }
- }
- }
-
- @After
- public void tearDown() throws Exception {
- if (this.cache != null) {
- this.cache.close();
- }
- }
-}