You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2016/04/18 19:40:37 UTC
[1/5] incubator-geode git commit: GEODE-1032 : Additional wait time
to check for empty queue,
refactored WANTestBase.java to remove unused functions,
replaced wait criteria with Awaitility.
Repository: incubator-geode
Updated Branches:
refs/heads/develop 8f7f4d5f4 -> bcaf0c699
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
index 7adba41..35cdb36 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
@@ -16,11 +16,6 @@
*/
package com.gemstone.gemfire.internal.cache.wan.parallel;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
import com.gemstone.gemfire.test.dunit.Wait;
@@ -43,9 +38,9 @@ public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
//create receiver on site1 and site2
createCacheInVMs(lnPort, vm2, vm4, vm5);
- vm2.invoke(() -> WANTestBase.createReceiver( lnPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(nyPort, vm3, vm6, vm7);
- vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm3.invoke(() -> WANTestBase.createReceiver());
//create senders on site1
vm2.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -157,11 +152,11 @@ public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
//create cache and receivers on all the 3 sites
createCacheInVMs(lnPort, vm3, vm6);
- createReceiverInVMs(lnPort, vm3, vm6);
+ createReceiverInVMs(vm3, vm6);
createCacheInVMs(nyPort, vm4, vm7);
- createReceiverInVMs(nyPort, vm4, vm7);
+ createReceiverInVMs(vm4, vm7);
createCacheInVMs(tkPort, vm5);
- createReceiverInVMs(tkPort, vm5);
+ createReceiverInVMs(vm5);
//create senders on all the 3 sites
@@ -262,9 +257,9 @@ public class ParallelWANPropagationLoopBackDUnitTest extends WANTestBase {
createCacheInVMs(lnPort, vm3, vm6);
createCacheInVMs(nyPort, vm4, vm7);
createCacheInVMs(tkPort, vm5);
- vm3.invoke(() -> WANTestBase.createReceiver( lnPort ));
- vm4.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm5.invoke(() -> WANTestBase.createReceiver( tkPort ));
+ vm3.invoke(() -> WANTestBase.createReceiver());
+ vm4.invoke(() -> WANTestBase.createReceiver());
+ vm5.invoke(() -> WANTestBase.createReceiver());
//site1
vm3.invoke(() -> WANTestBase.createSender( "ln1", 2,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
index 9d9c074..2a5801a 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANStatsDUnitTest.java
@@ -48,7 +48,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createSendersWithConflation(lnPort);
@@ -80,7 +80,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
createSenders(lnPort);
@@ -117,7 +117,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
createSenders(lnPort);
@@ -155,8 +155,8 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
createCacheInVMs(nyPort, vm2);
createCacheInVMs(tkPort, vm3);
- createReceiverInVMs(nyPort, vm2);
- createReceiverInVMs(tkPort, vm3);
+ createReceiverInVMs(vm2);
+ createReceiverInVMs(vm3);
vm4.invoke(() -> WANTestBase.createCache(lnPort ));
@@ -211,7 +211,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
createSenders(lnPort);
@@ -260,7 +260,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
createSenders(lnPort);
@@ -296,7 +296,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort ));
createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -345,7 +345,7 @@ public class ParallelWANStatsDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
createSendersWithConflation(lnPort);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java
index eb75afa..c09bc60 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderDistributedDeadlockDUnitTest.java
@@ -59,8 +59,8 @@ public class SerialGatewaySenderDistributedDeadlockDUnitTest extends WANTestBase
//exercise region and gateway operations with different messaging
exerciseWANOperations();
- AsyncInvocation invVM4transaction = vm4.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR", 100));
- AsyncInvocation invVM5transaction = vm5.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR", 100));
+ AsyncInvocation invVM4transaction = vm4.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
+ AsyncInvocation invVM5transaction = vm5.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
@@ -133,8 +133,8 @@ public class SerialGatewaySenderDistributedDeadlockDUnitTest extends WANTestBase
//exercise region and gateway operations with messaging
exerciseWANOperations();
- AsyncInvocation invVM4transaction = vm4.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR", 100));
- AsyncInvocation invVM5transaction = vm5.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR", 100));
+ AsyncInvocation invVM4transaction = vm4.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
+ AsyncInvocation invVM5transaction = vm5.invokeAsync(() -> WANTestBase.doTxPuts(getTestMethodName() + "_RR"));
AsyncInvocation invVM4 = vm4.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
AsyncInvocation invVM5 = vm5.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));
@@ -196,7 +196,7 @@ public class SerialGatewaySenderDistributedDeadlockDUnitTest extends WANTestBase
//create receiver
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, false));
- vm2.invoke(() -> WANTestBase.createReceiver(nyPort));
+ vm2.invoke(() -> WANTestBase.createReceiver());
//create senders
vm4.invoke(() -> WANTestBase.createReplicatedRegion(
@@ -235,7 +235,7 @@ public class SerialGatewaySenderDistributedDeadlockDUnitTest extends WANTestBase
vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_RR",
"", 0, 113, false));
- vm2.invoke(() -> WANTestBase.createReceiver(nyPort));
+ vm2.invoke(() -> WANTestBase.createReceiver());
//create sender vms
vm4.invoke(() -> WANTestBase.createPartitionedRegion(
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
index 7f69904..e7210ef 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderEventListenerDUnitTest.java
@@ -99,7 +99,7 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -155,7 +155,7 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -206,7 +206,7 @@ public class SerialGatewaySenderEventListenerDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index 6899101..9157b6c 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -19,8 +19,6 @@ package com.gemstone.gemfire.internal.cache.wan.serial;
import java.util.HashSet;
import java.util.Set;
-import org.junit.Ignore;
-
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionRequest;
import com.gemstone.gemfire.cache.client.internal.locator.QueueConnectionResponse;
@@ -29,7 +27,6 @@ import com.gemstone.gemfire.distributed.Locator;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.InternalLocator;
import com.gemstone.gemfire.distributed.internal.ServerLocator;
-import com.gemstone.gemfire.internal.cache.RegionQueue;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
@@ -70,7 +67,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createSenderCaches(lnPort);
@@ -132,7 +129,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createSenderCaches(lnPort);
@@ -188,7 +185,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createSenderCaches(lnPort);
@@ -273,7 +270,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createSenderCaches(lnPort);
@@ -326,7 +323,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createSenderCaches(lnPort);
@@ -364,7 +361,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm4);
createSenderVM4();
@@ -427,7 +424,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
LogWriterUtils.getLogWriter().info("Completed puts in the region");
vm2.invoke(() -> WANTestBase.validateRegionSize(
@@ -453,7 +450,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createSenderCaches(lnPort);
@@ -491,7 +488,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createSenderCaches(lnPort);
@@ -527,7 +524,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createSenderCaches(lnPort);
@@ -558,7 +555,7 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
index f2de5de..fbd5700 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialGatewaySenderQueueDUnitTest.java
@@ -67,8 +67,8 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase{
vm3.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm3.invoke(() -> WANTestBase.createReceiver());
vm4.invoke(() -> WANTestBase.createCache( lnPort ));
vm5.invoke(() -> WANTestBase.createCache( lnPort ));
@@ -148,7 +148,7 @@ public class SerialGatewaySenderQueueDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java
index 9a818e0..c6513a1 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -44,7 +44,7 @@ public class SerialWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -89,7 +89,7 @@ public class SerialWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -128,7 +128,7 @@ public class SerialWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -166,7 +166,7 @@ public class SerialWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -262,7 +262,7 @@ public class SerialWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -351,7 +351,7 @@ public class SerialWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -450,7 +450,7 @@ public class SerialWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationLoopBackDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationLoopBackDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationLoopBackDUnitTest.java
index 8cc50de..75d4de2 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationLoopBackDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropagationLoopBackDUnitTest.java
@@ -38,8 +38,8 @@ public class SerialWANPropagationLoopBackDUnitTest extends WANTestBase {
vm2.invoke(() -> WANTestBase.createCache( lnPort ));
vm3.invoke(() -> WANTestBase.createCache( nyPort ));
- vm2.invoke(() -> WANTestBase.createReceiver( lnPort ));
- vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm3.invoke(() -> WANTestBase.createReceiver());
vm4.invoke(() -> WANTestBase.createCache( lnPort ));
vm5.invoke(() -> WANTestBase.createCache( lnPort ));
@@ -131,9 +131,9 @@ public class SerialWANPropagationLoopBackDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm4, vm7);
createCacheInVMs(tkPort, vm5);
- vm3.invoke(() -> WANTestBase.createReceiver( lnPort ));
- vm4.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm5.invoke(() -> WANTestBase.createReceiver( tkPort ));
+ vm3.invoke(() -> WANTestBase.createReceiver());
+ vm4.invoke(() -> WANTestBase.createReceiver());
+ vm5.invoke(() -> WANTestBase.createReceiver());
// using vm5 for sender in ds 3. cache is already created.
@@ -237,9 +237,9 @@ public class SerialWANPropagationLoopBackDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm4, vm7);
createCacheInVMs(tkPort, vm5);
- vm3.invoke(() -> WANTestBase.createReceiver( lnPort ));
- vm4.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm5.invoke(() -> WANTestBase.createReceiver( tkPort ));
+ vm3.invoke(() -> WANTestBase.createReceiver());
+ vm4.invoke(() -> WANTestBase.createReceiver());
+ vm5.invoke(() -> WANTestBase.createReceiver());
// using vm5 for sender in ds 3. cache is already created.
@@ -380,9 +380,9 @@ public class SerialWANPropagationLoopBackDUnitTest extends WANTestBase {
vm3.invoke(() -> WANTestBase.createCache( lnPort ));
vm4.invoke(() -> WANTestBase.createCache( nyPort ));
vm5.invoke(() -> WANTestBase.createCache( tkPort ));
- vm3.invoke(() -> WANTestBase.createReceiver( lnPort ));
- vm4.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm5.invoke(() -> WANTestBase.createReceiver( tkPort ));
+ vm3.invoke(() -> WANTestBase.createReceiver());
+ vm4.invoke(() -> WANTestBase.createReceiver());
+ vm5.invoke(() -> WANTestBase.createReceiver());
vm3.invoke(() -> WANTestBase.createSender( "ln1", 2,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationDUnitTest.java
index e6be1d5..c279d62 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationDUnitTest.java
@@ -75,8 +75,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2, vm3);
- vm2.invoke(() -> WANTestBase.createReceiver(nyPort ));
- vm3.invoke(() -> WANTestBase.createReceiver(nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm3.invoke(() -> WANTestBase.createReceiver());
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
@@ -131,8 +131,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
vm3.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver(nyPort ));
- vm3.invoke(() -> WANTestBase.createReceiver(nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm3.invoke(() -> WANTestBase.createReceiver());
Thread.sleep(5000);
@@ -188,8 +188,8 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
vm3.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm3.invoke(() -> WANTestBase.createReceiver());
@@ -208,7 +208,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -243,7 +243,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -322,7 +322,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
//these are part of remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//these are part of local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -403,7 +403,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
//these are part of remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//these are part of local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -471,7 +471,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
//these are part of remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//these are part of local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -548,7 +548,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
// these are part of remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
// these are part of local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -651,8 +651,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
//these are part of remote site
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
- //vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
//these are part of local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -709,7 +708,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
// these are part of remote site
vm2.invoke(() -> WANTestBase.createCache( nyPort ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
// these are part of local site
createCacheInVMs(lnPort, vm4, vm5);
@@ -759,7 +758,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR_1", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
vm4.invoke(() -> WANTestBase.validateQueueContents( "ln",
0 ));
@@ -774,7 +773,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
// these are part of remote site
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
// these are part of local site
createCacheInVMs(lnPort, vm4, vm5);
@@ -811,7 +810,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap()));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
vm4.invoke(() -> WANTestBase.validateQueueContents("ln", 0));
@@ -827,7 +826,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
// these are part of remote site
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
// these are part of local site
createCacheInVMs(lnPort, vm4, vm5);
@@ -881,7 +880,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
getTestMethodName() + "_RR_1", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
vm4.invoke(() -> WANTestBase.validateQueueContents( "ln",
0 ));
@@ -900,7 +899,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
// these are part of remote site
createCacheInVMs(nyPort1, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort1 ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
// these are part of local site
createCacheInVMs(lnPort, vm4, vm5);
@@ -947,7 +946,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
createCacheInVMs(nyPort2, vm6);
vm6.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR_1", null, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createReceiver( nyPort2 ));
+ vm6.invoke(() -> WANTestBase.createReceiver());
vm4.invoke(() -> WANTestBase.validateQueueContents( "ln",
@@ -963,32 +962,30 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
// these are part of remote site
createCacheInVMs(nyPort, vm2, vm3);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
// these are part of local site
- createCacheInVMs(lnPort, vm4, vm5);
+ createCacheInVMs(lnPort, vm4);
// senders are created on local site. Batch size is kept to a high (170) so
// there will be less number of exceptions (occur during dispatchBatch) in
// the log
- vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 350, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSender( "ln", 2,
- false, 100, 350, false, false, null, true ));
+ vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 350, false, false, null, true ));
// create one RR (RR_1) on remote site
- vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
- getTestMethodName() + "_RR_1", null, isOffHeap() ));
+ vm2.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap()));
+ vm2.invoke(() -> WANTestBase.createReceiver());
- vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
- getTestMethodName() + "_RR_1", null, isOffHeap() ));
-
- // start the senders on local site
- vm4.invoke(() -> WANTestBase.startSender( "ln" ));
+ vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap()));
+
+ vm2.invoke(() -> addListenerToSleepAfterCreateEvent(2000));
+ vm3.invoke(() -> addListenerToSleepAfterCreateEvent(2000));
// create one RR (RR_1) on local site
- vm4.invoke(() -> WANTestBase.createReplicatedRegion(
- getTestMethodName() + "_RR_1", "ln", isOffHeap() ));
+ vm4.invoke(() -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR_1", "ln", isOffHeap()));
+ // start the senders on local site
+ vm4.invoke(() -> WANTestBase.startSender("ln"));
+
+
// start puts in RR_1 in another thread
AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( getTestMethodName() + "_RR_1", 8000 ));
@@ -996,31 +993,22 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
// receivers.
vm2.invoke(() -> WANTestBase.closeCache());
vm3.invoke(() -> WANTestBase.closeCache());
-
- try {
- inv1.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail();
- }
+
+ inv1.join();
// verify that all is well in local site
- vm4.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR_1", 8000 ));
+ vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR_1", 8000));
vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty( "ln" ));
createCacheInVMs(nyPort, vm3);
- vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion(
- getTestMethodName() + "_RR_1", null, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm3.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR_1", null, isOffHeap()));
+ vm3.invoke(() -> WANTestBase.createReceiver());
- vm4.invoke(() -> WANTestBase.validateQueueContents( "ln",
- 0 ));
+ vm4.invoke(() -> WANTestBase.validateQueueContents("ln", 0));
vm3.invoke(() -> WANTestBase.checkMinimumGatewayReceiverStats( 1, 1 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR_1", 8000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR_1", 8000));
}
public void testReplicatedSerialPropagationToTwoWanSites() throws Exception {
@@ -1030,9 +1018,9 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(tkPort, vm3);
- vm3.invoke(() -> WANTestBase.createReceiver( tkPort ));
+ vm3.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -1078,7 +1066,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -1130,7 +1118,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3, vm6, vm7);
- createReceiverInVMs(nyPort, vm2, vm3, vm6, vm7);
+ createReceiverInVMs(vm2, vm3, vm6, vm7);
LogWriterUtils.getLogWriter().info("Started receivers on remote site");
@@ -1209,7 +1197,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3, vm6, vm7);
- createReceiverInVMs(nyPort, vm2, vm3, vm6, vm7);
+ createReceiverInVMs(vm2, vm3, vm6, vm7);
LogWriterUtils.getLogWriter().info("Started receivers on remote site");
@@ -1300,7 +1288,7 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm4, vm5);
@@ -1353,10 +1341,10 @@ public class SerialWANPropogationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
- createReceiverInVMs(lnPort, vm4, vm5);
+ createReceiverInVMs(vm4, vm5);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogation_PartitionedRegionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogation_PartitionedRegionDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogation_PartitionedRegionDUnitTest.java
index b6d0ee4..66d4279 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogation_PartitionedRegionDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogation_PartitionedRegionDUnitTest.java
@@ -38,7 +38,7 @@ public class SerialWANPropogation_PartitionedRegionDUnitTest extends WANTestBase
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -78,7 +78,7 @@ public class SerialWANPropogation_PartitionedRegionDUnitTest extends WANTestBase
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -134,7 +134,7 @@ public class SerialWANPropogation_PartitionedRegionDUnitTest extends WANTestBase
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -192,7 +192,7 @@ public class SerialWANPropogation_PartitionedRegionDUnitTest extends WANTestBase
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -256,9 +256,9 @@ public class SerialWANPropogation_PartitionedRegionDUnitTest extends WANTestBase
Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(3,lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver(nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(tkPort, vm3);
- vm3.invoke(() -> WANTestBase.createReceiver(tkPort ));
+ vm3.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -307,7 +307,7 @@ public class SerialWANPropogation_PartitionedRegionDUnitTest extends WANTestBase
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -379,7 +379,7 @@ public class SerialWANPropogation_PartitionedRegionDUnitTest extends WANTestBase
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
vm4.invoke(() -> WANTestBase.doMultiThreadedPuts(
getTestMethodName() + "_PR", 1000 ));
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationsFeatureDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationsFeatureDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationsFeatureDUnitTest.java
index 6658be5..7e7a2f0 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationsFeatureDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANPropogationsFeatureDUnitTest.java
@@ -17,7 +17,6 @@
package com.gemstone.gemfire.internal.cache.wan.serial;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase.MyGatewayEventFilter;
public class SerialWANPropogationsFeatureDUnitTest extends WANTestBase{
@@ -34,7 +33,7 @@ public class SerialWANPropogationsFeatureDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -63,9 +62,9 @@ public class SerialWANPropogationsFeatureDUnitTest extends WANTestBase{
getTestMethodName() + "_RR", 120 ));
vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 120 ));
+ getTestMethodName() + "_RR", 120, 240000 ));
vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 120 ));
+ getTestMethodName() + "_RR", 120, 240000 ));
}
public void testSerialReplicatedWanWithPersistence() {
@@ -74,7 +73,7 @@ public class SerialWANPropogationsFeatureDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -115,7 +114,7 @@ public class SerialWANPropogationsFeatureDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -156,7 +155,7 @@ public class SerialWANPropogationsFeatureDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -196,7 +195,7 @@ public class SerialWANPropogationsFeatureDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -235,7 +234,7 @@ public class SerialWANPropogationsFeatureDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -276,7 +275,7 @@ public class SerialWANPropogationsFeatureDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm6, vm7);
- createReceiverInVMs(nyPort, vm6, vm7);
+ createReceiverInVMs(vm6, vm7);
createCacheInVMs(lnPort, vm2, vm3, vm4, vm5);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2, false, 100, 10, false, false,
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
index 0220459..1c5a4b6 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/serial/SerialWANStatsDUnitTest.java
@@ -21,11 +21,12 @@ import static com.gemstone.gemfire.test.dunit.IgnoredException.*;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase.MyGatewayEventFilter;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.jayway.awaitility.Awaitility;
public class SerialWANStatsDUnitTest extends WANTestBase {
@@ -50,7 +51,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -99,7 +100,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -150,9 +151,9 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
Integer tkPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(tkPort, vm3);
- vm3.invoke(() -> WANTestBase.createReceiver( tkPort ));
+ vm3.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -220,7 +221,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
vm2.invoke(() -> WANTestBase.createCache( nyPort ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
vm4.invoke(() -> WANTestBase.createCache(lnPort ));
vm5.invoke(() -> WANTestBase.createCache(lnPort ));
@@ -278,7 +279,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
//these are part of remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//these are part of local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -353,7 +354,13 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
/**
*
- * Disabled - see ticket #52118
+ * Not Disabled - see ticket #52118
+ *
+ * NOTE: The test failure is avoided by having a larger number of puts operation so
+ * that WANTestBase.verifyRegionQueueNotEmpty("ln" )) is sucessful as there is a
+ * significant delay during the high number of puts.
+ *
+ * In future if this failure reappears, the put operations must be increase or a better fix must be found.
*
* 1 region and sender configured on local site and 1 region and a
* receiver configured on remote site. Puts to the local region are in progress.
@@ -362,12 +369,13 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
* @throws Exception
*/
public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws Exception {
+ int numEntries = 20000;
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
//these are part of remote site
vm2.invoke(() -> WANTestBase.createCache( nyPort ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
//these are part of local site
vm4.invoke(() -> WANTestBase.createCache( lnPort ));
@@ -384,7 +392,8 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
//create one RR (RR_1) on remote site
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
testName + "_RR_1", null, isOffHeap() ));
-
+ //This is to cause a scenario where we have received at least X events and want to slow the receiver
+ vm2.invoke(() -> WANTestBase.longPauseAfterNumEvents(500, 200));
//start the senders on local site
startSenderInVMs("ln", vm4, vm5);
@@ -399,10 +408,10 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
testName + "_RR_1", "ln", isOffHeap() ));
//start puts in RR_1 in another thread
- AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( testName + "_RR_1", 20000 ));
+ AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.doPuts( testName + "_RR_1", numEntries ));
//destroy RR_1 in remote site
vm2.invoke(() -> WANTestBase.destroyRegion( testName + "_RR_1", 500));
-
+
try {
inv1.join();
} catch (InterruptedException e) {
@@ -410,18 +419,22 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
fail();
}
- //verify that all is well in local site. All the events should be present in local region
- vm4.invoke(() -> WANTestBase.validateRegionSize(
- testName + "_RR_1", 20000 ));
//assuming some events might have been dispatched before the remote region was destroyed,
//sender's region queue will have events less than 1000 but the queue will not be empty.
//NOTE: this much verification might be sufficient in DUnit. Hydra will take care of
//more in depth validations.
vm4.invoke(() -> WANTestBase.verifyRegionQueueNotEmpty("ln" ));
+
+ //verify that all is well in local site. All the events should be present in local region
+ vm4.invoke(() -> WANTestBase.validateRegionSize(
+ testName + "_RR_1", numEntries ));
+
+ //like a latch to guarantee at least one exception returned
+ vm4.invoke(() -> Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> WANTestBase.verifyQueueSize("ln", 0)));
vm4.invoke(() -> WANTestBase.checkBatchStats("ln", true, true));
- vm5.invoke(() -> WANTestBase.checkUnProcessedStats("ln", 20000));
+ vm5.invoke(() -> WANTestBase.checkUnProcessedStats("ln", numEntries));
vm2.invoke(() -> WANTestBase.checkExcepitonStats(1));
@@ -433,7 +446,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -487,7 +500,7 @@ public class SerialWANStatsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/management/WANManagementDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/management/WANManagementDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/management/WANManagementDUnitTest.java
index 48aff88..af3dc97 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/management/WANManagementDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/management/WANManagementDUnitTest.java
@@ -81,7 +81,7 @@ public class WANManagementDUnitTest extends ManagementTestBase {
nyReceiver.invoke(() -> WANTestBase.createCache( nyPort ));
nyReceiver.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, false ));
- nyReceiver.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ nyReceiver.invoke(() -> WANTestBase.createReceiver());
WANTestBase.startSenderInVMs("pn", puneSender, managing);
@@ -123,7 +123,7 @@ public class WANManagementDUnitTest extends ManagementTestBase {
nyReceiver.invoke(() -> WANTestBase.createCache( nyPort ));
nyReceiver.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, false ));
- nyReceiver.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ nyReceiver.invoke(() -> WANTestBase.createReceiver());
// keep a larger batch to minimize number of exception occurrences in the
// log
@@ -185,7 +185,7 @@ public class WANManagementDUnitTest extends ManagementTestBase {
WANTestBase.createCacheInVMs(nyPort, nyReceiver);
nyReceiver.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, false ));
- nyReceiver.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ nyReceiver.invoke(() -> WANTestBase.createReceiver());
checkAsyncQueueMBean(puneSender);
checkAsyncQueueMBean(managing);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/management/internal/pulse/TestRemoteClusterDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/management/internal/pulse/TestRemoteClusterDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/management/internal/pulse/TestRemoteClusterDUnitTest.java
index 1827fc1..2c01dac 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/management/internal/pulse/TestRemoteClusterDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/management/internal/pulse/TestRemoteClusterDUnitTest.java
@@ -84,7 +84,7 @@ public class TestRemoteClusterDUnitTest extends ManagementTestBase {
getTestMethodName() + "_PR", "pn", 1, 100, false ));
WANTestBase.createCacheInVMs(nyPort, nyReceiver);
- nyReceiver.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ nyReceiver.invoke(() -> WANTestBase.createReceiver());
nyReceiver.invoke(() -> WANTestBase.createPartitionedRegion( getTestMethodName() + "_PR", null, 1, 100, false ));
WANTestBase.startSenderInVMs("pn", puneSender, managing);
[4/5] incubator-geode git commit: GEODE-1032 : Additional wait time
to check for empty queue,
refactored WANTestBase.java to remove unused functions,
replaced wait criterions with awaitility.
Posted by ja...@apache.org.
GEODE-1032 : Additional wait time to check for empty queue, refactored WANTestBase.java to remove unused functions, replaced wait criterions with awaitility.
* Added additional wait time in awaitility to check for empty queue [WANTestBase.java]
* Replaced wait criterion with awaitility [WANTestBase.java]
* Removed unused parameters from function signatures [WANTestBase.java]
* Added listener to put the thread to sleep on create event so that the transmission is slowed and the sender queue is not empty when the cache is closed. [testReplicatedSerialPropagationWithRemoteReceiverRestartedOnOtherNode]
* Used the new function signature for validateRegionSize which passes time to wait as a parameter for test cases which wait for 240 seconds.
This closes #128
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/84a4b474
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/84a4b474
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/84a4b474
Branch: refs/heads/develop
Commit: 84a4b474c202e078aaef26487d90482915260835
Parents: 8f7f4d5
Author: nabarun <nn...@pivotal.io>
Authored: Mon Apr 4 14:31:02 2016 -0700
Committer: Jason Huynh <hu...@gmail.com>
Committed: Mon Apr 18 10:25:40 2016 -0700
----------------------------------------------------------------------
.../tier/sockets/DurableClientTestCase.java | 1 -
.../cache/wan/CacheClientNotifierDUnitTest.java | 2 +-
.../gemfire/internal/cache/wan/WANTestBase.java | 1853 +++---------------
...oncurrentParallelGatewaySenderDUnitTest.java | 38 +-
...allelGatewaySenderOperation_1_DUnitTest.java | 20 +-
...allelGatewaySenderOperation_2_DUnitTest.java | 25 +-
.../ConcurrentWANPropogation_1_DUnitTest.java | 16 +-
.../ConcurrentWANPropogation_2_DUnitTest.java | 29 +-
.../cache/wan/disttx/DistTXWANDUnitTest.java | 10 +-
.../CommonParallelGatewaySenderDUnitTest.java | 8 +-
...wWANConcurrencyCheckForDestroyDUnitTest.java | 18 +-
.../cache/wan/misc/PDXNewWanDUnitTest.java | 31 +-
...dRegion_ParallelWANPersistenceDUnitTest.java | 19 +-
...dRegion_ParallelWANPropogationDUnitTest.java | 51 +-
...downAllPersistentGatewaySenderDUnitTest.java | 4 +-
.../cache/wan/misc/WANSSLDUnitTest.java | 2 +-
.../cache/wan/misc/WanValidationsDUnitTest.java | 19 +-
...arallelGatewaySenderOperationsDUnitTest.java | 12 +-
...llelGatewaySenderQueueOverflowDUnitTest.java | 24 +-
.../ParallelWANConflationDUnitTest.java | 20 +-
...ersistenceEnabledGatewaySenderDUnitTest.java | 28 +-
...lelWANPropagationConcurrentOpsDUnitTest.java | 6 +-
.../ParallelWANPropagationDUnitTest.java | 47 +-
...ParallelWANPropagationLoopBackDUnitTest.java | 21 +-
.../wan/parallel/ParallelWANStatsDUnitTest.java | 18 +-
...tewaySenderDistributedDeadlockDUnitTest.java | 12 +-
...rialGatewaySenderEventListenerDUnitTest.java | 6 +-
.../SerialGatewaySenderOperationsDUnitTest.java | 25 +-
.../SerialGatewaySenderQueueDUnitTest.java | 6 +-
...ersistenceEnabledGatewaySenderDUnitTest.java | 14 +-
.../SerialWANPropagationLoopBackDUnitTest.java | 22 +-
.../serial/SerialWANPropogationDUnitTest.java | 110 +-
...NPropogation_PartitionedRegionDUnitTest.java | 16 +-
.../SerialWANPropogationsFeatureDUnitTest.java | 19 +-
.../wan/serial/SerialWANStatsDUnitTest.java | 49 +-
.../management/WANManagementDUnitTest.java | 6 +-
.../pulse/TestRemoteClusterDUnitTest.java | 2 +-
37 files changed, 569 insertions(+), 2040 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientTestCase.java
----------------------------------------------------------------------
diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientTestCase.java b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientTestCase.java
index 4d53146..563c9a7 100755
--- a/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientTestCase.java
+++ b/geode-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientTestCase.java
@@ -1475,7 +1475,6 @@ public class DurableClientTestCase extends DistributedTestCase {
CountDownLatch clientConnected = new CountDownLatch(1);
public void doTestHook(String spot) {
- System.out.println("JASON " + spot);
try {
if (spot.equals("CLIENT_PRE_RECONNECT")) {
if (!reconnectLatch.await(60, TimeUnit.SECONDS)) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
index 9557f0d..7406f2d 100755
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/CacheClientNotifierDUnitTest.java
@@ -178,7 +178,7 @@ public class CacheClientNotifierDUnitTest extends WANTestBase {
// create recever and cache servers will be at ny
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
vm1.invoke(() -> WANTestBase.createCache( nyPort ));
- int receiverPort = vm1.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ int receiverPort = vm1.invoke(() -> WANTestBase.createReceiver());
checkCacheServer(vm1, receiverPort, false, 0);
// create PR for receiver
[2/5] incubator-geode git commit: GEODE-1032 : Additional wait time
to check for empty queue,
refactored WANTestBase.java to remove unused functions,
replaced wait criterions with awaitility.
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
index 4669ac9..91f3c1f 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderDUnitTest.java
@@ -54,7 +54,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -134,7 +134,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -197,7 +197,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase {
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -289,7 +289,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase {
vm3.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//verify all buckets drained on all sender nodes.
vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
@@ -313,7 +313,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -327,22 +327,16 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase {
true, 100, 10, false, false, null, true, 5, OrderPolicy.PARTITION ));
- vm4.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion( null,
- "ln", 1, 100, isOffHeap() ));
- vm5.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion( null,
- "ln", 1, 100, isOffHeap() ));
- vm6.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion( null,
- "ln", 1, 100, isOffHeap() ));
- vm7.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion( null,
- "ln", 1, 100, isOffHeap() ));
+ vm4.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() ));
+ vm5.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() ));
+ vm6.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() ));
+ vm7.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() ));
startSenderInVMs("ln", vm4, vm5, vm6, vm7);
- vm2.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion( null,
- "ln", 1, 100, isOffHeap() ));
- vm3.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion( null,
- "ln", 1, 100, isOffHeap() ));
+ vm2.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() ));
+ vm3.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap() ));
//before doing any puts, let the senders be running in order to ensure that
//not a single event will be lost
@@ -381,7 +375,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -491,7 +485,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -537,7 +531,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -578,7 +572,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -638,7 +632,7 @@ public class ConcurrentParallelGatewaySenderDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm3, vm4);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
index ddcb3d6..400a289 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
@@ -45,7 +45,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -91,7 +91,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -143,7 +143,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -203,7 +203,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -280,7 +280,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(nyPort, vm4, vm5);
vm4.invoke(() -> WANTestBase.createCache( lnPort ));
@@ -342,7 +342,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -400,7 +400,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -496,7 +496,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -582,7 +582,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -672,7 +672,7 @@ public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends WANTes
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
index c922314..a0b00e9 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
@@ -312,11 +312,11 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
Integer pnPort = (Integer)vm3.invoke(() -> createFirstRemoteLocator(4, lnPort));
createCacheInVMs(nyPort, vm4);
- vm4.invoke(() -> createReceiver(nyPort));
+ vm4.invoke(() -> createReceiver());
createCacheInVMs(tkPort, vm5);
- vm5.invoke(() -> createReceiver(tkPort));
+ vm5.invoke(() -> createReceiver());
createCacheInVMs(pnPort, vm6);
- vm6.invoke(() -> createReceiver(pnPort));
+ vm6.invoke(() -> createReceiver());
try {
vm7.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(lnPort));
@@ -366,7 +366,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
Integer nyPort = locatorPorts[1];
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> createReceiver(nyPort));
+ vm2.invoke(() -> createReceiver());
try {
createAndStartSender(vm4, lnPort, 5, true, false);
@@ -407,9 +407,9 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
Integer tkPort = (Integer)vm2.invoke(() -> createFirstRemoteLocator(3, lnPort));
createCacheInVMs(nyPort, vm6);
- vm6.invoke(() -> createReceiver(nyPort));
+ vm6.invoke(() -> createReceiver());
createCacheInVMs(tkPort, vm7);
- vm7.invoke(() -> createReceiver(tkPort));
+ vm7.invoke(() -> createReceiver());
try {
createAndStartTwoSenders(vm4, lnPort, 4);
@@ -454,7 +454,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
Integer nyPort = locatorPorts[1];
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> createReceiver(nyPort));
+ vm2.invoke(() -> createReceiver());
try {
createAndStartSenderWithCustomerOrderShipmentRegion(vm4, lnPort, 5, true);
@@ -462,7 +462,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
LogWriterUtils.getLogWriter().info("Created PRs on local site");
- vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, null, 1, 100, isOffHeap()));
+ vm2.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, 1, 100, isOffHeap()));
AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.putcolocatedPartitionedRegion( 10 ));
Wait.pause(1000);
@@ -492,7 +492,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
Integer nyPort = locatorPorts[1];
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
try {
createAndStartSenderWithCustomerOrderShipmentRegion(vm4, lnPort, 6, true);
@@ -500,8 +500,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
LogWriterUtils.getLogWriter().info("Created PRs on local site");
- vm2.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion( null,
- null, 1, 100, isOffHeap() ));
+ vm2.invoke(() -> WANTestBase.createCustomerOrderShipmentPartitionedRegion(null, 1, 100, isOffHeap() ));
AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.putcolocatedPartitionedRegion( 2000 ));
Wait.pause(1000);
@@ -570,7 +569,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
// Note: This is a test-specific method used by several tests to do puts from vm4 to vm2.
String regionName = getTestMethodName() + "_PR";
createCacheInVMs(port, vm2);
- vm2.invoke(() -> createReceiver(port));
+ vm2.invoke(() -> createReceiver());
vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
vm4.invoke(() -> doPuts(regionName, 10));
vm4.invoke(() -> validateRegionSize(regionName, 10));
@@ -589,7 +588,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
protected void createAndStartSenderWithCustomerOrderShipmentRegion(VM vm, int port, int concurrencyLevel, boolean manualStart) {
vm.invoke(() -> createCache_INFINITE_MAXIMUM_SHUTDOWN_WAIT_TIME(port));
- vm.invoke(() -> createCustomerOrderShipmentPartitionedRegion(null, "ln", 1, 100, isOffHeap()));
+ vm.invoke(() -> createCustomerOrderShipmentPartitionedRegion("ln", 1, 100, isOffHeap()));
createSender(vm, concurrencyLevel, manualStart);
vm.invoke(() -> startSender("ln"));
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java
index d29a995..315e9ca 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java
@@ -84,8 +84,8 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase {
vm3.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver(nyPort ));
- vm3.invoke(() -> WANTestBase.createReceiver(nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm3.invoke(() -> WANTestBase.createReceiver());
vm4.invoke(() -> WANTestBase.validateRegionSize(
getTestMethodName() + "_RR", 1000 ));
@@ -100,7 +100,7 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -143,7 +143,7 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -230,7 +230,7 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase {
//these are part of remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//these are part of local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -310,7 +310,7 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase {
//these are part of remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//these are part of local site
@@ -379,7 +379,7 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase {
//these are part of remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//these are part of local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -457,7 +457,7 @@ public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
// these are part of remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
// these are part of local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java
index a9b4b9d..6026b3a 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java
@@ -16,13 +16,8 @@
*/
package com.gemstone.gemfire.internal.cache.wan.concurrent;
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache.EntryExistsException;
-import com.gemstone.gemfire.cache.client.ServerOperationException;
import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
-import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase.MyGatewayEventFilter;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
import com.gemstone.gemfire.test.dunit.IgnoredException;
import com.gemstone.gemfire.test.dunit.Wait;
@@ -49,7 +44,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -79,9 +74,9 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase {
getTestMethodName() + "_RR", 150 ));
vm2.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 150 ));
+ getTestMethodName() + "_RR", 150, 240000));
vm3.invoke(() -> WANTestBase.validateRegionSize(
- getTestMethodName() + "_RR", 150 ));
+ getTestMethodName() + "_RR", 150, 240000 ));
}
public void Bug46921_testSerialReplicatedWanWithPersistence() {
@@ -90,7 +85,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -133,8 +128,8 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
createCacheInVMs(tkPort, vm3);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm3.invoke(() -> WANTestBase.createReceiver( tkPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm3.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -184,7 +179,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -228,7 +223,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -269,7 +264,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -309,7 +304,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2,lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -352,7 +347,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase {
getTestMethodName(), null, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName(), null, isOffHeap() ));
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -389,7 +384,7 @@ public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase {
vm2.invoke(() -> WANTestBase.createCache(nyPort));
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
WANTestBase.createCacheInVMs(lnPort, vm4, vm5);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java
index 3de19e2..63e7266 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java
@@ -16,11 +16,7 @@
*/
package com.gemstone.gemfire.internal.cache.wan.disttx;
-import com.gemstone.gemfire.CancelException;
-import com.gemstone.gemfire.cache.CacheClosedException;
-import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
import com.gemstone.gemfire.test.dunit.Invoke;
import com.gemstone.gemfire.test.dunit.LogWriterUtils;
import com.gemstone.gemfire.test.dunit.SerializableCallable;
@@ -56,7 +52,7 @@ public class DistTXWANDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -95,7 +91,7 @@ public class DistTXWANDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -133,7 +129,7 @@ public class DistTXWANDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
index cba42df..5731b45 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
@@ -22,9 +22,7 @@ import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.RegionQueue;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
-import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import com.gemstone.gemfire.test.dunit.Assert;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
import com.gemstone.gemfire.test.dunit.IgnoredException;
@@ -81,7 +79,7 @@ public class CommonParallelGatewaySenderDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -161,7 +159,7 @@ public class CommonParallelGatewaySenderDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -249,7 +247,7 @@ public class CommonParallelGatewaySenderDUnitTest extends WANTestBase {
//create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
index 680f6bc..2c62a7c 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
@@ -61,17 +61,17 @@ public class NewWANConcurrencyCheckForDestroyDUnitTest extends WANTestBase {
// Site 1
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
createCacheInVMs(lnPort, vm1);
- Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver( lnPort ));
+ Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
//Site 2
Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm3);
- Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
//Site 3
Integer tkPort = (Integer)vm4.invoke(() -> WANTestBase.createFirstRemoteLocator( 3, lnPort ));
createCacheInVMs(tkPort, vm5);
- Integer tkRecPort = (Integer) vm5.invoke(() -> WANTestBase.createReceiver( tkPort ));
+ Integer tkRecPort = (Integer) vm5.invoke(() -> WANTestBase.createReceiver());
LogWriterUtils.getLogWriter().info("Created locators and receivers in 3 distributed systems");
@@ -145,12 +145,12 @@ public class NewWANConcurrencyCheckForDestroyDUnitTest extends WANTestBase {
// Site 1
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
vm1.invoke(() -> WANTestBase.createCache(lnPort));
- Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver( lnPort ));
+ Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
//Site 2
Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
vm3.invoke(() -> WANTestBase.createCache(nyPort));
- Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems");
@@ -250,12 +250,12 @@ public void testPutAllEventSequenceOnSerialGatewaySenderWithPR() {
// Site 1
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
createCacheInVMs(lnPort, vm1);
- Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver( lnPort ));
+ Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
//Site 2
Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm3);
- Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems");
@@ -357,12 +357,12 @@ public void testPutAllEventSequenceOnSerialGatewaySenderWithPR() {
// Site 1
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
createCacheInVMs(lnPort, vm1);
- Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver( lnPort ));
+ Integer lnRecPort = (Integer) vm1.invoke(() -> WANTestBase.createReceiver());
//Site 2
Integer nyPort = (Integer)vm2.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm3);
- Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ Integer nyRecPort = (Integer) vm3.invoke(() -> WANTestBase.createReceiver());
LogWriterUtils.getLogWriter().info("Created locators and receivers in 2 distributed systems");
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java
index 1782309..d665f60 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java
@@ -16,10 +16,7 @@
*/
package com.gemstone.gemfire.internal.cache.wan.misc;
-import com.gemstone.gemfire.cache.CacheWriterException;
-import com.gemstone.gemfire.cache.client.ServerOperationException;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.test.dunit.AsyncInvocation;
import com.gemstone.gemfire.test.dunit.IgnoredException;
import com.gemstone.gemfire.test.dunit.Wait;
@@ -45,7 +42,7 @@ public class PDXNewWanDUnitTest extends WANTestBase{
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm3);
@@ -224,9 +221,9 @@ public class PDXNewWanDUnitTest extends WANTestBase{
createCacheInVMs(lnPort, vm3);
createCacheInVMs(nyPort, vm4);
createCacheInVMs(tkPort, vm5);
- vm3.invoke(() -> WANTestBase.createReceiver( lnPort ));
- vm4.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm5.invoke(() -> WANTestBase.createReceiver( tkPort ));
+ vm3.invoke(() -> WANTestBase.createReceiver());
+ vm4.invoke(() -> WANTestBase.createReceiver());
+ vm5.invoke(() -> WANTestBase.createReceiver());
//Create all of our gateway senders
@@ -294,7 +291,7 @@ public class PDXNewWanDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm3);
@@ -337,7 +334,7 @@ public class PDXNewWanDUnitTest extends WANTestBase{
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 0, 2, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm3);
@@ -404,7 +401,7 @@ public class PDXNewWanDUnitTest extends WANTestBase{
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null,1, 5, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm3, vm4);
@@ -473,7 +470,7 @@ public class PDXNewWanDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
vm3.invoke(() -> WANTestBase.createCache( lnPort ));
@@ -502,7 +499,7 @@ public class PDXNewWanDUnitTest extends WANTestBase{
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 0, 1, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm3);
@@ -556,7 +553,7 @@ public class PDXNewWanDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm3, vm4);
@@ -622,7 +619,7 @@ public class PDXNewWanDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm3);
@@ -652,7 +649,7 @@ public class PDXNewWanDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm3, vm4);
@@ -691,7 +688,7 @@ public class PDXNewWanDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm3);
@@ -714,7 +711,7 @@ public class PDXNewWanDUnitTest extends WANTestBase{
vm2.invoke(() -> WANTestBase.killSender());
- createReceiverInVMs(nyPort, vm2, vm4);
+ createReceiverInVMs(vm2, vm4);
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 1, 2, isOffHeap() ));
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPersistenceDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPersistenceDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPersistenceDUnitTest.java
index b0a14af..4469570 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPersistenceDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPersistenceDUnitTest.java
@@ -16,7 +16,6 @@
*/
package com.gemstone.gemfire.internal.cache.wan.misc;
-import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
@@ -52,7 +51,7 @@ public class ReplicatedRegion_ParallelWANPersistenceDUnitTest extends WANTestBas
//create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
@@ -194,7 +193,7 @@ public class ReplicatedRegion_ParallelWANPersistenceDUnitTest extends WANTestBas
getTestMethodName() + "_RR", null, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -342,8 +341,8 @@ public class ReplicatedRegion_ParallelWANPersistenceDUnitTest extends WANTestBas
//create receiver on remote site
vm2.invoke(() -> WANTestBase.createCache( nyPort ));
vm3.invoke(() -> WANTestBase.createCache( nyPort ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm3.invoke(() -> WANTestBase.createReceiver());
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
@@ -554,7 +553,7 @@ public class ReplicatedRegion_ParallelWANPersistenceDUnitTest extends WANTestBas
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -597,14 +596,14 @@ public class ReplicatedRegion_ParallelWANPersistenceDUnitTest extends WANTestBas
vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- pauseWaitCriteria(60000);
+ Thread.sleep(60000);
{
AsyncInvocation inv1 = vm7.invokeAsync(() -> ReplicatedRegion_ParallelWANPropogationDUnitTest.doPuts0( getTestMethodName() + "_RR", 10000 ));
- pauseWaitCriteria(1000);
+ Thread.sleep(1000);
AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
- pauseWaitCriteria(2000);
+ Thread.sleep(2000);
AsyncInvocation inv3 = vm6.invokeAsync(() -> ReplicatedRegion_ParallelWANPropogationDUnitTest.doPuts1( getTestMethodName() + "_RR", 10000 ));
- pauseWaitCriteria(1500);
+ Thread.sleep(1500);
AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
try {
inv1.join();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropogationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropogationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropogationDUnitTest.java
index 3387249..543d622 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropogationDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropogationDUnitTest.java
@@ -16,33 +16,14 @@
*/
package com.gemstone.gemfire.internal.cache.wan.misc;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.CacheClosedException;
import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.EntryExistsException;
import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.client.ServerConnectivityException;
-import com.gemstone.gemfire.cache.client.ServerOperationException;
-import com.gemstone.gemfire.cache.wan.GatewaySender;
-import com.gemstone.gemfire.distributed.internal.ReplyException;
-import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
-import com.gemstone.gemfire.internal.cache.PutAllPartialResultException;
-import com.gemstone.gemfire.internal.cache.RegionQueue;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
import com.gemstone.gemfire.test.dunit.Assert;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.DistributedTestCase;
import com.gemstone.gemfire.test.dunit.IgnoredException;
import com.gemstone.gemfire.test.dunit.LogWriterUtils;
import com.gemstone.gemfire.test.dunit.Wait;
@@ -68,7 +49,7 @@ public class ReplicatedRegion_ParallelWANPropogationDUnitTest extends WANTestBas
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
@@ -343,7 +324,7 @@ public class ReplicatedRegion_ParallelWANPropogationDUnitTest extends WANTestBas
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
@@ -389,7 +370,7 @@ public class ReplicatedRegion_ParallelWANPropogationDUnitTest extends WANTestBas
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm4, vm5);
@@ -441,7 +422,7 @@ public class ReplicatedRegion_ParallelWANPropogationDUnitTest extends WANTestBas
vm2.invoke(() -> WANTestBase.createCache( nyPort ));
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
vm4.invoke(() -> WANTestBase.createCache( lnPort ));
vm5.invoke(() -> WANTestBase.createCache( lnPort ));
@@ -492,8 +473,8 @@ public class ReplicatedRegion_ParallelWANPropogationDUnitTest extends WANTestBas
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
- vm3.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
+ vm3.invoke(() -> WANTestBase.createReceiver());
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
@@ -590,7 +571,7 @@ public class ReplicatedRegion_ParallelWANPropogationDUnitTest extends WANTestBas
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
@@ -665,7 +646,7 @@ public class ReplicatedRegion_ParallelWANPropogationDUnitTest extends WANTestBas
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -788,7 +769,7 @@ public class ReplicatedRegion_ParallelWANPropogationDUnitTest extends WANTestBas
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> WANTestBase.createReplicatedRegion(
getTestMethodName() + "_RR", null, isOffHeap() ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm4, vm5);
@@ -838,7 +819,7 @@ public class ReplicatedRegion_ParallelWANPropogationDUnitTest extends WANTestBas
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5);
@@ -862,7 +843,7 @@ public class ReplicatedRegion_ParallelWANPropogationDUnitTest extends WANTestBas
vm4.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
vm5.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- pauseWaitCriteria(60000);
+ Thread.sleep(60000);
/* ExpectedException exp1 = addExpectedException(CacheClosedException.class
.getName());
@@ -909,7 +890,7 @@ public class ReplicatedRegion_ParallelWANPropogationDUnitTest extends WANTestBas
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -943,18 +924,18 @@ public class ReplicatedRegion_ParallelWANPropogationDUnitTest extends WANTestBas
vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- pauseWaitCriteria(60000);
+ Thread.sleep(60000);
/* ExpectedException exp1 = addExpectedException(CacheClosedException.class
.getName());
try */{
AsyncInvocation inv1 = vm7.invokeAsync(() -> ReplicatedRegion_ParallelWANPropogationDUnitTest.doPuts0(
getTestMethodName() + "_RR", 10000 ));
- pauseWaitCriteria(1000);
+ Thread.sleep(1000);
AsyncInvocation inv2 = vm4.invokeAsync(() -> WANTestBase.killSender());
- pauseWaitCriteria(2000);
+ Thread.sleep(2000);
AsyncInvocation inv3 = vm6.invokeAsync(() -> ReplicatedRegion_ParallelWANPropogationDUnitTest.doPuts1(
getTestMethodName() + "_RR", 10000 ));
- pauseWaitCriteria(1500);
+ Thread.sleep(1500);
AsyncInvocation inv4 = vm5.invokeAsync(() -> WANTestBase.killSender());
try {
inv1.join();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
index 92e318c..4ee378f 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
@@ -61,7 +61,7 @@ public class ShutdownAllPersistentGatewaySenderDUnitTest extends WANTestBase {
vm2.invoke(() -> WANTestBase.createCache( nyPort ));
vm3.invoke(() -> WANTestBase.createCache( nyPort ));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
vm2.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion( getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap() ));
@@ -124,7 +124,7 @@ public class ShutdownAllPersistentGatewaySenderDUnitTest extends WANTestBase {
"vm1's region size before restart gatewayhub is " + region.size());
}
});
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
// wait for vm0 to finish its work
vm4_future.join(MAX_WAIT);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WANSSLDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WANSSLDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WANSSLDUnitTest.java
index 5f81d1f..f322394 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WANSSLDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WANSSLDUnitTest.java
@@ -98,7 +98,7 @@ public class WANSSLDUnitTest extends WANTestBase{
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
vm4.invoke(() -> WANTestBase.createCacheWithSSL( lnPort ));
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanValidationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanValidationsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanValidationsDUnitTest.java
index cb013ef..612a178 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanValidationsDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/WanValidationsDUnitTest.java
@@ -19,7 +19,6 @@ package com.gemstone.gemfire.internal.cache.wan.misc;
import java.util.ArrayList;
import java.util.Map;
-import com.gemstone.gemfire.cache.client.ServerOperationException;
import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
@@ -724,7 +723,7 @@ public class WanValidationsDUnitTest extends WANTestBase {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
createCacheInVMs(lnPort, vm4);
- vm4.invoke(() -> WANTestBase.createReceiver( lnPort ));
+ vm4.invoke(() -> WANTestBase.createReceiver());
vm4.invoke(() -> WANTestBase.createCacheServer( ));
@@ -743,7 +742,7 @@ public class WanValidationsDUnitTest extends WANTestBase {
public void test_GetCacheServersDoesNotReturnReceivers_Scenario2() {
Integer lnPort = (Integer)vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId( 1 ));
createCacheInVMs(lnPort, vm4);
- vm4.invoke(() -> WANTestBase.createReceiver( lnPort ));
+ vm4.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm5);
vm5.invoke(() -> WANTestBase.createCacheServer( ));
@@ -822,7 +821,7 @@ public class WanValidationsDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
vm2.invoke(createReceiverReplicatedRegion());
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -904,7 +903,7 @@ public class WanValidationsDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
vm2.invoke(createReceiverReplicatedRegion());
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -981,7 +980,7 @@ public class WanValidationsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_RR", null, 1, 100, isOffHeap() ));
@@ -1074,7 +1073,7 @@ public class WanValidationsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 1, 100, isOffHeap() ));
@@ -1169,7 +1168,7 @@ public class WanValidationsDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 1, 10, isOffHeap()));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -1256,7 +1255,7 @@ public class WanValidationsDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 1, 10, isOffHeap()));
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -1428,7 +1427,7 @@ public class WanValidationsDUnitTest extends WANTestBase {
// ------------- START - CREATE CACHE ON REMOTE SITE ---------------//
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
vm2.invoke(() -> WANTestBase.createSender( "ny", 1,
false, 100, 10, false, false, null, true ));
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 3f0329a..0d34f44 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -16,19 +16,13 @@
*/
package com.gemstone.gemfire.internal.cache.wan.parallel;
-import junit.framework.Test;
-import junit.framework.TestSuite;
-
import com.gemstone.gemfire.GemFireIOException;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderException;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
-import com.gemstone.gemfire.test.dunit.DistributedTestCase;
import com.gemstone.gemfire.test.dunit.IgnoredException;
import com.gemstone.gemfire.test.dunit.LogWriterUtils;
import com.gemstone.gemfire.test.dunit.RMIException;
@@ -176,7 +170,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
Integer nyPort = locatorPorts[1];
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5);
@@ -575,7 +569,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
IgnoredException ignoredGIOE = IgnoredException.addIgnoredException(GemFireIOException.class.getName(), vm4);
vm2.invoke(() -> createCache( nyPort ));
vm2.invoke(() -> createPartitionedRegion( regionName, null, 0, 100, isOffHeap() ));
- vm2.invoke(() -> createReceiver( nyPort ));
+ vm2.invoke(() -> createReceiver());
validateRegionSizes( regionName, numPuts, vm2 );
vm4.invoke(() -> {
@@ -608,7 +602,7 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
// Note: This is a test-specific method used by several test to create
// receivers and senders.
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java
index 07c2610..7169b2e 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelGatewaySenderQueueOverflowDUnitTest.java
@@ -61,18 +61,14 @@ public class ParallelGatewaySenderQueueOverflowDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
- vm4.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2,
- true, 10, 10, false, false, null, true ));
- vm5.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2,
- true, 10, 10, false, false, null, true ));
- vm6.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2,
- true, 10, 10, false, false, null, true ));
- vm7.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2,
- true, 10, 10, false, false, null, true ));
+ vm4.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2, 10, 10, false, true ));
+ vm5.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2, 10, 10, false, true ));
+ vm6.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2, 10, 10, false, true ));
+ vm7.invoke(() -> WANTestBase.createSenderWithoutDiskStore( "ln", 2, 10, 10, false, true ));
vm4.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName(), "ln", 1, 100, isOffHeap() ));
@@ -127,8 +123,8 @@ public class ParallelGatewaySenderQueueOverflowDUnitTest extends WANTestBase {
vm6.invoke(() -> WANTestBase.resumeSender( "ln" ));
vm7.invoke(() -> WANTestBase.resumeSender( "ln" ));
- vm2.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50 ));
- vm3.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50 ));
+ vm2.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50, 240000 ));
+ vm3.invoke(() -> WANTestBase.validateRegionSize( getTestMethodName(), 50, 240000 ));
}
/**
@@ -141,7 +137,7 @@ public class ParallelGatewaySenderQueueOverflowDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -222,7 +218,7 @@ public class ParallelGatewaySenderQueueOverflowDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -303,7 +299,7 @@ public class ParallelGatewaySenderQueueOverflowDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
index ab261fd..cab59a4 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANConflationDUnitTest.java
@@ -359,21 +359,15 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
}
protected void createOrderShipmentOnReceivers() {
- vm2.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
- getTestMethodName(), null, 1, 8, isOffHeap() ));
- vm3.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
- getTestMethodName(), null, 1, 8, isOffHeap() ));
+ vm2.invoke(() ->createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap() ));
+ vm3.invoke(() ->createCustomerOrderShipmentPartitionedRegion(null, 1, 8, isOffHeap() ));
}
protected void createOrderShipmentOnSenders() {
- vm4.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
- getTestMethodName(), "ln", 0, 8, isOffHeap() ));
- vm5.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
- getTestMethodName(), "ln", 0, 8, isOffHeap() ));
- vm6.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
- getTestMethodName(), "ln", 0, 8, isOffHeap() ));
- vm7.invoke(() ->createCustomerOrderShipmentPartitionedRegion(
- getTestMethodName(), "ln", 0, 8, isOffHeap() ));
+ vm4.invoke(() ->createCustomerOrderShipmentPartitionedRegion("ln", 0, 8, isOffHeap() ));
+ vm5.invoke(() ->createCustomerOrderShipmentPartitionedRegion("ln", 0, 8, isOffHeap() ));
+ vm6.invoke(() ->createCustomerOrderShipmentPartitionedRegion("ln", 0, 8, isOffHeap() ));
+ vm7.invoke(() ->createCustomerOrderShipmentPartitionedRegion("ln", 0, 8, isOffHeap() ));
}
protected Map updateKeyValues() {
@@ -456,7 +450,7 @@ public class ParallelWANConflationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() ->createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
index 8139dca..ae38110 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
@@ -25,11 +25,9 @@ import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
import com.gemstone.gemfire.internal.cache.ColocationHelper;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
import com.gemstone.gemfire.test.dunit.IgnoredException;
import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.SerializableCallableIF;
import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
@@ -87,7 +85,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -133,7 +131,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
LogWriterUtils.getLogWriter().info("Created remote receivers");
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -198,7 +196,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -329,7 +327,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -560,7 +558,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
getTestMethodName(), null, 1, 100, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createPersistentPartitionedRegion(
getTestMethodName(), null, 1, 100, isOffHeap() ));
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
vm4.invoke(pauseSenderRunnable());
vm5.invoke(pauseSenderRunnable());
@@ -600,7 +598,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -735,7 +733,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -863,7 +861,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -1073,7 +1071,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -1181,7 +1179,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -1293,7 +1291,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -1431,7 +1429,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
//create receiver on remote site
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//create cache in local site
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -1484,7 +1482,7 @@ public class ParallelWANPersistenceEnabledGatewaySenderDUnitTest extends
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
index 2c0d693..0e43930 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
@@ -42,7 +42,7 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5);
@@ -106,7 +106,7 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5);
@@ -151,7 +151,7 @@ public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- vm2.invoke(() -> WANTestBase.createReceiver( nyPort ));
+ vm2.invoke(() -> WANTestBase.createReceiver());
vm2.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName() + "_PR", null, 3, 4, isOffHeap() ));
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
index 6d4b03a..63a129b 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationDUnitTest.java
@@ -16,8 +16,6 @@
*/
package com.gemstone.gemfire.internal.cache.wan.parallel;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
import com.gemstone.gemfire.cache.EntryExistsException;
@@ -29,11 +27,9 @@ import com.gemstone.gemfire.internal.cache.RegionQueue;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.BatchException70;
import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
-import com.gemstone.gemfire.internal.cache.wan.WANTestBase.MyGatewayEventFilter;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
import com.gemstone.gemfire.test.dunit.IgnoredException;
import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.SerializableCallableIF;
import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
import com.gemstone.gemfire.test.dunit.Wait;
@@ -114,7 +110,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2, vm3);
vm2.invoke(createReceiverPartitionedRegionRedundancy1());
vm3.invoke(createReceiverPartitionedRegionRedundancy1());
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
//verify all buckets drained on all sender nodes.
vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
@@ -139,7 +135,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -200,7 +196,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -249,7 +245,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -302,7 +298,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -393,7 +389,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -440,7 +436,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -490,7 +486,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
getTestMethodName(), null, 1, 100, isOffHeap() ));
vm3.invoke(() -> WANTestBase.createPartitionedRegion(
getTestMethodName(), null, 1, 100, isOffHeap() ));
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -538,7 +534,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "lnSerial",
@@ -610,11 +606,11 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
vm2.invoke(createReceiverPartitionedRegionRedundancy1());
- createReceiverInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
createCacheInVMs(tkPort, vm3);
vm3.invoke(createReceiverPartitionedRegionRedundancy1());
- createReceiverInVMs(tkPort, vm3);
+ createReceiverInVMs(vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
@@ -692,7 +688,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -750,7 +746,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -802,7 +798,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -857,7 +853,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -931,7 +927,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2, vm3);
- createReceiverInVMs(nyPort, vm2, vm3);
+ createReceiverInVMs(vm2, vm3);
createCacheInVMs(lnPort, vm4, vm5);
//vm6.invoke(() -> WANTestBase.createCache( lnPort ));
//vm7.invoke(() -> WANTestBase.createCache( lnPort ));
@@ -965,8 +961,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
vm5.invoke(waitForSenderRunnable());
// vm6.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
// vm7.invoke(() -> WANTestBase.waitForSenderRunningState( "ln" ));
- vm4.invoke(() -> WANTestBase.doTxPuts( getTestMethodName() + "_PR",
- 1000 ));
+ vm4.invoke(() -> WANTestBase.doTxPuts( getTestMethodName() + "_PR"));
//verify all buckets drained on all sender nodes.
vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln"));
@@ -984,7 +979,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
createCacheInVMs(nyPort, vm2);
createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
createCacheInVMs(lnPort, vm4, vm5);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -1035,7 +1030,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
createCacheInVMs(lnPort, vm4, vm5);
vm4.invoke(() -> WANTestBase.createSender( "ln", 2,
@@ -1090,7 +1085,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
//create cache and receiver on site2
createCacheInVMs(nyPort, vm2);
- createReceiverInVMs(nyPort, vm2);
+ createReceiverInVMs(vm2);
//create cache on site1
createCacheInVMs(lnPort, vm3);
@@ -1130,7 +1125,7 @@ public class ParallelWANPropagationDUnitTest extends WANTestBase {
Integer nyPort = (Integer)vm1.invoke(() -> WANTestBase.createFirstRemoteLocator( 2, lnPort ));
createCacheInVMs(nyPort, vm6, vm7);
- createReceiverInVMs(nyPort, vm6, vm7);
+ createReceiverInVMs(vm6, vm7);
createCacheInVMs(lnPort, vm2, vm3, vm4, vm5);
[5/5] incubator-geode git commit: GEODE-1032: Fixed compilation error
with ConcurrentParallelGatewaySenderOperation_2_DUnitTest
Posted by ja...@apache.org.
GEODE-1032: Fixed compilation error with ConcurrentParallelGatewaySenderOperation_2_DUnitTest
The pull request for GEODE-1125, which was accepted, modified the method signature of createReceiver
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/bcaf0c69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/bcaf0c69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/bcaf0c69
Branch: refs/heads/develop
Commit: bcaf0c699e623a925322c05f60a0195546003376
Parents: 84a4b47
Author: Jason Huynh <hu...@gmail.com>
Authored: Mon Apr 18 10:37:39 2016 -0700
Committer: Jason Huynh <hu...@gmail.com>
Committed: Mon Apr 18 10:39:47 2016 -0700
----------------------------------------------------------------------
...ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/bcaf0c69/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
index a0b00e9..9e5c0fd 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_2_DUnitTest.java
@@ -70,7 +70,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
- vm2.invoke(() -> createReceiver(nyPort));
+ vm2.invoke(() -> createReceiver());
vm4.invoke(() -> doPuts(regionName, 10));
vm4.invoke(() -> validateRegionSize(regionName, 10));
@@ -109,7 +109,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
- vm2.invoke(() -> createReceiver(nyPort));
+ vm2.invoke(() -> createReceiver());
vm4.invoke(() -> doPuts(regionName, 10));
vm4.invoke(() -> validateRegionSize(regionName, 10));
@@ -149,7 +149,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
- vm2.invoke(() -> createReceiver(nyPort));
+ vm2.invoke(() -> createReceiver());
vm4.invoke(() -> doPuts(regionName, 10));
vm4.invoke(() -> validateRegionSize(regionName, 10));
@@ -274,7 +274,7 @@ public class ConcurrentParallelGatewaySenderOperation_2_DUnitTest extends WANTes
createCacheInVMs(nyPort, vm2);
vm2.invoke(() -> createPartitionedRegion(regionName, null, 1, 10, isOffHeap()));
- vm2.invoke(() -> createReceiver(nyPort));
+ vm2.invoke(() -> createReceiver());
vm4.invoke(() -> doPuts(regionName, 10));
vm4.invoke(() -> validateRegionSize(regionName, 10));
[3/5] incubator-geode git commit: GEODE-1032 : Additional wait time
to check for empty queue,
refactored WANTestBase.java to remove unused functions,
replaced wait criterions with awaitility.
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
index e93c9c2..aca2cb9 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
@@ -30,7 +30,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.Callable;
@@ -60,12 +59,10 @@ import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter;
@@ -154,8 +151,6 @@ public class WANTestBase extends DistributedTestCase{
protected static QueueListener listener1;
protected static QueueListener listener2;
- protected static List<QueueListener> gatewayListeners;
-
protected static AsyncEventListener eventListener1 ;
protected static AsyncEventListener eventListener2 ;
@@ -163,8 +158,6 @@ public class WANTestBase extends DistributedTestCase{
protected static GatewayEventFilter eventFilter;
- protected static boolean destroyFlag = false;
-
protected static List<Integer> dispatcherThreads =
new ArrayList<Integer>(Arrays.asList(1, 3, 5));
//this will be set for each test method run with one of the values from above list
@@ -315,7 +308,6 @@ public class WANTestBase extends DistributedTestCase{
props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + oldPort + "],server=true,peer=true,hostname-for-clients=localhost");
props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, "localhost[" + remoteLocPort + "]");
test.getSystem(props);
- return;
}
@@ -376,8 +368,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -400,8 +390,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()){
String senderId = tokenizer.nextToken();
-// GatewaySender sender = cache.getGatewaySender(senderId);
-// assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -411,32 +399,12 @@ public class WANTestBase extends DistributedTestCase{
assertNotNull(r);
}
-// public static void createReplicatedRegion_PDX(String regionName, String senderId, DataPolicy policy, InterestPolicy intPolicy){
-// AttributesFactory fact = new AttributesFactory();
-// if(senderId!= null){
-// StringTokenizer tokenizer = new StringTokenizer(senderId, ",");
-// while (tokenizer.hasMoreTokens()){
-// String sender = tokenizer.nextToken();
-// //fact.addSerialGatewaySenderId(sender);
-// }
-// }
-// fact.setDataPolicy(policy);
-// SubscriptionAttributes subAttr = new SubscriptionAttributes(intPolicy);
-// fact.setSubscriptionAttributes(subAttr);
-// fact.setScope(Scope.DISTRIBUTED_ACK);
-// Region r = cache.createRegionFactory(fact.create()).create(regionName);
-// assertNotNull(r);
-// assertTrue(r.size() == 0);
-// }
-
public static void createPersistentReplicatedRegion(String regionName, String senderIds, Boolean offHeap){
AttributesFactory fact = new AttributesFactory();
if(senderIds!= null){
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()){
String senderId = tokenizer.nextToken();
-// GatewaySender sender = cache.getGatewaySender(senderId);
-// assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -446,27 +414,6 @@ public class WANTestBase extends DistributedTestCase{
assertNotNull(r);
}
-// public static void createReplicatedRegionWithParallelSenderId(String regionName, String senderId){
-// AttributesFactory fact = new AttributesFactory();
-// if(senderId!= null){
-// StringTokenizer tokenizer = new StringTokenizer(senderId, ",");
-// while (tokenizer.hasMoreTokens()){
-// String sender = tokenizer.nextToken();
-// //fact.addParallelGatewaySenderId(sender);
-// }
-// }
-// fact.setDataPolicy(DataPolicy.REPLICATE);
-// Region r = cache.createRegionFactory(fact.create()).create(regionName);
-// assertNotNull(r);
-// }
-
-// public static void createReplicatedRegion(String regionName){
-// AttributesFactory fact = new AttributesFactory();
-// fact.setDataPolicy(DataPolicy.REPLICATE);
-// Region r = cache.createRegionFactory(fact.create()).create(regionName);
-// assertNotNull(r);
-// }
-
public static void createReplicatedRegionWithAsyncEventQueue(
String regionName, String asyncQueueIds, Boolean offHeap) {
IgnoredException exp1 = IgnoredException.addIgnoredException(ForceReattemptException.class
@@ -491,31 +438,11 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static void createPersistentReplicatedRegionWithAsyncEventQueue(
- String regionName, String asyncQueueIds) {
-
- AttributesFactory fact = new AttributesFactory();
- if(asyncQueueIds != null){
- StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
- while (tokenizer.hasMoreTokens()){
- String asyncQueueId = tokenizer.nextToken();
- fact.addAsyncEventQueueId(asyncQueueId);
- }
- }
- fact.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
- RegionFactory regionFactory = cache.createRegionFactory(fact.create());
- Region r = regionFactory.create(regionName);
- assertNotNull(r);
- }
-
-
-
public static void createReplicatedRegionWithSenderAndAsyncEventQueue(
String regionName, String senderIds, String asyncChannelId, Boolean offHeap) {
IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
.getName());
try {
-
AttributesFactory fact = new AttributesFactory();
if (senderIds != null) {
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
@@ -543,8 +470,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()){
String senderId = tokenizer.nextToken();
-// GatewaySender sender = cache.getGatewaySender(senderId);
-// assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -567,7 +492,7 @@ public class WANTestBase extends DistributedTestCase{
File[] dirs1 = new File[] { directory };
DiskStoreFactory dsf = cache.createDiskStoreFactory();
dsf.setDiskDirs(dirs1);
- DiskStore ds = dsf.create(diskStoreName);
+ dsf.create(diskStoreName);
}
AsyncEventListener asyncEventListener = new MyAsyncEventListener();
@@ -582,305 +507,7 @@ public class WANTestBase extends DistributedTestCase{
factory.setParallel(isParallel);
//set dispatcher threads
factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
- AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
- }
-
- public static void createAsyncEventQueueWithListener2(String asyncChannelId,
- boolean isParallel, Integer maxMemory, Integer batchSize,
- boolean isPersistent, String diskStoreName) {
-
- if (diskStoreName != null) {
- File directory = new File(asyncChannelId + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- directory.mkdir();
- File[] dirs1 = new File[] { directory };
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- dsf.setDiskDirs(dirs1);
- DiskStore ds = dsf.create(diskStoreName);
- }
-
- AsyncEventListener asyncEventListener = new MyAsyncEventListener2();
-
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(diskStoreName);
- factory.setMaximumQueueMemory(maxMemory);
- factory.setParallel(isParallel);
- //set dispatcher threads
- factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
- AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
- asyncEventListener);
- }
-
- public static void createAsyncEventQueue(
- String asyncChannelId, boolean isParallel, Integer maxMemory,
- Integer batchSize, boolean isConflation, boolean isPersistent,
- String diskStoreName, boolean isDiskSynchronous, String asyncListenerClass) throws Exception {
-
- if (diskStoreName != null) {
- File directory = new File(asyncChannelId + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- directory.mkdir();
- File[] dirs1 = new File[] { directory };
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- dsf.setDiskDirs(dirs1);
- DiskStore ds = dsf.create(diskStoreName);
- }
-
- String packagePrefix = "com.gemstone.gemfire.internal.cache.wan.";
- String className = packagePrefix + asyncListenerClass;
- AsyncEventListener asyncEventListener = null;
- try {
- Class clazz = Class.forName(className);
- asyncEventListener = (AsyncEventListener) clazz.newInstance();
- } catch (ClassNotFoundException e) {
- throw e;
- } catch (InstantiationException e) {
- throw e;
- } catch (IllegalAccessException e) {
- throw e;
- }
-
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(diskStoreName);
- factory.setDiskSynchronous(isDiskSynchronous);
- factory.setBatchConflationEnabled(isConflation);
- factory.setMaximumQueueMemory(maxMemory);
- factory.setParallel(isParallel);
- //set dispatcher threads
- factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
- AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
- }
-
- public static void createAsyncEventQueueWithCustomListener(
- String asyncChannelId, boolean isParallel, Integer maxMemory,
- Integer batchSize, boolean isConflation, boolean isPersistent,
- String diskStoreName, boolean isDiskSynchronous) {
- createAsyncEventQueueWithCustomListener(asyncChannelId, isParallel, maxMemory, batchSize,
- isConflation, isPersistent, diskStoreName, isDiskSynchronous, GatewaySender.DEFAULT_DISPATCHER_THREADS);
- }
-
- public static void createAsyncEventQueueWithCustomListener(
- String asyncChannelId, boolean isParallel, Integer maxMemory,
- Integer batchSize, boolean isConflation, boolean isPersistent,
- String diskStoreName, boolean isDiskSynchronous, int nDispatchers) {
-
- IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
- .getName());
-
- try {
- if (diskStoreName != null) {
- File directory = new File(asyncChannelId + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- directory.mkdir();
- File[] dirs1 = new File[] { directory };
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- dsf.setDiskDirs(dirs1);
- DiskStore ds = dsf.create(diskStoreName);
- }
-
- AsyncEventListener asyncEventListener = new CustomAsyncEventListener();
-
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(diskStoreName);
- factory.setMaximumQueueMemory(maxMemory);
- factory.setParallel(isParallel);
- factory.setDispatcherThreads(nDispatchers);
- AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
- asyncEventListener);
- } finally {
- exp.remove();
- }
- }
-
- public static void createConcurrentAsyncEventQueue(
- String asyncChannelId, boolean isParallel,
- Integer maxMemory, Integer batchSize, boolean isConflation,
- boolean isPersistent, String diskStoreName, boolean isDiskSynchronous,
- int dispatcherThreads, OrderPolicy policy) {
-
- if (diskStoreName != null) {
- File directory = new File(asyncChannelId + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- directory.mkdir();
- File[] dirs1 = new File[] { directory };
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- dsf.setDiskDirs(dirs1);
- DiskStore ds = dsf.create(diskStoreName);
- }
-
- AsyncEventListener asyncEventListener = new MyAsyncEventListener();
-
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(diskStoreName);
- factory.setDiskSynchronous(isDiskSynchronous);
- factory.setBatchConflationEnabled(isConflation);
- factory.setMaximumQueueMemory(maxMemory);
- factory.setParallel(isParallel);
- factory.setDispatcherThreads(dispatcherThreads);
- factory.setOrderPolicy(policy);
- AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
- }
-
-
- public static String createAsyncEventQueueWithDiskStore(
- String asyncChannelId, boolean isParallel,
- Integer maxMemory, Integer batchSize,
- boolean isPersistent, String diskStoreName) {
-
- AsyncEventListener asyncEventListener = new MyAsyncEventListener();
-
- File persistentDirectory = null;
- if (diskStoreName == null) {
- persistentDirectory = new File(asyncChannelId + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- } else {
- persistentDirectory = new File(diskStoreName);
- }
- LogWriterUtils.getLogWriter().info("The ds is : " + persistentDirectory.getName());
- persistentDirectory.mkdir();
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- File [] dirs1 = new File[] {persistentDirectory};
-
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setParallel(isParallel);
- if (isPersistent) {
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(dsf.setDiskDirs(dirs1).create(asyncChannelId).getName());
- }
- factory.setMaximumQueueMemory(maxMemory);
- //set dispatcher threads
- factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
- AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
- return persistentDirectory.getName();
- }
-
- public static void pauseAsyncEventQueue(String asyncChannelId) {
- AsyncEventQueue theChannel = null;
-
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncChannelId.equals(asyncChannel.getId())) {
- theChannel = asyncChannel;
- }
- }
-
- ((AsyncEventQueueImpl)theChannel).getSender().pause();
- }
-
- public static void pauseAsyncEventQueueAndWaitForDispatcherToPause(String asyncChannelId) {
- AsyncEventQueue theChannel = null;
-
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncChannelId.equals(asyncChannel.getId())) {
- theChannel = asyncChannel;
- break;
- }
- }
-
- ((AsyncEventQueueImpl)theChannel).getSender().pause();
-
-
- ((AbstractGatewaySender)((AsyncEventQueueImpl)theChannel).getSender()).getEventProcessor().waitForDispatcherToPause();
- }
-
- public static void resumeAsyncEventQueue(String asyncQueueId) {
- AsyncEventQueue theQueue = null;
-
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncQueueId.equals(asyncChannel.getId())) {
- theQueue = asyncChannel;
- }
- }
-
- ((AsyncEventQueueImpl)theQueue).getSender().resume();
- }
-
-
- public static void checkAsyncEventQueueSize(String asyncQueueId, int numQueueEntries) {
- AsyncEventQueue theAsyncEventQueue = null;
-
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncQueueId.equals(asyncChannel.getId())) {
- theAsyncEventQueue = asyncChannel;
- }
- }
-
- GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue).getSender();
-
- if (sender.isParallel()) {
- Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
- assertEquals(numQueueEntries,
- queues.toArray(new RegionQueue[queues.size()])[0].getRegion().size());
- } else {
- Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
- int size = 0;
- for (RegionQueue q : queues) {
- size += q.size();
- }
- assertEquals(numQueueEntries, size);
- }
- }
-
- /**
- * This method verifies the queue size of a ParallelGatewaySender. For
- * ParallelGatewaySender conflation happens in a separate thread, hence test
- * code needs to wait for some time for expected result
- *
- * @param asyncQueueId
- * Async Queue ID
- * @param numQueueEntries
- * expected number of Queue entries
- * @throws Exception
- */
- public static void waitForAsyncEventQueueSize(String asyncQueueId,
- final int numQueueEntries) throws Exception {
- AsyncEventQueue theAsyncEventQueue = null;
-
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncQueueId.equals(asyncChannel.getId())) {
- theAsyncEventQueue = asyncChannel;
- }
- }
-
- GatewaySender sender = ((AsyncEventQueueImpl) theAsyncEventQueue)
- .getSender();
-
- if (sender.isParallel()) {
- final Set<RegionQueue> queues = ((AbstractGatewaySender) sender)
- .getQueues();
-
- Wait.waitForCriterion(new WaitCriterion() {
-
- public String description() {
- return "Waiting for EventQueue size to be " + numQueueEntries;
- }
-
- public boolean done() {
- boolean done = numQueueEntries == queues
- .toArray(new RegionQueue[queues.size()])[0].getRegion().size();
- return done;
- }
-
- }, MAX_WAIT, 500, true);
-
- } else {
- throw new Exception(
- "This method should be used for only ParallelGatewaySender,SerialGatewaySender should use checkAsyncEventQueueSize() method instead");
-
- }
+ factory.create(asyncChannelId, asyncEventListener);
}
public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){
@@ -894,8 +521,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -927,8 +552,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -958,8 +581,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -994,111 +615,6 @@ public class WANTestBase extends DistributedTestCase{
mutator.addAsyncEventQueueId(queueId);
}
- public static void createPartitionedRegionWithAsyncEventQueue(
- String regionName, String asyncEventQueueId, Boolean offHeap) {
- IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
- .getName());
- IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class
- .getName());
- try {
- AttributesFactory fact = new AttributesFactory();
-
- PartitionAttributesFactory pfact = new PartitionAttributesFactory();
- pfact.setTotalNumBuckets(16);
- fact.setPartitionAttributes(pfact.create());
- fact.setOffHeap(offHeap);
- Region r = cache.createRegionFactory(fact.create())
- .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
- assertNotNull(r);
- }
- finally {
- exp.remove();
- exp1.remove();
- }
- }
-
- public static void createColocatedPartitionedRegionWithAsyncEventQueue(
- String regionName, String asyncEventQueueId, Integer totalNumBuckets, String colocatedWith) {
-
- IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
- .getName());
- IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class
- .getName());
- try {
- AttributesFactory fact = new AttributesFactory();
-
- PartitionAttributesFactory pfact = new PartitionAttributesFactory();
- pfact.setTotalNumBuckets(totalNumBuckets);
- pfact.setColocatedWith(colocatedWith);
- fact.setPartitionAttributes(pfact.create());
- Region r = cache.createRegionFactory(fact.create())
- .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
- assertNotNull(r);
- }
- finally {
- exp.remove();
- exp1.remove();
- }
- }
-
- public static void createPersistentPartitionedRegionWithAsyncEventQueue(
- String regionName, String asyncEventQueueId) {
- AttributesFactory fact = new AttributesFactory();
-
- PartitionAttributesFactory pfact = new PartitionAttributesFactory();
- fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
- pfact.setTotalNumBuckets(16);
- fact.setPartitionAttributes(pfact.create());
- if (asyncEventQueueId != null) {
- StringTokenizer tokenizer = new StringTokenizer(asyncEventQueueId, ",");
- while (tokenizer.hasMoreTokens()) {
- String asyncId = tokenizer.nextToken();
- fact.addAsyncEventQueueId(asyncId);
- }
- }
- Region r = cache.createRegionFactory(fact.create()).create(regionName);
- assertNotNull(r);
- }
-
- /**
- * Create PartitionedRegion with 1 redundant copy
- */
- public static void createPRWithRedundantCopyWithAsyncEventQueue(
- String regionName, String asyncEventQueueId, Boolean offHeap) {
- IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
- .getName());
-
- try {
- AttributesFactory fact = new AttributesFactory();
-
- PartitionAttributesFactory pfact = new PartitionAttributesFactory();
- pfact.setTotalNumBuckets(16);
- pfact.setRedundantCopies(1);
- fact.setPartitionAttributes(pfact.create());
- fact.setOffHeap(offHeap);
- Region r = cache.createRegionFactory(fact.create())
- .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
- assertNotNull(r);
- }
- finally {
- exp.remove();
- }
- }
-
- public static void createPartitionedRegionAccessorWithAsyncEventQueue(
- String regionName, String asyncEventQueueId) {
- AttributesFactory fact = new AttributesFactory();
- PartitionAttributesFactory pfact = new PartitionAttributesFactory();
- pfact.setTotalNumBuckets(16);
- pfact.setLocalMaxMemory(0);
- fact.setPartitionAttributes(pfact.create());
- Region r = cache.createRegionFactory(
- fact.create()).addAsyncEventQueueId(
- asyncEventQueueId).create(regionName);
- //fact.create()).create(regionName);
- assertNotNull(r);
- }
-
public static void createPartitionedRegionAsAccessor(
String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets){
AttributesFactory fact = new AttributesFactory();
@@ -1106,8 +622,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()){
String senderId = tokenizer.nextToken();
-// GatewaySender sender = cache.getGatewaySender(senderId);
-// assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1126,8 +640,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(serialSenderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1135,8 +647,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(parallelSenderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
-// GatewaySender sender = cache.getGatewaySender(senderId);
-// assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1166,8 +676,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1187,7 +695,7 @@ public class WANTestBase extends DistributedTestCase{
}
public static void createCustomerOrderShipmentPartitionedRegion(
- String regionName, String senderIds, Integer redundantCopies,
+ String senderIds, Integer redundantCopies,
Integer totalNumBuckets, Boolean offHeap) {
IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
.getName());
@@ -1197,15 +705,11 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
PartitionAttributesFactory paf = new PartitionAttributesFactory();
- // creating colocated Regions
- paf = new PartitionAttributesFactory();
paf.setRedundantCopies(redundantCopies)
.setTotalNumBuckets(totalNumBuckets)
.setPartitionResolver(
@@ -1230,8 +734,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1255,8 +757,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1280,8 +780,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()){
String senderId = tokenizer.nextToken();
-// GatewaySender sender = cache.getGatewaySender(senderId);
-// assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1309,8 +807,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()){
String senderId = tokenizer.nextToken();
-// GatewaySender sender = cache.getGatewaySender(senderId);
-// assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1340,21 +836,6 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static void createCacheInVMsAsync(Integer locatorPort, VM... vms) {
- List<AsyncInvocation> tasks = new LinkedList<>();
- for (VM vm : vms) {
- tasks.add(vm.invokeAsync(() -> createCache(locatorPort)));
- }
- for (AsyncInvocation invocation : tasks) {
- try {
- invocation.join(60000);
- }
- catch (InterruptedException e) {
- fail("Failed starting up the cache");
- }
- }
- }
-
public static void addListenerToSleepAfterCreateEvent(int milliSeconds) {
cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator()
.addCacheListener(new CacheListenerAdapter<Object, Object>() {
@@ -1370,6 +851,31 @@ public class WANTestBase extends DistributedTestCase{
});
}
+ private static CacheListener myListener;
+ /**
+  * Installs a cache listener on the "&lt;test method name&gt;_RR_1" region that sleeps for
+  * {@code milliSeconds} inside every {@code afterCreate} callback once the region
+  * holds at least {@code numEvents} entries.
+  *
+  * The listener is stored in the static {@code myListener} field so that
+  * {@code removeCacheListener()} can detach it again.
+  *
+  * @param numEvents region size from which each create callback starts sleeping
+  * @param milliSeconds duration of each sleep, in milliseconds
+  */
+ public static void longPauseAfterNumEvents(int numEvents, int milliSeconds) {
+   myListener = new CacheListenerAdapter<Object, Object>() {
+     @Override
+     public void afterCreate(final EntryEvent<Object, Object> event) {
+       // Below the threshold the callback returns immediately.
+       if (event.getRegion().size() < numEvents) {
+         return;
+       }
+       try {
+         Thread.sleep(milliSeconds);
+       } catch (InterruptedException interrupted) {
+         // Restore the interrupt flag instead of swallowing the interruption.
+         Thread.currentThread().interrupt();
+       }
+     }
+   };
+   final String regionName = getTestMethodName() + "_RR_1";
+   cache.getRegion(regionName).getAttributesMutator().addCacheListener(myListener);
+ }
+
+ /**
+  * Detaches the listener stored in {@code myListener} (set by
+  * {@code longPauseAfterNumEvents}) from the "&lt;test method name&gt;_RR_1" region.
+  *
+  * Does nothing when no listener is currently stored, so the method is safe to call
+  * even if {@code longPauseAfterNumEvents} was never invoked in this VM.
+  */
+ public static void removeCacheListener() {
+   // The attributes mutator is documented to reject a null listener with an
+   // IllegalArgumentException, so bail out when nothing was installed.
+   if (myListener == null) {
+     return;
+   }
+   cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator()
+       .removeCacheListener(myListener);
+   // Drop the reference so the removed listener is not retained in the static field
+   // and a repeated call becomes a no-op.
+   myListener = null;
+ }
+
public static void createCache(Integer locPort){
createCache(false, locPort);
@@ -1449,7 +955,7 @@ public class WANTestBase extends DistributedTestCase{
File pdxDir = new File(CacheTestCase.getDiskDir(), "pdx");
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File [] dirs1 = new File[] {pdxDir};
- DiskStore store = dsf.setDiskDirs(dirs1).setMaxOplogSize(1).create("PDX_TEST");
+ dsf.setDiskDirs(dirs1).setMaxOplogSize(1).create("PDX_TEST");
}
public static void createCache(Integer locPort1, Integer locPort2){
@@ -1576,19 +1082,6 @@ public class WANTestBase extends DistributedTestCase{
sender.test_setBatchConflationEnabled(true);
}
- public static void startAsyncEventQueue(String senderId) {
- Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
- AsyncEventQueue q = null;
- for (AsyncEventQueue s : queues) {
- if (s.getId().equals(senderId)) {
- q = s;
- break;
- }
- }
- //merge42180: There is no start method on AsyncEventQueue. Cheetah has this method. Yet the code for AsyncEvnt Queue is not properly merged from cheetah to cedar
- //q.start();
- }
-
public static Map getSenderToReceiverConnectionInfo(String senderId){
Set<GatewaySender> senders = cache.getGatewaySenders();
GatewaySender sender = null;
@@ -1622,25 +1115,13 @@ public class WANTestBase extends DistributedTestCase{
break;
}
}
- final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics();
+ final GatewaySenderStats statistics = sender.getStatistics();
if (expectedQueueSize != -1) {
final RegionQueue regionQueue;
regionQueue = sender.getQueues().toArray(
new RegionQueue[1])[0];
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (regionQueue.size() == expectedQueueSize) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected queue entries: " + expectedQueueSize
- + " but actual entries: " + regionQueue.size();
- }
- };
- Wait.waitForCriterion(wc, 120000, 500, true);
+ Awaitility.await().atMost(120,TimeUnit.SECONDS).until(() -> assertEquals("Expected queue entries: " +
+ expectedQueueSize + " but actual entries: " + regionQueue.size(), expectedQueueSize,regionQueue.size()));
}
ArrayList<Integer> stats = new ArrayList<Integer>();
stats.add(statistics.getEventQueueSize());
@@ -1674,28 +1155,10 @@ public class WANTestBase extends DistributedTestCase{
assert(statistics.getEventsDistributed() >= eventsDistributed);
}
- public static void checkAsyncEventQueueStats(String queueId, final int queueSize,
- final int eventsReceived, final int eventsQueued,
- final int eventsDistributed) {
- Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
- AsyncEventQueue queue = null;
- for (AsyncEventQueue q : asyncQueues) {
- if (q.getId().equals(queueId)) {
- queue = q;
- break;
- }
- }
- final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics();
- assertEquals(queueSize, statistics.getEventQueueSize());
- assertEquals(eventsReceived, statistics.getEventsReceived());
- assertEquals(eventsQueued, statistics.getEventsQueued());
- assert(statistics.getEventsDistributed() >= eventsDistributed);
- }
-
public static void checkGatewayReceiverStats(int processBatches,
int eventsReceived, int creates) {
Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
- GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
.getAcceptor().getStats();
@@ -1709,7 +1172,7 @@ public class WANTestBase extends DistributedTestCase{
public static void checkMinimumGatewayReceiverStats(int processBatches,
int eventsReceived) {
Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
- GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
.getAcceptor().getStats();
@@ -1721,7 +1184,7 @@ public class WANTestBase extends DistributedTestCase{
public static void checkExcepitonStats(int exceptionsOccured) {
Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
- GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
.getAcceptor().getStats();
@@ -1739,7 +1202,7 @@ public class WANTestBase extends DistributedTestCase{
public static void checkGatewayReceiverStatsHA(int processBatches,
int eventsReceived, int creates) {
Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
- GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
.getAcceptor().getStats();
@@ -1776,21 +1239,6 @@ public class WANTestBase extends DistributedTestCase{
assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
}
- public static void checkAsyncEventQueueConflatedStats(
- String asyncEventQueueId, final int eventsConflated) {
- Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
- AsyncEventQueue queue = null;
- for (AsyncEventQueue q : queues) {
- if (q.getId().equals(asyncEventQueueId)) {
- queue = q;
- break;
- }
- }
- final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
- .getStatistics();
- assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
- }
-
public static void checkStats_Failover(String senderId,
final int eventsReceived) {
Set<GatewaySender> senders = cache.getGatewaySenders();
@@ -1810,25 +1258,6 @@ public class WANTestBase extends DistributedTestCase{
.getUnprocessedEventsRemovedByPrimary()));
}
- public static void checkAsyncEventQueueStats_Failover(String asyncEventQueueId,
- final int eventsReceived) {
- Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
- AsyncEventQueue queue = null;
- for (AsyncEventQueue q : asyncEventQueues) {
- if (q.getId().equals(asyncEventQueueId)) {
- queue = q;
- break;
- }
- }
- final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) queue)
- .getStatistics();
-
- assertEquals(eventsReceived, statistics.getEventsReceived());
- assertEquals(eventsReceived, (statistics.getEventsQueued()
- + statistics.getUnprocessedTokensAddedByPrimary() + statistics
- .getUnprocessedEventsRemovedByPrimary()));
- }
-
public static void checkBatchStats(String senderId, final int batches) {
Set<GatewaySender> senders = cache.getGatewaySenders();
GatewaySender sender = null;
@@ -1844,22 +1273,6 @@ public class WANTestBase extends DistributedTestCase{
assertEquals(0, statistics.getBatchesRedistributed());
}
- public static void checkAsyncEventQueueBatchStats(String asyncQueueId,
- final int batches) {
- Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
- AsyncEventQueue queue = null;
- for (AsyncEventQueue q : queues) {
- if (q.getId().equals(asyncQueueId)) {
- queue = q;
- break;
- }
- }
- final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
- .getStatistics();
- assert (statistics.getBatchesDistributed() >= batches);
- assertEquals(0, statistics.getBatchesRedistributed());
- }
-
public static void checkBatchStats(String senderId,
final boolean batchesDistributed, final boolean bathcesRedistributed) {
Set<GatewaySender> senders = cache.getGatewaySenders();
@@ -1896,43 +1309,13 @@ public class WANTestBase extends DistributedTestCase{
.getUnprocessedTokensAddedByPrimary()));
}
- public static void checkAsyncEventQueueUnprocessedStats(String asyncQueueId, int events) {
- Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
- AsyncEventQueue queue = null;
- for (AsyncEventQueue q : asyncQueues) {
- if (q.getId().equals(asyncQueueId)) {
- queue = q;
- break;
- }
- }
- final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics();
- assertEquals(events,
- (statistics.getUnprocessedEventsAddedBySecondary() + statistics
- .getUnprocessedTokensRemovedBySecondary()));
- assertEquals(events,
- (statistics.getUnprocessedEventsRemovedByPrimary() + statistics
- .getUnprocessedTokensAddedByPrimary()));
- }
-
public static void waitForSenderRunningState(String senderId){
final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
try {
Set<GatewaySender> senders = cache.getGatewaySenders();
final GatewaySender sender = getGatewaySenderById(senders, senderId);
-
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (sender != null && sender.isRunning()) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected sender isRunning state to be true but is false";
- }
- };
- Wait.waitForCriterion(wc, 300000, 500, true);
+ Awaitility.await().atMost(300,TimeUnit.SECONDS).until(() -> assertEquals("Expected sender isRunning state to "
+ + "be true but is false", true, (sender != null && sender.isRunning())));
} finally {
exln.remove();
}
@@ -1941,19 +1324,8 @@ public class WANTestBase extends DistributedTestCase{
public static void waitForSenderToBecomePrimary(String senderId){
Set<GatewaySender> senders = ((GemFireCacheImpl)cache).getAllGatewaySenders();
final GatewaySender sender = getGatewaySenderById(senders, senderId);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (sender != null && ((AbstractGatewaySender) sender).isPrimary()) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected sender primary state to be true but is false";
- }
- };
- Wait.waitForCriterion(wc, 10000, 1000, true);
+ Awaitility.await().atMost(10,TimeUnit.SECONDS).until(() -> assertEquals("Expected sender primary state to "
+ + "be true but is false", true, (sender != null && ((AbstractGatewaySender)sender).isPrimary())));
}
private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
@@ -1980,22 +1352,13 @@ public class WANTestBase extends DistributedTestCase{
secondaryUpdatesMap.put("Update", listener1.updateList);
secondaryUpdatesMap.put("Destroy", listener1.destroyList);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- secondaryUpdatesMap.put("Create", listener1.createList);
- secondaryUpdatesMap.put("Update", listener1.updateList);
- secondaryUpdatesMap.put("Destroy", listener1.destroyList);
- if (secondaryUpdatesMap.equals(primaryUpdatesMap)) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected seconadry map to be " + primaryUpdatesMap + " but it is " + secondaryUpdatesMap;
- }
- };
- Wait.waitForCriterion(wc, 300000, 500, true);
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ secondaryUpdatesMap.put("Create", listener1.createList);
+ secondaryUpdatesMap.put("Update", listener1.updateList);
+ secondaryUpdatesMap.put("Destroy", listener1.destroyList);
+ assertEquals("Expected seconadry map to be " + primaryUpdatesMap + " but it is " + secondaryUpdatesMap,
+ true,secondaryUpdatesMap.equals(primaryUpdatesMap));
+ });
}
public static HashMap checkQueue2(){
@@ -2021,35 +1384,12 @@ public class WANTestBase extends DistributedTestCase{
PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
HashMap listenerAttrs = new HashMap();
for (int i = 0; i < numBuckets; i++) {
- BucketRegion br = region.getBucketRegion(i);
- QueueListener listener = (QueueListener)br.getCacheListener();
- listenerAttrs.put("Create"+i, listener.createList);
- listenerAttrs.put("Update"+i, listener.updateList);
- listenerAttrs.put("Destroy"+i, listener.destroyList);
- }
- return listenerAttrs;
- }
-
- public static HashMap checkQueue_PR(String senderId){
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for(GatewaySender s : senders){
- if(s.getId().equals(senderId)){
- sender = s;
- break;
- }
- }
-
- RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
- .getQueues().toArray(new RegionQueue[1])[0];
-
- PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
- QueueListener listener = (QueueListener)region.getCacheListener();
-
- HashMap listenerAttrs = new HashMap();
- listenerAttrs.put("Create", listener.createList);
- listenerAttrs.put("Update", listener.updateList);
- listenerAttrs.put("Destroy", listener.destroyList);
+ BucketRegion br = region.getBucketRegion(i);
+ QueueListener listener = (QueueListener)br.getCacheListener();
+ listenerAttrs.put("Create"+i, listener.createList);
+ listenerAttrs.put("Update"+i, listener.updateList);
+ listenerAttrs.put("Destroy"+i, listener.destroyList);
+ }
return listenerAttrs;
}
@@ -2062,7 +1402,7 @@ public class WANTestBase extends DistributedTestCase{
break;
}
}
- RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+ RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
.getQueues().toArray(new RegionQueue[1])[0];
PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
@@ -2110,7 +1450,7 @@ public class WANTestBase extends DistributedTestCase{
break;
}
}
- RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+ RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
.getQueues().toArray(new RegionQueue[1])[0];
PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
@@ -2163,7 +1503,7 @@ public class WANTestBase extends DistributedTestCase{
}
}
else {
- RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+ RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
.getQueues().toArray(new RegionQueue[1])[0];
parallelQueue.addCacheListener(listener1);
}
@@ -2186,7 +1526,7 @@ public class WANTestBase extends DistributedTestCase{
}
}
else {
- RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+ RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
.getQueues().toArray(new RegionQueue[1])[0];
parallelQueue.addCacheListener(listener2);
}
@@ -2215,27 +1555,6 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static void pauseSenderAndWaitForDispatcherToPause(String senderId) {
- final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
- IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
- .getName());
- try {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- sender.pause();
- ((AbstractGatewaySender)sender).getEventProcessor().waitForDispatcherToPause();
- } finally {
- exp.remove();
- exln.remove();
- }
- }
-
public static void resumeSender(String senderId) {
final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
@@ -2310,7 +1629,7 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName, int remoteDsId,
+ public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName,
boolean isParallel, Integer maxMemory,
Integer batchSize, boolean isConflation, boolean isPersistent,
GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy policy) {
@@ -2323,7 +1642,7 @@ public class WANTestBase extends DistributedTestCase{
gateway.setManualStart(isManualStart);
gateway.setDispatcherThreads(numDispatchers);
gateway.setOrderPolicy(policy);
- ((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
+ gateway.setLocatorDiscoveryCallback(new MyLocatorCallback());
if (filter != null) {
eventFilter = filter;
gateway.addGatewayEventFilter(filter);
@@ -2350,7 +1669,7 @@ public class WANTestBase extends DistributedTestCase{
persistentDirectory.mkdir();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File[] dirs1 = new File[] { persistentDirectory };
- GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, numDispatcherThreadsForTheRun, GatewaySender.DEFAULT_ORDER_POLICY);
+ GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, numDispatcherThreadsForTheRun, GatewaySender.DEFAULT_ORDER_POLICY);
gateway.create(dsName, remoteDsId);
} finally {
@@ -2368,7 +1687,7 @@ public class WANTestBase extends DistributedTestCase{
persistentDirectory.mkdir();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File[] dirs1 = new File[] { persistentDirectory };
- GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter,
+ GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName,isParallel, maxMemory, batchSize, isConflation, isPersistent, filter,
isManualStart, numDispatchers, orderPolicy);
gateway.create(dsName, remoteDsId);
@@ -2377,10 +1696,8 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static void createSenderWithoutDiskStore(String dsName, int remoteDsId,
- boolean isParallel, Integer maxMemory,
- Integer batchSize, boolean isConflation, boolean isPersistent,
- GatewayEventFilter filter, boolean isManulaStart) {
+ public static void createSenderWithoutDiskStore(String dsName, int remoteDsId, Integer maxMemory,
+ Integer batchSize, boolean isConflation, boolean isManulaStart) {
GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
gateway.setParallel(true);
@@ -2403,53 +1720,10 @@ public class WANTestBase extends DistributedTestCase{
persistentDirectory.mkdir();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File[] dirs1 = new File[] { persistentDirectory };
- GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy);
+ GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy);
gateway.create(dsName, remoteDsId);
}
-// public static void createSender_PDX(String dsName, int remoteDsId,
-// boolean isParallel, Integer maxMemory,
-// Integer batchSize, boolean isConflation, boolean isPersistent,
-// GatewayEventFilter filter, boolean isManulaStart) {
-// File persistentDirectory = new File(dsName +"_disk_"+System.currentTimeMillis()+"_" + VM.getCurrentVMNum());
-// persistentDirectory.mkdir();
-//
-// File [] dirs1 = new File[] {persistentDirectory};
-//
-// if(isParallel) {
-// ParallelGatewaySenderFactory gateway = cache.createParallelGatewaySenderFactory();
-// gateway.setMaximumQueueMemory(maxMemory);
-// gateway.setBatchSize(batchSize);
-// ((ParallelGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
-// if (filter != null) {
-// gateway.addGatewayEventFilter(filter);
-// }
-// if(isPersistent) {
-// gateway.setPersistenceEnabled(true);
-// DiskStoreFactory dsf = cache.createDiskStoreFactory();
-// gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
-// }
-// gateway.setBatchConflationEnabled(isConflation);
-// gateway.create(dsName, remoteDsId);
-//
-// }else {
-// SerialGatewaySenderFactory gateway = cache.createSerialGatewaySenderFactory();
-// gateway.setMaximumQueueMemory(maxMemory);
-// gateway.setBatchSize(batchSize);
-// gateway.setManualStart(isManulaStart);
-// ((SerialGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
-// if (filter != null) {
-// gateway.addGatewayEventFilter(filter);
-// }
-// gateway.setBatchConflationEnabled(isConflation);
-// if(isPersistent) {
-// gateway.setPersistenceEnabled(true);
-// DiskStoreFactory dsf = cache.createDiskStoreFactory();
-// gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
-// }
-// gateway.create(dsName, remoteDsId);
-// }
-// }
public static void createSenderForValidations(String dsName, int remoteDsId,
boolean isParallel, Integer alertThreshold,
boolean isConflation, boolean isPersistent,
@@ -2527,8 +1801,7 @@ public class WANTestBase extends DistributedTestCase{
gateway.setDiskStoreName(store.getName());
}
gateway.setDiskSynchronous(isDiskSync);
- GatewaySender sender = gateway
- .create(dsName, remoteDsId);
+ gateway.create(dsName, remoteDsId);
}
}
finally {
@@ -2675,41 +1948,13 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static void pauseWaitCriteria(final long millisec) {
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- return false;
- }
-
- public String description() {
- return "Expected to wait for " + millisec + " millisec.";
- }
- };
- Wait.waitForCriterion(wc, millisec, 500, false);
- }
-
- public static void createReceiverInVMs(int locatorPort, VM... vms) {
- for (VM vm : vms) {
- vm.invoke(() -> createReceiver(locatorPort));
- }
- }
-
- public static void createReceiverInVMsAsync(int locatorPort, VM... vms) {
- List<AsyncInvocation> tasks = new LinkedList<>();
+ public static void createReceiverInVMs(VM... vms) {
for (VM vm : vms) {
- tasks.add(vm.invokeAsync(() -> createReceiver(locatorPort)));
- }
- for (AsyncInvocation invocation : tasks) {
- try {
- invocation.join(30000);
- }
- catch (InterruptedException e) {
- fail("Failed starting up the receiver");
- }
+ vm.invoke(() -> createReceiver());
}
}
- public static int createReceiver(int locPort) {
+ public static int createReceiver() {
GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
fact.setStartPort(port);
@@ -2805,15 +2050,6 @@ public class WANTestBase extends DistributedTestCase{
return port;
}
- public static String makePath(String[] strings) {
- StringBuilder sb = new StringBuilder();
- for(int i=0;i<strings.length;i++){
- sb.append(strings[i]);
- sb.append(File.separator);
- }
- return sb.toString();
- }
-
public static void createReceiverAndServer(int locPort) {
WANTestBase test = new WANTestBase(getTestMethodName());
Properties props = test.getDistributedSystemProperties();
@@ -2841,7 +2077,6 @@ public class WANTestBase extends DistributedTestCase{
int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
server.setPort(serverPort);
server.setHostnameForClients("localhost");
- //server.setGroups(new String[]{"serv"});
try {
server.start();
} catch (IOException e) {
@@ -2849,23 +2084,6 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static int createReceiverInSecuredCache(int locPort) {
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- fact.setStartPort(port);
- fact.setEndPort(port);
- fact.setManualStart(true);
- GatewayReceiver receiver = fact.create();
- try {
- receiver.start();
- }
- catch (IOException e) {
- e.printStackTrace();
- com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start GatewayRecevier on port " + port, e);
- }
- return port;
- }
-
public static int createServer(int locPort) {
WANTestBase test = new WANTestBase(getTestMethodName());
Properties props = test.getDistributedSystemProperties();
@@ -2879,7 +2097,6 @@ public class WANTestBase extends DistributedTestCase{
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
server.setPort(port);
server.setHostnameForClients("localhost");
- //server.setGroups(new String[]{"serv"});
try {
server.start();
} catch (IOException e) {
@@ -2902,7 +2119,7 @@ public class WANTestBase extends DistributedTestCase{
CacheServerTestUtil.disableShufflingOfEndpoints();
Pool p;
try {
- p = PoolManager.createFactory().addLocator(host, port0) //.setServerGroup("serv")
+ p = PoolManager.createFactory().addLocator(host, port0)
.setPingInterval(250).setSubscriptionEnabled(true)
.setSubscriptionRedundancy(-1).setReadTimeout(2000)
.setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10)
@@ -2975,9 +2192,6 @@ public class WANTestBase extends DistributedTestCase{
exp1.remove();
exp2.remove();
}
-// for (long i = 0; i < numPuts; i++) {
-// r.destroy(i);
-// }
}
@@ -3013,20 +2227,6 @@ public class WANTestBase extends DistributedTestCase{
exp1.remove();
exp2.remove();
}
-// for (long i = 0; i < numPuts; i++) {
-// r.destroy(i);
-// }
- }
-
- /**
- * To be used for CacheLoader related tests
- */
- public static void doGets(String regionName, int numGets) {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- for (long i = 0; i < numGets; i++) {
- r.get(i);
- }
}
public static void doPutsAfter300(String regionName, int numPuts) {
@@ -3090,40 +2290,8 @@ public class WANTestBase extends DistributedTestCase{
public static void destroyRegion(String regionName, final int min) {
final Region r = cache.getRegion(Region.SEPARATOR + regionName);
assertNotNull(r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (r.size() > min) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Looking for min size of region to be " + min;
- }
- };
- Wait.waitForCriterion(wc, 30000, 5, false);
- r.destroyRegion();
- }
-
- public static void destroyRegionAfterMinRegionSize(String regionName, final int min) {
- final Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (destroyFlag) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Looking for min size of region to be " + min;
- }
- };
- Wait.waitForCriterion(wc, 30000, 5, false);
+ Awaitility.await().atMost(30,TimeUnit.SECONDS).until(() -> r.size() > min);
r.destroyRegion();
- destroyFlag = false;
}
public static void localDestroyRegion(String regionName) {
@@ -3228,24 +2396,22 @@ public class WANTestBase extends DistributedTestCase{
Map orderKeyValues = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
- for (int j = 1; j <= 1; j++) {
- int oid = (i * 1) + j;
- OrderId orderId = new OrderId(oid, custid);
- Order order = new Order("OREDR" + oid + "_update");
- try {
- orderRegion.put(orderId, order);
- orderKeyValues.put(orderId, order);
- assertTrue(orderRegion.containsKey(orderId));
- assertEquals(order,orderRegion.get(orderId));
+ int oid = i + 1;
+ OrderId orderId = new OrderId(oid, custid);
+ Order order = new Order("OREDR" + oid + "_update");
+ try {
+ orderRegion.put(orderId, order);
+ orderKeyValues.put(orderId, order);
+ assertTrue(orderRegion.containsKey(orderId));
+ assertEquals(order,orderRegion.get(orderId));
- }
- catch (Exception e) {
- com.gemstone.gemfire.test.dunit.Assert.fail(
- "updateOrderPartitionedRegion : failed while doing put operation in OrderPartitionedRegion ",
- e);
- }
- LogWriterUtils.getLogWriter().info("Order :- { " + orderId + " : " + order + " }");
}
+ catch (Exception e) {
+ com.gemstone.gemfire.test.dunit.Assert.fail(
+ "updateOrderPartitionedRegion : failed while doing put operation in OrderPartitionedRegion ",
+ e);
+ }
+ LogWriterUtils.getLogWriter().info("Order :- { " + orderId + " : " + order + " }");
}
return orderKeyValues;
}
@@ -3278,28 +2444,24 @@ public class WANTestBase extends DistributedTestCase{
Map shipmentKeyValue = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
- for (int j = 1; j <= 1; j++) {
- int oid = (i * 1) + j;
- OrderId orderId = new OrderId(oid, custid);
- for (int k = 1; k <= 1; k++) {
- int sid = (oid * 1) + k;
- ShipmentId shipmentId = new ShipmentId(sid, orderId);
- Shipment shipment = new Shipment("Shipment" + sid);
- try {
- shipmentRegion.put(shipmentId, shipment);
- assertTrue(shipmentRegion.containsKey(shipmentId));
- assertEquals(shipment,shipmentRegion.get(shipmentId));
- shipmentKeyValue.put(shipmentId, shipment);
- }
- catch (Exception e) {
- com.gemstone.gemfire.test.dunit.Assert.fail(
- "putShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ",
- e);
- }
- LogWriterUtils.getLogWriter().info(
- "Shipment :- { " + shipmentId + " : " + shipment + " }");
- }
+ int oid = i + 1;
+ OrderId orderId = new OrderId(oid, custid);
+ int sid = oid + 1;
+ ShipmentId shipmentId = new ShipmentId(sid, orderId);
+ Shipment shipment = new Shipment("Shipment" + sid);
+ try {
+ shipmentRegion.put(shipmentId, shipment);
+ assertTrue(shipmentRegion.containsKey(shipmentId));
+ assertEquals(shipment,shipmentRegion.get(shipmentId));
+ shipmentKeyValue.put(shipmentId, shipment);
}
+ catch (Exception e) {
+ com.gemstone.gemfire.test.dunit.Assert.fail(
+ "putShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ",
+ e);
+ }
+ LogWriterUtils.getLogWriter().info(
+ "Shipment :- { " + shipmentId + " : " + shipment + " }");
}
return shipmentKeyValue;
}
@@ -3313,18 +2475,14 @@ public class WANTestBase extends DistributedTestCase{
CustId custid = new CustId(i);
Customer customer = new Customer("Customer" + custid, "Address" + custid);
customerRegion.put(custid, customer);
- for (int j = 1; j <= 1; j++) {
- int oid = (i * 1) + j;
- OrderId orderId = new OrderId(oid, custid);
- Order order = new Order("Order"+orderId);
- orderRegion.put(orderId, order);
- for (int k = 1; k <= 1; k++) {
- int sid = (oid * 1) + k;
- ShipmentId shipmentId = new ShipmentId(sid, orderId);
- Shipment shipment = new Shipment("Shipment" + sid);
- shipmentRegion.put(shipmentId, shipment);
- }
- }
+ int oid = i + 1;
+ OrderId orderId = new OrderId(oid, custid);
+ Order order = new Order("Order"+orderId);
+ orderRegion.put(orderId, order);
+ int sid = oid + 1;
+ ShipmentId shipmentId = new ShipmentId(sid, orderId);
+ Shipment shipment = new Shipment("Shipment" + sid);
+ shipmentRegion.put(shipmentId, shipment);
}
}
@@ -3356,28 +2514,24 @@ public class WANTestBase extends DistributedTestCase{
Map shipmentKeyValue = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
- for (int j = 1; j <= 1; j++) {
- int oid = (i * 1) + j;
- OrderId orderId = new OrderId(oid, custid);
- for (int k = 1; k <= 1; k++) {
- int sid = (oid * 1) + k;
- ShipmentId shipmentId = new ShipmentId(sid, orderId);
- Shipment shipment = new Shipment("Shipment" + sid + "_update");
- try {
- shipmentRegion.put(shipmentId, shipment);
- assertTrue(shipmentRegion.containsKey(shipmentId));
- assertEquals(shipment,shipmentRegion.get(shipmentId));
- shipmentKeyValue.put(shipmentId, shipment);
- }
- catch (Exception e) {
- com.gemstone.gemfire.test.dunit.Assert.fail(
- "updateShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ",
- e);
- }
- LogWriterUtils.getLogWriter().info(
- "Shipment :- { " + shipmentId + " : " + shipment + " }");
- }
+ int oid = i + 1;
+ OrderId orderId = new OrderId(oid, custid);
+ int sid = oid + 1;
+ ShipmentId shipmentId = new ShipmentId(sid, orderId);
+ Shipment shipment = new Shipment("Shipment" + sid + "_update");
+ try {
+ shipmentRegion.put(shipmentId, shipment);
+ assertTrue(shipmentRegion.containsKey(shipmentId));
+ assertEquals(shipment,shipmentRegion.get(shipmentId));
+ shipmentKeyValue.put(shipmentId, shipment);
+ }
+ catch (Exception e) {
+ com.gemstone.gemfire.test.dunit.Assert.fail(
+ "updateShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ",
+ e);
}
+ LogWriterUtils.getLogWriter().info(
+ "Shipment :- { " + shipmentId + " : " + shipment + " }");
}
return shipmentKeyValue;
}
@@ -3421,7 +2575,7 @@ public class WANTestBase extends DistributedTestCase{
}
- public static void doTxPuts(String regionName, int numPuts) {
+ public static void doTxPuts(String regionName) {
Region r = cache.getRegion(Region.SEPARATOR + regionName);
assertNotNull(r);
CacheTransactionManager mgr = cache.getCacheTransactionManager();
@@ -3434,7 +2588,6 @@ public class WANTestBase extends DistributedTestCase{
}
public static void doNextPuts(String regionName, int start, int numPuts) {
- //waitForSitesToUpdate();
IgnoredException exp = IgnoredException.addIgnoredException(CacheClosedException.class
.getName());
try {
@@ -3494,30 +2647,16 @@ public class WANTestBase extends DistributedTestCase{
}
if (sender.isParallel()) {
- int totalSize = 0;
final Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
-
- WaitCriterion wc = new WaitCriterion() {
+ Awaitility.await().atMost(120,TimeUnit.SECONDS).until(() -> {
int size = 0;
- public boolean done() {
- for (RegionQueue q : queues) {
- ConcurrentParallelGatewaySenderQueue prQ = (ConcurrentParallelGatewaySenderQueue)q;
- size += prQ.localSize();
- }
- if (size == numQueueEntries) {
- return true;
- }
- return false;
+ for (RegionQueue q : queues) {
+ ConcurrentParallelGatewaySenderQueue prQ = (ConcurrentParallelGatewaySenderQueue)q;
+ size += prQ.localSize();
}
-
- public String description() {
- return " Expected local queue entries: " + numQueueEntries
- + " but actual entries: " + size;
- }
-
- };
-
- Wait.waitForCriterion(wc, 120000, 500, true);
+ assertEquals(" Expected local queue entries: " + numQueueEntries
+ + " but actual entries: " + size, numQueueEntries, size);
+ });
}
}
@@ -3545,38 +2684,6 @@ public class WANTestBase extends DistributedTestCase{
return -1;
}
- public static void doUpdates(String regionName, int numUpdates) {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- for (int i = 0; i < numUpdates; i++) {
- String s = "K"+i;
- r.put(i, s);
- }
- }
-
- public static void doUpdateOnSameKey(String regionName, int key,
- int numUpdates) {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- for (int i = 0; i < numUpdates; i++) {
- String s = "V_" + i;
- r.put(key, s);
- }
- }
-
- public static void doRandomUpdates(String regionName, int numUpdates) {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- Set<Integer> generatedKeys = new HashSet<Integer>();
- while(generatedKeys.size() != numUpdates) {
- generatedKeys.add((new Random()).nextInt(r.size()));
- }
- for (Integer i: generatedKeys) {
- String s = "K"+i;
- r.put(i, s);
- }
- }
-
public static void doMultiThreadedPuts(String regionName, int numPuts) {
final AtomicInteger ai = new AtomicInteger(-1);
final ExecutorService execService = Executors.newFixedThreadPool(5,
@@ -3614,68 +2721,26 @@ public class WANTestBase extends DistributedTestCase{
validateRegionSize(regionName, regionSize, 30000);
}
- public static void validateRegionSize(String regionName, final int regionSize, long waitTime) {
+ public static void validateRegionSize(String regionName, final int regionSize, long waitTimeInMilliSec) {
IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
.getName());
IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class
.getName());
try {
-
final Region r = cache.getRegion(Region.SEPARATOR + regionName);
assertNotNull(r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (r.keySet().size() == regionSize) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected region entries: " + regionSize
- + " but actual entries: " + r.keySet().size()
- + " present region keyset " + r.keySet();
- }
- };
- Wait.waitForCriterion(wc, waitTime, 500, true);
+ if ( regionSize != r.keySet().size()) {
+ Awaitility.await().atMost(waitTimeInMilliSec, TimeUnit.MILLISECONDS).pollInterval(500, TimeUnit.MILLISECONDS)
+ .until(() ->
+ assertEquals("Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size()
+ + " present region keyset " + r.keySet(), regionSize, r.keySet().size()));
+ }
} finally {
exp.remove();
exp1.remove();
}
}
- /**
- * Validate whether all the attributes set on AsyncEventQueueFactory are set
- * on the sender underneath the AsyncEventQueue.
- */
- public static void validateAsyncEventQueueAttributes(String asyncChannelId,
- int maxQueueMemory, int batchSize, int batchTimeInterval,
- boolean isPersistent, String diskStoreName, boolean isDiskSynchronous,
- boolean batchConflationEnabled) {
-
- AsyncEventQueue theChannel = null;
-
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncChannelId.equals(asyncChannel.getId())) {
- theChannel = asyncChannel;
- }
- }
-
- GatewaySender theSender = ((AsyncEventQueueImpl)theChannel).getSender();
- assertEquals("maxQueueMemory", maxQueueMemory, theSender
- .getMaximumQueueMemory());
- assertEquals("batchSize", batchSize, theSender.getBatchSize());
- assertEquals("batchTimeInterval", batchTimeInterval, theSender
- .getBatchTimeInterval());
- assertEquals("isPersistent", isPersistent, theSender.isPersistenceEnabled());
- assertEquals("diskStoreName", diskStoreName, theSender.getDiskStoreName());
- assertEquals("isDiskSynchronous", isDiskSynchronous, theSender
- .isDiskSynchronous());
- assertEquals("batchConflation", batchConflationEnabled, theSender
- .isBatchConflationEnabled());
- }
-
public static void validateAsyncEventListener(String asyncQueueId, final int expectedSize) {
AsyncEventListener theListener = null;
@@ -3688,153 +2753,43 @@ public class WANTestBase extends DistributedTestCase{
final Map eventsMap = ((MyAsyncEventListener) theListener).getEventsMap();
assertNotNull(eventsMap);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (eventsMap.size() == expectedSize) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected map entries: " + expectedSize
- + " but actual entries: " + eventsMap.size();
- }
- };
- Wait.waitForCriterion(wc, 60000, 500, true); //TODO:Yogs
- }
-
- public static void validateCustomAsyncEventListener(String asyncQueueId,
- final int expectedSize) {
- AsyncEventListener theListener = null;
-
- Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncQueue : asyncEventQueues) {
- if (asyncQueueId.equals(asyncQueue.getId())) {
- theListener = asyncQueue.getAsyncEventListener();
- }
- }
-
- final Map eventsMap = ((CustomAsyncEventListener) theListener).getEventsMap();
- assertNotNull(eventsMap);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (eventsMap.size() == expectedSize) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected map entries: " + expectedSize
- + " but actual entries: " + eventsMap.size();
- }
- };
- Wait.waitForCriterion(wc, 60000, 500, true); // TODO:Yogs
-
- Iterator<AsyncEvent> itr = eventsMap.values().iterator();
- while (itr.hasNext()) {
- AsyncEvent event = itr.next();
- assertTrue("possibleDuplicate should be true for event: " + event, event.getPossibleDuplicate());
- }
+ Awaitility.await().atMost(60,TimeUnit.SECONDS).until(() -> assertEquals("Expected map entries: " + expectedSize
+ + " but actual entries: " + eventsMap.size(), expectedSize, eventsMap.size()));
}
public static void waitForAsyncQueueToGetEmpty(String asyncQueueId) {
AsyncEventQueue theAsyncEventQueue = null;
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncQueueId.equals(asyncChannel.getId())) {
- theAsyncEventQueue = asyncChannel;
- }
- }
-
- final GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue)
- .getSender();
-
- if (sender.isParallel()) {
- final Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
- .getQueues();
-
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- int size = 0;
- for (RegionQueue q : queues) {
- size += q.size();
- }
- if (size == 0) {
- return true;
- }
- return false;
- }
-
- public String description() {
- int size = 0;
- for (RegionQueue q : queues) {
- size += q.size();
- }
- return "Expected queue size to be : " + 0 + " but actual entries: "
- + size;
- }
- };
- Wait.waitForCriterion(wc, 60000, 500, true);
-
- } else {
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
- .getQueues();
- int size = 0;
- for (RegionQueue q : queues) {
- size += q.size();
- }
- if (size == 0) {
- return true;
- }
- return false;
- }
-
- public String description() {
- Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
- .getQueues();
- int size = 0;
- for (RegionQueue q : queues) {
- size += q.size();
- }
- return "Expected queue size to be : " + 0 + " but actual entries: "
- + size;
- }
- };
- Wait.waitForCriterion(wc, 60000, 500, true);
- }
- }
-
- public static void verifyAsyncEventListenerForPossibleDuplicates(
- String asyncEventQueueId, Set<Integer> bucketIds, int batchSize) {
- AsyncEventListener theListener = null;
-
- Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncQueue : asyncEventQueues) {
- if (asyncEventQueueId.equals(asyncQueue.getId())) {
- theListener = asyncQueue.getAsyncEventListener();
+ Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+ for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+ if (asyncQueueId.equals(asyncChannel.getId())) {
+ theAsyncEventQueue = asyncChannel;
}
}
- final Map<Integer, List<GatewaySenderEventImpl>> bucketToEventsMap = ((MyAsyncEventListener2)theListener)
- .getBucketToEventsMap();
- assertNotNull(bucketToEventsMap);
- assertTrue(bucketIds.size() > 1);
+ final GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue)
+ .getSender();
- for (int bucketId : bucketIds) {
- List<GatewaySenderEventImpl> eventsForBucket = bucketToEventsMap
- .get(bucketId);
- LogWriterUtils.getLogWriter().info(
- "Events for bucket: " + bucketId + " is " + eventsForBucket);
- assertNotNull(eventsForBucket);
- for (int i = 0; i < batchSize; i++) {
- GatewaySenderEventImpl senderEvent = eventsForBucket.get(i);
- assertTrue(senderEvent.getPossibleDuplicate());
- }
+ if (sender.isParallel()) {
+ final Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
+ .getQueues();
+ Awaitility.await().atMost(60,TimeUnit.SECONDS).until(() -> {
+ int size = 0;
+ for (RegionQueue q : queues) {
+ size += q.size();
+ }
+ assertEquals("Expected queue size to be : " + 0 + " but actual entries: " + size, 0, size);
+ });
+ } else {
+ Awaitility.await().atMost(60,TimeUnit.SECONDS).until(() -> {
+ Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
+ .getQueues();
+ int size = 0;
+ for (RegionQueue q : queues) {
+ size += q.size();
+ }
+ assertEquals("Expected queue size to be : " + 0 + " but actual entries: " + size, 0, size);
+ });
}
}
@@ -3854,81 +2809,21 @@ public class WANTestBase extends DistributedTestCase{
return eventsMap.size();
}
- public static int getAsyncEventQueueSize(String asyncEventQueueId) {
- AsyncEventQueue theQueue = null;
-
- Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncQueue : asyncEventQueues) {
- if (asyncEventQueueId.equals(asyncQueue.getId())) {
- theQueue = asyncQueue;
- }
- }
- assertNotNull(theQueue);
- return theQueue.size();
- }
-
-
public static void validateRegionSize_PDX(String regionName, final int regionSize) {
final Region r = cache.getRegion(Region.SEPARATOR + regionName);
assertNotNull(r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (r.keySet().size() >= regionSize) {
- return true;
- }
- return false;
- }
-
- public String description() {
-
- return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size() + " present region keyset " + r.keySet() ;
- }
- };
- Wait.waitForCriterion(wc, 200000, 500, true);
+ Awaitility.await().atMost(200,TimeUnit.SECONDS).until(() -> assertEquals("Expected region entries: " + regionSize +
+ " but actual entries: " + r.keySet().size() + " present region keyset " + r.keySet(),
+ true,(regionSize <= r.keySet().size())));
for(int i = 0 ; i < regionSize; i++){
LogWriterUtils.getLogWriter().info("For Key : Key_"+i + " : Values : " + r.get("Key_" + i));
assertEquals(new SimpleClass(i, (byte)i), r.get("Key_" + i));
}
}
- public static void validateRegionSize_PDX2(String regionName, final int regionSize) {
- final Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (r.keySet().size() == regionSize) {
- return true;
- }
- return false;
- }
-
- public String description() {
-
- return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size() + " present region keyset " + r.keySet() ;
- }
- };
- Wait.waitForCriterion(wc, 200000, 500, true);
- for(int i = 0 ; i < regionSize; i++){
- LogWriterUtils.getLogWriter().info("For Key : Key_"+i + " : Values : " + r.get("Key_" + i));
- assertEquals(new SimpleClass1(false, (short) i, "" + i, i,"" +i ,""+ i,i, i), r.get("Key_" + i));
- }
- }
public static void validateQueueSizeStat(String id, final int queueSize) {
final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id);
-
- Wait.waitForCriterion(new WaitCriterion() {
-
- @Override
- public boolean done() {
- return sender.getEventQueueSize() == queueSize;
- }
-
- @Override
- public String description() {
- // TODO Auto-generated method stub
- return null;
- }
- }, 30000, 50, false);
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> assertEquals(queueSize, sender.getEventQueueSize()));
assertEquals(queueSize, sender.getEventQueueSize());
}
/**
@@ -3987,97 +2882,21 @@ public class WANTestBase extends DistributedTestCase{
public static void validateRegionContents(String regionName, final Map keyValues) {
final Region r = cache.getRegion(Region.SEPARATOR + regionName);
assertNotNull(r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- for(Object key: keyValues.keySet()) {
- if (!r.get(key).equals(keyValues.get(key))) {
- LogWriterUtils.getLogWriter().info(
- "The values are for key " + " " + key + " " + r.get(key)
- + " in the map " + keyValues.get(key));
- return false;
- }
- }
- return true;
- }
-
- public String description() {
- return "Expected region entries doesn't match";
- }
- };
- Wait.waitForCriterion(wc, 120000, 500, true);
- }
-
- public static void CheckContent(String regionName, final int regionSize) {
- final Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- for (long i = 0; i < regionSize; i++) {
- assertEquals(i, r.get(i));
- }
- }
-
- public static void validateRegionContentsForPR(String regionName,
- final int regionSize) {
- final Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (r.keySet().size() == regionSize) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size();
- }
- };
- Wait.waitForCriterion(wc, 120000, 500, true);
- }
-
- public static void verifyPrimaryStatus(final Boolean isPrimary) {
- final Set<GatewaySender> senders = cache.getGatewaySenders();
- assertEquals(senders.size(), 1);
- final AbstractGatewaySender sender = (AbstractGatewaySender)senders.iterator().next();
-
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (sender.isPrimary() == isPrimary.booleanValue()) {
- return true;
+ Awaitility.await().atMost(120,TimeUnit.SECONDS).until(() -> {
+ boolean matchFlag = true;
+ for(Object key: keyValues.keySet()) {
+ if (!r.get(key).equals(keyValues.get(key))) {
+ LogWriterUtils.getLogWriter().info(
+ "The values are for key " + " " + key + " " + r.get(key)
+ + " in the map " + keyValues.get(key));
+ matchFlag = false;
}
- return false;
}
-
- public String description() {
- return "Expected sender to be : " + isPrimary.booleanValue() + " but actually it is : " + sender.isPrimary();
- }
- };
- Wait.waitForCriterion(wc, 120000, 500, true);
+ assertEquals("Expected region entries doesn't match", true, matchFlag);
+ });
}
- public static Boolean getPrimaryStatus(){
- Set<GatewaySender> senders = cache.getGatewaySenders();
- assertEquals(senders.size(), 1);
- final AbstractGatewaySender sender = (AbstractGatewaySender)senders.iterator().next();
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (sender.isPrimary()) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Checking Primary Status";
- }
- };
- Wait.waitForCriterion(wc, 10000, 500, false);
- return sender.isPrimary();
- }
- public static Set<Integer> getAllPrimaryBucketsOnTheNode(String regionName) {
- PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
- return region.getDataStore().getAllLocalPrimaryBucketIds();
- }
public static void doHeavyPuts(String regionName, int numPuts) {
Region r = cache.getRegion(Region.SEPARATOR + regionName);
@@ -4089,24 +2908,6 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static void addListenerAndKillPrimary(){
- Set<GatewaySender> senders = ((GemFireCacheImpl)cache).getAllGatewaySenders();
- assertEquals(senders.size(), 1);
- AbstractGatewaySender sender = (AbstractGatewaySender)senders.iterator().next();
- Region queue = cache.getRegion(Region.SEPARATOR+sender.getId()+"_SERIAL_GATEWAY_SENDER_QUEUE");
- assertNotNull(queue);
- CacheListenerAdapter cl = new CacheListenerAdapter() {
- public void afterCreate(EntryEvent event) {
- if((Long)event.getKey() > 900){
- cache.getLogger().fine(" Gateway sender is killed by a test");
- cache.close();
- cache.getDistributedSystem().disconnect();
- }
- }
- };
- queue.getAttributesMutator().addCacheListener(cl);
- }
-
public static void addCacheListenerAndDestroyRegion(String regionName){
final Region region = cache.getRegion(Region.SEPARATOR + regionName);
assertNotNull(region);
@@ -4121,22 +2922,6 @@ public class WANTestBase extends DistributedTestCase{
region.getAttributesMutator().addCacheListener(cl);
}
- public static void addCacheListenerAndCloseCache(String regionName){
- final Region region = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(region);
- CacheListenerAdapter cl = new CacheListenerAdapter() {
- @Override
- public void afterCreate(EntryEvent event) {
- if((Long)event.getKey() == 900){
- cache.getLogger().fine(" Gateway sender is killed by a test");
- cache.close();
- cache.getDistributedSystem().disconnect();
-
<TRUNCATED>