You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ja...@apache.org on 2016/04/18 19:40:39 UTC
[3/5] incubator-geode git commit: GEODE-1032 : Additional wait time
to check for empty queue,
refactored WANTestBase.java to remove unused functions,
replaced wait criteria with Awaitility.
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/84a4b474/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
index e93c9c2..aca2cb9 100644
--- a/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
+++ b/geode-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/WANTestBase.java
@@ -30,7 +30,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.Callable;
@@ -60,12 +59,10 @@ import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.RegionFactory;
import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
-import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueStats;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter;
@@ -154,8 +151,6 @@ public class WANTestBase extends DistributedTestCase{
protected static QueueListener listener1;
protected static QueueListener listener2;
- protected static List<QueueListener> gatewayListeners;
-
protected static AsyncEventListener eventListener1 ;
protected static AsyncEventListener eventListener2 ;
@@ -163,8 +158,6 @@ public class WANTestBase extends DistributedTestCase{
protected static GatewayEventFilter eventFilter;
- protected static boolean destroyFlag = false;
-
protected static List<Integer> dispatcherThreads =
new ArrayList<Integer>(Arrays.asList(1, 3, 5));
//this will be set for each test method run with one of the values from above list
@@ -315,7 +308,6 @@ public class WANTestBase extends DistributedTestCase{
props.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + oldPort + "],server=true,peer=true,hostname-for-clients=localhost");
props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, "localhost[" + remoteLocPort + "]");
test.getSystem(props);
- return;
}
@@ -376,8 +368,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -400,8 +390,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()){
String senderId = tokenizer.nextToken();
-// GatewaySender sender = cache.getGatewaySender(senderId);
-// assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -411,32 +399,12 @@ public class WANTestBase extends DistributedTestCase{
assertNotNull(r);
}
-// public static void createReplicatedRegion_PDX(String regionName, String senderId, DataPolicy policy, InterestPolicy intPolicy){
-// AttributesFactory fact = new AttributesFactory();
-// if(senderId!= null){
-// StringTokenizer tokenizer = new StringTokenizer(senderId, ",");
-// while (tokenizer.hasMoreTokens()){
-// String sender = tokenizer.nextToken();
-// //fact.addSerialGatewaySenderId(sender);
-// }
-// }
-// fact.setDataPolicy(policy);
-// SubscriptionAttributes subAttr = new SubscriptionAttributes(intPolicy);
-// fact.setSubscriptionAttributes(subAttr);
-// fact.setScope(Scope.DISTRIBUTED_ACK);
-// Region r = cache.createRegionFactory(fact.create()).create(regionName);
-// assertNotNull(r);
-// assertTrue(r.size() == 0);
-// }
-
public static void createPersistentReplicatedRegion(String regionName, String senderIds, Boolean offHeap){
AttributesFactory fact = new AttributesFactory();
if(senderIds!= null){
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()){
String senderId = tokenizer.nextToken();
-// GatewaySender sender = cache.getGatewaySender(senderId);
-// assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -446,27 +414,6 @@ public class WANTestBase extends DistributedTestCase{
assertNotNull(r);
}
-// public static void createReplicatedRegionWithParallelSenderId(String regionName, String senderId){
-// AttributesFactory fact = new AttributesFactory();
-// if(senderId!= null){
-// StringTokenizer tokenizer = new StringTokenizer(senderId, ",");
-// while (tokenizer.hasMoreTokens()){
-// String sender = tokenizer.nextToken();
-// //fact.addParallelGatewaySenderId(sender);
-// }
-// }
-// fact.setDataPolicy(DataPolicy.REPLICATE);
-// Region r = cache.createRegionFactory(fact.create()).create(regionName);
-// assertNotNull(r);
-// }
-
-// public static void createReplicatedRegion(String regionName){
-// AttributesFactory fact = new AttributesFactory();
-// fact.setDataPolicy(DataPolicy.REPLICATE);
-// Region r = cache.createRegionFactory(fact.create()).create(regionName);
-// assertNotNull(r);
-// }
-
public static void createReplicatedRegionWithAsyncEventQueue(
String regionName, String asyncQueueIds, Boolean offHeap) {
IgnoredException exp1 = IgnoredException.addIgnoredException(ForceReattemptException.class
@@ -491,31 +438,11 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static void createPersistentReplicatedRegionWithAsyncEventQueue(
- String regionName, String asyncQueueIds) {
-
- AttributesFactory fact = new AttributesFactory();
- if(asyncQueueIds != null){
- StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
- while (tokenizer.hasMoreTokens()){
- String asyncQueueId = tokenizer.nextToken();
- fact.addAsyncEventQueueId(asyncQueueId);
- }
- }
- fact.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
- RegionFactory regionFactory = cache.createRegionFactory(fact.create());
- Region r = regionFactory.create(regionName);
- assertNotNull(r);
- }
-
-
-
public static void createReplicatedRegionWithSenderAndAsyncEventQueue(
String regionName, String senderIds, String asyncChannelId, Boolean offHeap) {
IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
.getName());
try {
-
AttributesFactory fact = new AttributesFactory();
if (senderIds != null) {
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
@@ -543,8 +470,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()){
String senderId = tokenizer.nextToken();
-// GatewaySender sender = cache.getGatewaySender(senderId);
-// assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -567,7 +492,7 @@ public class WANTestBase extends DistributedTestCase{
File[] dirs1 = new File[] { directory };
DiskStoreFactory dsf = cache.createDiskStoreFactory();
dsf.setDiskDirs(dirs1);
- DiskStore ds = dsf.create(diskStoreName);
+ dsf.create(diskStoreName);
}
AsyncEventListener asyncEventListener = new MyAsyncEventListener();
@@ -582,305 +507,7 @@ public class WANTestBase extends DistributedTestCase{
factory.setParallel(isParallel);
//set dispatcher threads
factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
- AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
- }
-
- public static void createAsyncEventQueueWithListener2(String asyncChannelId,
- boolean isParallel, Integer maxMemory, Integer batchSize,
- boolean isPersistent, String diskStoreName) {
-
- if (diskStoreName != null) {
- File directory = new File(asyncChannelId + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- directory.mkdir();
- File[] dirs1 = new File[] { directory };
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- dsf.setDiskDirs(dirs1);
- DiskStore ds = dsf.create(diskStoreName);
- }
-
- AsyncEventListener asyncEventListener = new MyAsyncEventListener2();
-
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(diskStoreName);
- factory.setMaximumQueueMemory(maxMemory);
- factory.setParallel(isParallel);
- //set dispatcher threads
- factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
- AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
- asyncEventListener);
- }
-
- public static void createAsyncEventQueue(
- String asyncChannelId, boolean isParallel, Integer maxMemory,
- Integer batchSize, boolean isConflation, boolean isPersistent,
- String diskStoreName, boolean isDiskSynchronous, String asyncListenerClass) throws Exception {
-
- if (diskStoreName != null) {
- File directory = new File(asyncChannelId + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- directory.mkdir();
- File[] dirs1 = new File[] { directory };
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- dsf.setDiskDirs(dirs1);
- DiskStore ds = dsf.create(diskStoreName);
- }
-
- String packagePrefix = "com.gemstone.gemfire.internal.cache.wan.";
- String className = packagePrefix + asyncListenerClass;
- AsyncEventListener asyncEventListener = null;
- try {
- Class clazz = Class.forName(className);
- asyncEventListener = (AsyncEventListener) clazz.newInstance();
- } catch (ClassNotFoundException e) {
- throw e;
- } catch (InstantiationException e) {
- throw e;
- } catch (IllegalAccessException e) {
- throw e;
- }
-
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(diskStoreName);
- factory.setDiskSynchronous(isDiskSynchronous);
- factory.setBatchConflationEnabled(isConflation);
- factory.setMaximumQueueMemory(maxMemory);
- factory.setParallel(isParallel);
- //set dispatcher threads
- factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
- AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
- }
-
- public static void createAsyncEventQueueWithCustomListener(
- String asyncChannelId, boolean isParallel, Integer maxMemory,
- Integer batchSize, boolean isConflation, boolean isPersistent,
- String diskStoreName, boolean isDiskSynchronous) {
- createAsyncEventQueueWithCustomListener(asyncChannelId, isParallel, maxMemory, batchSize,
- isConflation, isPersistent, diskStoreName, isDiskSynchronous, GatewaySender.DEFAULT_DISPATCHER_THREADS);
- }
-
- public static void createAsyncEventQueueWithCustomListener(
- String asyncChannelId, boolean isParallel, Integer maxMemory,
- Integer batchSize, boolean isConflation, boolean isPersistent,
- String diskStoreName, boolean isDiskSynchronous, int nDispatchers) {
-
- IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
- .getName());
-
- try {
- if (diskStoreName != null) {
- File directory = new File(asyncChannelId + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- directory.mkdir();
- File[] dirs1 = new File[] { directory };
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- dsf.setDiskDirs(dirs1);
- DiskStore ds = dsf.create(diskStoreName);
- }
-
- AsyncEventListener asyncEventListener = new CustomAsyncEventListener();
-
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(diskStoreName);
- factory.setMaximumQueueMemory(maxMemory);
- factory.setParallel(isParallel);
- factory.setDispatcherThreads(nDispatchers);
- AsyncEventQueue asyncChannel = factory.create(asyncChannelId,
- asyncEventListener);
- } finally {
- exp.remove();
- }
- }
-
- public static void createConcurrentAsyncEventQueue(
- String asyncChannelId, boolean isParallel,
- Integer maxMemory, Integer batchSize, boolean isConflation,
- boolean isPersistent, String diskStoreName, boolean isDiskSynchronous,
- int dispatcherThreads, OrderPolicy policy) {
-
- if (diskStoreName != null) {
- File directory = new File(asyncChannelId + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- directory.mkdir();
- File[] dirs1 = new File[] { directory };
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- dsf.setDiskDirs(dirs1);
- DiskStore ds = dsf.create(diskStoreName);
- }
-
- AsyncEventListener asyncEventListener = new MyAsyncEventListener();
-
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(diskStoreName);
- factory.setDiskSynchronous(isDiskSynchronous);
- factory.setBatchConflationEnabled(isConflation);
- factory.setMaximumQueueMemory(maxMemory);
- factory.setParallel(isParallel);
- factory.setDispatcherThreads(dispatcherThreads);
- factory.setOrderPolicy(policy);
- AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
- }
-
-
- public static String createAsyncEventQueueWithDiskStore(
- String asyncChannelId, boolean isParallel,
- Integer maxMemory, Integer batchSize,
- boolean isPersistent, String diskStoreName) {
-
- AsyncEventListener asyncEventListener = new MyAsyncEventListener();
-
- File persistentDirectory = null;
- if (diskStoreName == null) {
- persistentDirectory = new File(asyncChannelId + "_disk_"
- + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
- } else {
- persistentDirectory = new File(diskStoreName);
- }
- LogWriterUtils.getLogWriter().info("The ds is : " + persistentDirectory.getName());
- persistentDirectory.mkdir();
- DiskStoreFactory dsf = cache.createDiskStoreFactory();
- File [] dirs1 = new File[] {persistentDirectory};
-
- AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
- factory.setBatchSize(batchSize);
- factory.setParallel(isParallel);
- if (isPersistent) {
- factory.setPersistent(isPersistent);
- factory.setDiskStoreName(dsf.setDiskDirs(dirs1).create(asyncChannelId).getName());
- }
- factory.setMaximumQueueMemory(maxMemory);
- //set dispatcher threads
- factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
- AsyncEventQueue asyncChannel = factory.create(asyncChannelId, asyncEventListener);
- return persistentDirectory.getName();
- }
-
- public static void pauseAsyncEventQueue(String asyncChannelId) {
- AsyncEventQueue theChannel = null;
-
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncChannelId.equals(asyncChannel.getId())) {
- theChannel = asyncChannel;
- }
- }
-
- ((AsyncEventQueueImpl)theChannel).getSender().pause();
- }
-
- public static void pauseAsyncEventQueueAndWaitForDispatcherToPause(String asyncChannelId) {
- AsyncEventQueue theChannel = null;
-
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncChannelId.equals(asyncChannel.getId())) {
- theChannel = asyncChannel;
- break;
- }
- }
-
- ((AsyncEventQueueImpl)theChannel).getSender().pause();
-
-
- ((AbstractGatewaySender)((AsyncEventQueueImpl)theChannel).getSender()).getEventProcessor().waitForDispatcherToPause();
- }
-
- public static void resumeAsyncEventQueue(String asyncQueueId) {
- AsyncEventQueue theQueue = null;
-
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncQueueId.equals(asyncChannel.getId())) {
- theQueue = asyncChannel;
- }
- }
-
- ((AsyncEventQueueImpl)theQueue).getSender().resume();
- }
-
-
- public static void checkAsyncEventQueueSize(String asyncQueueId, int numQueueEntries) {
- AsyncEventQueue theAsyncEventQueue = null;
-
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncQueueId.equals(asyncChannel.getId())) {
- theAsyncEventQueue = asyncChannel;
- }
- }
-
- GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue).getSender();
-
- if (sender.isParallel()) {
- Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
- assertEquals(numQueueEntries,
- queues.toArray(new RegionQueue[queues.size()])[0].getRegion().size());
- } else {
- Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
- int size = 0;
- for (RegionQueue q : queues) {
- size += q.size();
- }
- assertEquals(numQueueEntries, size);
- }
- }
-
- /**
- * This method verifies the queue size of a ParallelGatewaySender. For
- * ParallelGatewaySender conflation happens in a separate thread, hence test
- * code needs to wait for some time for expected result
- *
- * @param asyncQueueId
- * Async Queue ID
- * @param numQueueEntries
- * expected number of Queue entries
- * @throws Exception
- */
- public static void waitForAsyncEventQueueSize(String asyncQueueId,
- final int numQueueEntries) throws Exception {
- AsyncEventQueue theAsyncEventQueue = null;
-
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncQueueId.equals(asyncChannel.getId())) {
- theAsyncEventQueue = asyncChannel;
- }
- }
-
- GatewaySender sender = ((AsyncEventQueueImpl) theAsyncEventQueue)
- .getSender();
-
- if (sender.isParallel()) {
- final Set<RegionQueue> queues = ((AbstractGatewaySender) sender)
- .getQueues();
-
- Wait.waitForCriterion(new WaitCriterion() {
-
- public String description() {
- return "Waiting for EventQueue size to be " + numQueueEntries;
- }
-
- public boolean done() {
- boolean done = numQueueEntries == queues
- .toArray(new RegionQueue[queues.size()])[0].getRegion().size();
- return done;
- }
-
- }, MAX_WAIT, 500, true);
-
- } else {
- throw new Exception(
- "This method should be used for only ParallelGatewaySender,SerialGatewaySender should use checkAsyncEventQueueSize() method instead");
-
- }
+ factory.create(asyncChannelId, asyncEventListener);
}
public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){
@@ -894,8 +521,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -927,8 +552,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -958,8 +581,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -994,111 +615,6 @@ public class WANTestBase extends DistributedTestCase{
mutator.addAsyncEventQueueId(queueId);
}
- public static void createPartitionedRegionWithAsyncEventQueue(
- String regionName, String asyncEventQueueId, Boolean offHeap) {
- IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
- .getName());
- IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class
- .getName());
- try {
- AttributesFactory fact = new AttributesFactory();
-
- PartitionAttributesFactory pfact = new PartitionAttributesFactory();
- pfact.setTotalNumBuckets(16);
- fact.setPartitionAttributes(pfact.create());
- fact.setOffHeap(offHeap);
- Region r = cache.createRegionFactory(fact.create())
- .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
- assertNotNull(r);
- }
- finally {
- exp.remove();
- exp1.remove();
- }
- }
-
- public static void createColocatedPartitionedRegionWithAsyncEventQueue(
- String regionName, String asyncEventQueueId, Integer totalNumBuckets, String colocatedWith) {
-
- IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
- .getName());
- IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class
- .getName());
- try {
- AttributesFactory fact = new AttributesFactory();
-
- PartitionAttributesFactory pfact = new PartitionAttributesFactory();
- pfact.setTotalNumBuckets(totalNumBuckets);
- pfact.setColocatedWith(colocatedWith);
- fact.setPartitionAttributes(pfact.create());
- Region r = cache.createRegionFactory(fact.create())
- .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
- assertNotNull(r);
- }
- finally {
- exp.remove();
- exp1.remove();
- }
- }
-
- public static void createPersistentPartitionedRegionWithAsyncEventQueue(
- String regionName, String asyncEventQueueId) {
- AttributesFactory fact = new AttributesFactory();
-
- PartitionAttributesFactory pfact = new PartitionAttributesFactory();
- fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
- pfact.setTotalNumBuckets(16);
- fact.setPartitionAttributes(pfact.create());
- if (asyncEventQueueId != null) {
- StringTokenizer tokenizer = new StringTokenizer(asyncEventQueueId, ",");
- while (tokenizer.hasMoreTokens()) {
- String asyncId = tokenizer.nextToken();
- fact.addAsyncEventQueueId(asyncId);
- }
- }
- Region r = cache.createRegionFactory(fact.create()).create(regionName);
- assertNotNull(r);
- }
-
- /**
- * Create PartitionedRegion with 1 redundant copy
- */
- public static void createPRWithRedundantCopyWithAsyncEventQueue(
- String regionName, String asyncEventQueueId, Boolean offHeap) {
- IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
- .getName());
-
- try {
- AttributesFactory fact = new AttributesFactory();
-
- PartitionAttributesFactory pfact = new PartitionAttributesFactory();
- pfact.setTotalNumBuckets(16);
- pfact.setRedundantCopies(1);
- fact.setPartitionAttributes(pfact.create());
- fact.setOffHeap(offHeap);
- Region r = cache.createRegionFactory(fact.create())
- .addAsyncEventQueueId(asyncEventQueueId).create(regionName);
- assertNotNull(r);
- }
- finally {
- exp.remove();
- }
- }
-
- public static void createPartitionedRegionAccessorWithAsyncEventQueue(
- String regionName, String asyncEventQueueId) {
- AttributesFactory fact = new AttributesFactory();
- PartitionAttributesFactory pfact = new PartitionAttributesFactory();
- pfact.setTotalNumBuckets(16);
- pfact.setLocalMaxMemory(0);
- fact.setPartitionAttributes(pfact.create());
- Region r = cache.createRegionFactory(
- fact.create()).addAsyncEventQueueId(
- asyncEventQueueId).create(regionName);
- //fact.create()).create(regionName);
- assertNotNull(r);
- }
-
public static void createPartitionedRegionAsAccessor(
String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets){
AttributesFactory fact = new AttributesFactory();
@@ -1106,8 +622,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()){
String senderId = tokenizer.nextToken();
-// GatewaySender sender = cache.getGatewaySender(senderId);
-// assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1126,8 +640,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(serialSenderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1135,8 +647,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(parallelSenderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
-// GatewaySender sender = cache.getGatewaySender(senderId);
-// assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1166,8 +676,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1187,7 +695,7 @@ public class WANTestBase extends DistributedTestCase{
}
public static void createCustomerOrderShipmentPartitionedRegion(
- String regionName, String senderIds, Integer redundantCopies,
+ String senderIds, Integer redundantCopies,
Integer totalNumBuckets, Boolean offHeap) {
IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
.getName());
@@ -1197,15 +705,11 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
PartitionAttributesFactory paf = new PartitionAttributesFactory();
- // creating colocated Regions
- paf = new PartitionAttributesFactory();
paf.setRedundantCopies(redundantCopies)
.setTotalNumBuckets(totalNumBuckets)
.setPartitionResolver(
@@ -1230,8 +734,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1255,8 +757,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()) {
String senderId = tokenizer.nextToken();
- // GatewaySender sender = cache.getGatewaySender(senderId);
- // assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1280,8 +780,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()){
String senderId = tokenizer.nextToken();
-// GatewaySender sender = cache.getGatewaySender(senderId);
-// assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1309,8 +807,6 @@ public class WANTestBase extends DistributedTestCase{
StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
while (tokenizer.hasMoreTokens()){
String senderId = tokenizer.nextToken();
-// GatewaySender sender = cache.getGatewaySender(senderId);
-// assertNotNull(sender);
fact.addGatewaySenderId(senderId);
}
}
@@ -1340,21 +836,6 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static void createCacheInVMsAsync(Integer locatorPort, VM... vms) {
- List<AsyncInvocation> tasks = new LinkedList<>();
- for (VM vm : vms) {
- tasks.add(vm.invokeAsync(() -> createCache(locatorPort)));
- }
- for (AsyncInvocation invocation : tasks) {
- try {
- invocation.join(60000);
- }
- catch (InterruptedException e) {
- fail("Failed starting up the cache");
- }
- }
- }
-
public static void addListenerToSleepAfterCreateEvent(int milliSeconds) {
cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator()
.addCacheListener(new CacheListenerAdapter<Object, Object>() {
@@ -1370,6 +851,31 @@ public class WANTestBase extends DistributedTestCase{
});
}
+ private static CacheListener myListener;
+ public static void longPauseAfterNumEvents(int numEvents, int milliSeconds) {
+ myListener = new CacheListenerAdapter<Object, Object>() {
+ @Override
+ public void afterCreate(final EntryEvent<Object, Object> event) {
+ try {
+ if (event.getRegion().size() >= numEvents){
+ Thread.sleep(milliSeconds);
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ };
+ cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator()
+ .addCacheListener(myListener);
+ }
+
+ public static void removeCacheListener() {
+ cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator()
+ .removeCacheListener(myListener);
+
+ }
+
public static void createCache(Integer locPort){
createCache(false, locPort);
@@ -1449,7 +955,7 @@ public class WANTestBase extends DistributedTestCase{
File pdxDir = new File(CacheTestCase.getDiskDir(), "pdx");
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File [] dirs1 = new File[] {pdxDir};
- DiskStore store = dsf.setDiskDirs(dirs1).setMaxOplogSize(1).create("PDX_TEST");
+ dsf.setDiskDirs(dirs1).setMaxOplogSize(1).create("PDX_TEST");
}
public static void createCache(Integer locPort1, Integer locPort2){
@@ -1576,19 +1082,6 @@ public class WANTestBase extends DistributedTestCase{
sender.test_setBatchConflationEnabled(true);
}
- public static void startAsyncEventQueue(String senderId) {
- Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
- AsyncEventQueue q = null;
- for (AsyncEventQueue s : queues) {
- if (s.getId().equals(senderId)) {
- q = s;
- break;
- }
- }
- //merge42180: There is no start method on AsyncEventQueue. Cheetah has this method. Yet the code for AsyncEvnt Queue is not properly merged from cheetah to cedar
- //q.start();
- }
-
public static Map getSenderToReceiverConnectionInfo(String senderId){
Set<GatewaySender> senders = cache.getGatewaySenders();
GatewaySender sender = null;
@@ -1622,25 +1115,13 @@ public class WANTestBase extends DistributedTestCase{
break;
}
}
- final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics();
+ final GatewaySenderStats statistics = sender.getStatistics();
if (expectedQueueSize != -1) {
final RegionQueue regionQueue;
regionQueue = sender.getQueues().toArray(
new RegionQueue[1])[0];
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (regionQueue.size() == expectedQueueSize) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected queue entries: " + expectedQueueSize
- + " but actual entries: " + regionQueue.size();
- }
- };
- Wait.waitForCriterion(wc, 120000, 500, true);
+ Awaitility.await().atMost(120,TimeUnit.SECONDS).until(() -> assertEquals("Expected queue entries: " +
+ expectedQueueSize + " but actual entries: " + regionQueue.size(), expectedQueueSize,regionQueue.size()));
}
ArrayList<Integer> stats = new ArrayList<Integer>();
stats.add(statistics.getEventQueueSize());
@@ -1674,28 +1155,10 @@ public class WANTestBase extends DistributedTestCase{
assert(statistics.getEventsDistributed() >= eventsDistributed);
}
- public static void checkAsyncEventQueueStats(String queueId, final int queueSize,
- final int eventsReceived, final int eventsQueued,
- final int eventsDistributed) {
- Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
- AsyncEventQueue queue = null;
- for (AsyncEventQueue q : asyncQueues) {
- if (q.getId().equals(queueId)) {
- queue = q;
- break;
- }
- }
- final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics();
- assertEquals(queueSize, statistics.getEventQueueSize());
- assertEquals(eventsReceived, statistics.getEventsReceived());
- assertEquals(eventsQueued, statistics.getEventsQueued());
- assert(statistics.getEventsDistributed() >= eventsDistributed);
- }
-
public static void checkGatewayReceiverStats(int processBatches,
int eventsReceived, int creates) {
Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
- GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
.getAcceptor().getStats();
@@ -1709,7 +1172,7 @@ public class WANTestBase extends DistributedTestCase{
public static void checkMinimumGatewayReceiverStats(int processBatches,
int eventsReceived) {
Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
- GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
.getAcceptor().getStats();
@@ -1721,7 +1184,7 @@ public class WANTestBase extends DistributedTestCase{
public static void checkExcepitonStats(int exceptionsOccured) {
Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
- GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
.getAcceptor().getStats();
@@ -1739,7 +1202,7 @@ public class WANTestBase extends DistributedTestCase{
public static void checkGatewayReceiverStatsHA(int processBatches,
int eventsReceived, int creates) {
Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
- GatewayReceiver receiver = (GatewayReceiver)gatewayReceivers.iterator().next();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
.getAcceptor().getStats();
@@ -1776,21 +1239,6 @@ public class WANTestBase extends DistributedTestCase{
assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
}
- public static void checkAsyncEventQueueConflatedStats(
- String asyncEventQueueId, final int eventsConflated) {
- Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
- AsyncEventQueue queue = null;
- for (AsyncEventQueue q : queues) {
- if (q.getId().equals(asyncEventQueueId)) {
- queue = q;
- break;
- }
- }
- final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
- .getStatistics();
- assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
- }
-
public static void checkStats_Failover(String senderId,
final int eventsReceived) {
Set<GatewaySender> senders = cache.getGatewaySenders();
@@ -1810,25 +1258,6 @@ public class WANTestBase extends DistributedTestCase{
.getUnprocessedEventsRemovedByPrimary()));
}
- public static void checkAsyncEventQueueStats_Failover(String asyncEventQueueId,
- final int eventsReceived) {
- Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
- AsyncEventQueue queue = null;
- for (AsyncEventQueue q : asyncEventQueues) {
- if (q.getId().equals(asyncEventQueueId)) {
- queue = q;
- break;
- }
- }
- final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl) queue)
- .getStatistics();
-
- assertEquals(eventsReceived, statistics.getEventsReceived());
- assertEquals(eventsReceived, (statistics.getEventsQueued()
- + statistics.getUnprocessedTokensAddedByPrimary() + statistics
- .getUnprocessedEventsRemovedByPrimary()));
- }
-
public static void checkBatchStats(String senderId, final int batches) {
Set<GatewaySender> senders = cache.getGatewaySenders();
GatewaySender sender = null;
@@ -1844,22 +1273,6 @@ public class WANTestBase extends DistributedTestCase{
assertEquals(0, statistics.getBatchesRedistributed());
}
- public static void checkAsyncEventQueueBatchStats(String asyncQueueId,
- final int batches) {
- Set<AsyncEventQueue> queues = cache.getAsyncEventQueues();
- AsyncEventQueue queue = null;
- for (AsyncEventQueue q : queues) {
- if (q.getId().equals(asyncQueueId)) {
- queue = q;
- break;
- }
- }
- final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue)
- .getStatistics();
- assert (statistics.getBatchesDistributed() >= batches);
- assertEquals(0, statistics.getBatchesRedistributed());
- }
-
public static void checkBatchStats(String senderId,
final boolean batchesDistributed, final boolean bathcesRedistributed) {
Set<GatewaySender> senders = cache.getGatewaySenders();
@@ -1896,43 +1309,13 @@ public class WANTestBase extends DistributedTestCase{
.getUnprocessedTokensAddedByPrimary()));
}
- public static void checkAsyncEventQueueUnprocessedStats(String asyncQueueId, int events) {
- Set<AsyncEventQueue> asyncQueues = cache.getAsyncEventQueues();
- AsyncEventQueue queue = null;
- for (AsyncEventQueue q : asyncQueues) {
- if (q.getId().equals(asyncQueueId)) {
- queue = q;
- break;
- }
- }
- final AsyncEventQueueStats statistics = ((AsyncEventQueueImpl)queue).getStatistics();
- assertEquals(events,
- (statistics.getUnprocessedEventsAddedBySecondary() + statistics
- .getUnprocessedTokensRemovedBySecondary()));
- assertEquals(events,
- (statistics.getUnprocessedEventsRemovedByPrimary() + statistics
- .getUnprocessedTokensAddedByPrimary()));
- }
-
public static void waitForSenderRunningState(String senderId){
final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
try {
Set<GatewaySender> senders = cache.getGatewaySenders();
final GatewaySender sender = getGatewaySenderById(senders, senderId);
-
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (sender != null && sender.isRunning()) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected sender isRunning state to be true but is false";
- }
- };
- Wait.waitForCriterion(wc, 300000, 500, true);
+ Awaitility.await().atMost(300,TimeUnit.SECONDS).until(() -> assertEquals("Expected sender isRunning state to "
+ + "be true but is false", true, (sender != null && sender.isRunning())));
} finally {
exln.remove();
}
@@ -1941,19 +1324,8 @@ public class WANTestBase extends DistributedTestCase{
public static void waitForSenderToBecomePrimary(String senderId){
Set<GatewaySender> senders = ((GemFireCacheImpl)cache).getAllGatewaySenders();
final GatewaySender sender = getGatewaySenderById(senders, senderId);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (sender != null && ((AbstractGatewaySender) sender).isPrimary()) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected sender primary state to be true but is false";
- }
- };
- Wait.waitForCriterion(wc, 10000, 1000, true);
+ Awaitility.await().atMost(10,TimeUnit.SECONDS).until(() -> assertEquals("Expected sender primary state to "
+ + "be true but is false", true, (sender != null && ((AbstractGatewaySender)sender).isPrimary())));
}
private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
@@ -1980,22 +1352,13 @@ public class WANTestBase extends DistributedTestCase{
secondaryUpdatesMap.put("Update", listener1.updateList);
secondaryUpdatesMap.put("Destroy", listener1.destroyList);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- secondaryUpdatesMap.put("Create", listener1.createList);
- secondaryUpdatesMap.put("Update", listener1.updateList);
- secondaryUpdatesMap.put("Destroy", listener1.destroyList);
- if (secondaryUpdatesMap.equals(primaryUpdatesMap)) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected seconadry map to be " + primaryUpdatesMap + " but it is " + secondaryUpdatesMap;
- }
- };
- Wait.waitForCriterion(wc, 300000, 500, true);
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ secondaryUpdatesMap.put("Create", listener1.createList);
+ secondaryUpdatesMap.put("Update", listener1.updateList);
+ secondaryUpdatesMap.put("Destroy", listener1.destroyList);
+ assertEquals("Expected seconadry map to be " + primaryUpdatesMap + " but it is " + secondaryUpdatesMap,
+ true,secondaryUpdatesMap.equals(primaryUpdatesMap));
+ });
}
public static HashMap checkQueue2(){
@@ -2021,35 +1384,12 @@ public class WANTestBase extends DistributedTestCase{
PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
HashMap listenerAttrs = new HashMap();
for (int i = 0; i < numBuckets; i++) {
- BucketRegion br = region.getBucketRegion(i);
- QueueListener listener = (QueueListener)br.getCacheListener();
- listenerAttrs.put("Create"+i, listener.createList);
- listenerAttrs.put("Update"+i, listener.updateList);
- listenerAttrs.put("Destroy"+i, listener.destroyList);
- }
- return listenerAttrs;
- }
-
- public static HashMap checkQueue_PR(String senderId){
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for(GatewaySender s : senders){
- if(s.getId().equals(senderId)){
- sender = s;
- break;
- }
- }
-
- RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
- .getQueues().toArray(new RegionQueue[1])[0];
-
- PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
- QueueListener listener = (QueueListener)region.getCacheListener();
-
- HashMap listenerAttrs = new HashMap();
- listenerAttrs.put("Create", listener.createList);
- listenerAttrs.put("Update", listener.updateList);
- listenerAttrs.put("Destroy", listener.destroyList);
+ BucketRegion br = region.getBucketRegion(i);
+ QueueListener listener = (QueueListener)br.getCacheListener();
+ listenerAttrs.put("Create"+i, listener.createList);
+ listenerAttrs.put("Update"+i, listener.updateList);
+ listenerAttrs.put("Destroy"+i, listener.destroyList);
+ }
return listenerAttrs;
}
@@ -2062,7 +1402,7 @@ public class WANTestBase extends DistributedTestCase{
break;
}
}
- RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+ RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
.getQueues().toArray(new RegionQueue[1])[0];
PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
@@ -2110,7 +1450,7 @@ public class WANTestBase extends DistributedTestCase{
break;
}
}
- RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+ RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
.getQueues().toArray(new RegionQueue[1])[0];
PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
@@ -2163,7 +1503,7 @@ public class WANTestBase extends DistributedTestCase{
}
}
else {
- RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+ RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
.getQueues().toArray(new RegionQueue[1])[0];
parallelQueue.addCacheListener(listener1);
}
@@ -2186,7 +1526,7 @@ public class WANTestBase extends DistributedTestCase{
}
}
else {
- RegionQueue parallelQueue = (RegionQueue)((AbstractGatewaySender)sender)
+ RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
.getQueues().toArray(new RegionQueue[1])[0];
parallelQueue.addCacheListener(listener2);
}
@@ -2215,27 +1555,6 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static void pauseSenderAndWaitForDispatcherToPause(String senderId) {
- final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
- IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
- .getName());
- try {
- Set<GatewaySender> senders = cache.getGatewaySenders();
- GatewaySender sender = null;
- for (GatewaySender s : senders) {
- if (s.getId().equals(senderId)) {
- sender = s;
- break;
- }
- }
- sender.pause();
- ((AbstractGatewaySender)sender).getEventProcessor().waitForDispatcherToPause();
- } finally {
- exp.remove();
- exln.remove();
- }
- }
-
public static void resumeSender(String senderId) {
final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
@@ -2310,7 +1629,7 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName, int remoteDsId,
+ public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName,
boolean isParallel, Integer maxMemory,
Integer batchSize, boolean isConflation, boolean isPersistent,
GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy policy) {
@@ -2323,7 +1642,7 @@ public class WANTestBase extends DistributedTestCase{
gateway.setManualStart(isManualStart);
gateway.setDispatcherThreads(numDispatchers);
gateway.setOrderPolicy(policy);
- ((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
+ gateway.setLocatorDiscoveryCallback(new MyLocatorCallback());
if (filter != null) {
eventFilter = filter;
gateway.addGatewayEventFilter(filter);
@@ -2350,7 +1669,7 @@ public class WANTestBase extends DistributedTestCase{
persistentDirectory.mkdir();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File[] dirs1 = new File[] { persistentDirectory };
- GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, numDispatcherThreadsForTheRun, GatewaySender.DEFAULT_ORDER_POLICY);
+ GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, numDispatcherThreadsForTheRun, GatewaySender.DEFAULT_ORDER_POLICY);
gateway.create(dsName, remoteDsId);
} finally {
@@ -2368,7 +1687,7 @@ public class WANTestBase extends DistributedTestCase{
persistentDirectory.mkdir();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File[] dirs1 = new File[] { persistentDirectory };
- GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter,
+ GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName,isParallel, maxMemory, batchSize, isConflation, isPersistent, filter,
isManualStart, numDispatchers, orderPolicy);
gateway.create(dsName, remoteDsId);
@@ -2377,10 +1696,8 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static void createSenderWithoutDiskStore(String dsName, int remoteDsId,
- boolean isParallel, Integer maxMemory,
- Integer batchSize, boolean isConflation, boolean isPersistent,
- GatewayEventFilter filter, boolean isManulaStart) {
+ public static void createSenderWithoutDiskStore(String dsName, int remoteDsId, Integer maxMemory,
+ Integer batchSize, boolean isConflation, boolean isManulaStart) {
GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
gateway.setParallel(true);
@@ -2403,53 +1720,10 @@ public class WANTestBase extends DistributedTestCase{
persistentDirectory.mkdir();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File[] dirs1 = new File[] { persistentDirectory };
- GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, remoteDsId, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy);
+ GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy);
gateway.create(dsName, remoteDsId);
}
-// public static void createSender_PDX(String dsName, int remoteDsId,
-// boolean isParallel, Integer maxMemory,
-// Integer batchSize, boolean isConflation, boolean isPersistent,
-// GatewayEventFilter filter, boolean isManulaStart) {
-// File persistentDirectory = new File(dsName +"_disk_"+System.currentTimeMillis()+"_" + VM.getCurrentVMNum());
-// persistentDirectory.mkdir();
-//
-// File [] dirs1 = new File[] {persistentDirectory};
-//
-// if(isParallel) {
-// ParallelGatewaySenderFactory gateway = cache.createParallelGatewaySenderFactory();
-// gateway.setMaximumQueueMemory(maxMemory);
-// gateway.setBatchSize(batchSize);
-// ((ParallelGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
-// if (filter != null) {
-// gateway.addGatewayEventFilter(filter);
-// }
-// if(isPersistent) {
-// gateway.setPersistenceEnabled(true);
-// DiskStoreFactory dsf = cache.createDiskStoreFactory();
-// gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
-// }
-// gateway.setBatchConflationEnabled(isConflation);
-// gateway.create(dsName, remoteDsId);
-//
-// }else {
-// SerialGatewaySenderFactory gateway = cache.createSerialGatewaySenderFactory();
-// gateway.setMaximumQueueMemory(maxMemory);
-// gateway.setBatchSize(batchSize);
-// gateway.setManualStart(isManulaStart);
-// ((SerialGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
-// if (filter != null) {
-// gateway.addGatewayEventFilter(filter);
-// }
-// gateway.setBatchConflationEnabled(isConflation);
-// if(isPersistent) {
-// gateway.setPersistenceEnabled(true);
-// DiskStoreFactory dsf = cache.createDiskStoreFactory();
-// gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
-// }
-// gateway.create(dsName, remoteDsId);
-// }
-// }
public static void createSenderForValidations(String dsName, int remoteDsId,
boolean isParallel, Integer alertThreshold,
boolean isConflation, boolean isPersistent,
@@ -2527,8 +1801,7 @@ public class WANTestBase extends DistributedTestCase{
gateway.setDiskStoreName(store.getName());
}
gateway.setDiskSynchronous(isDiskSync);
- GatewaySender sender = gateway
- .create(dsName, remoteDsId);
+ gateway.create(dsName, remoteDsId);
}
}
finally {
@@ -2675,41 +1948,13 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static void pauseWaitCriteria(final long millisec) {
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- return false;
- }
-
- public String description() {
- return "Expected to wait for " + millisec + " millisec.";
- }
- };
- Wait.waitForCriterion(wc, millisec, 500, false);
- }
-
- public static void createReceiverInVMs(int locatorPort, VM... vms) {
- for (VM vm : vms) {
- vm.invoke(() -> createReceiver(locatorPort));
- }
- }
-
- public static void createReceiverInVMsAsync(int locatorPort, VM... vms) {
- List<AsyncInvocation> tasks = new LinkedList<>();
+ public static void createReceiverInVMs(VM... vms) {
for (VM vm : vms) {
- tasks.add(vm.invokeAsync(() -> createReceiver(locatorPort)));
- }
- for (AsyncInvocation invocation : tasks) {
- try {
- invocation.join(30000);
- }
- catch (InterruptedException e) {
- fail("Failed starting up the receiver");
- }
+ vm.invoke(() -> createReceiver());
}
}
- public static int createReceiver(int locPort) {
+ public static int createReceiver() {
GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
fact.setStartPort(port);
@@ -2805,15 +2050,6 @@ public class WANTestBase extends DistributedTestCase{
return port;
}
- public static String makePath(String[] strings) {
- StringBuilder sb = new StringBuilder();
- for(int i=0;i<strings.length;i++){
- sb.append(strings[i]);
- sb.append(File.separator);
- }
- return sb.toString();
- }
-
public static void createReceiverAndServer(int locPort) {
WANTestBase test = new WANTestBase(getTestMethodName());
Properties props = test.getDistributedSystemProperties();
@@ -2841,7 +2077,6 @@ public class WANTestBase extends DistributedTestCase{
int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
server.setPort(serverPort);
server.setHostnameForClients("localhost");
- //server.setGroups(new String[]{"serv"});
try {
server.start();
} catch (IOException e) {
@@ -2849,23 +2084,6 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static int createReceiverInSecuredCache(int locPort) {
- GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
- int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
- fact.setStartPort(port);
- fact.setEndPort(port);
- fact.setManualStart(true);
- GatewayReceiver receiver = fact.create();
- try {
- receiver.start();
- }
- catch (IOException e) {
- e.printStackTrace();
- com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start GatewayRecevier on port " + port, e);
- }
- return port;
- }
-
public static int createServer(int locPort) {
WANTestBase test = new WANTestBase(getTestMethodName());
Properties props = test.getDistributedSystemProperties();
@@ -2879,7 +2097,6 @@ public class WANTestBase extends DistributedTestCase{
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
server.setPort(port);
server.setHostnameForClients("localhost");
- //server.setGroups(new String[]{"serv"});
try {
server.start();
} catch (IOException e) {
@@ -2902,7 +2119,7 @@ public class WANTestBase extends DistributedTestCase{
CacheServerTestUtil.disableShufflingOfEndpoints();
Pool p;
try {
- p = PoolManager.createFactory().addLocator(host, port0) //.setServerGroup("serv")
+ p = PoolManager.createFactory().addLocator(host, port0)
.setPingInterval(250).setSubscriptionEnabled(true)
.setSubscriptionRedundancy(-1).setReadTimeout(2000)
.setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10)
@@ -2975,9 +2192,6 @@ public class WANTestBase extends DistributedTestCase{
exp1.remove();
exp2.remove();
}
-// for (long i = 0; i < numPuts; i++) {
-// r.destroy(i);
-// }
}
@@ -3013,20 +2227,6 @@ public class WANTestBase extends DistributedTestCase{
exp1.remove();
exp2.remove();
}
-// for (long i = 0; i < numPuts; i++) {
-// r.destroy(i);
-// }
- }
-
- /**
- * To be used for CacheLoader related tests
- */
- public static void doGets(String regionName, int numGets) {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- for (long i = 0; i < numGets; i++) {
- r.get(i);
- }
}
public static void doPutsAfter300(String regionName, int numPuts) {
@@ -3090,40 +2290,8 @@ public class WANTestBase extends DistributedTestCase{
public static void destroyRegion(String regionName, final int min) {
final Region r = cache.getRegion(Region.SEPARATOR + regionName);
assertNotNull(r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (r.size() > min) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Looking for min size of region to be " + min;
- }
- };
- Wait.waitForCriterion(wc, 30000, 5, false);
- r.destroyRegion();
- }
-
- public static void destroyRegionAfterMinRegionSize(String regionName, final int min) {
- final Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (destroyFlag) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Looking for min size of region to be " + min;
- }
- };
- Wait.waitForCriterion(wc, 30000, 5, false);
+ Awaitility.await().atMost(30,TimeUnit.SECONDS).until(() -> r.size() > min);
r.destroyRegion();
- destroyFlag = false;
}
public static void localDestroyRegion(String regionName) {
@@ -3228,24 +2396,22 @@ public class WANTestBase extends DistributedTestCase{
Map orderKeyValues = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
- for (int j = 1; j <= 1; j++) {
- int oid = (i * 1) + j;
- OrderId orderId = new OrderId(oid, custid);
- Order order = new Order("OREDR" + oid + "_update");
- try {
- orderRegion.put(orderId, order);
- orderKeyValues.put(orderId, order);
- assertTrue(orderRegion.containsKey(orderId));
- assertEquals(order,orderRegion.get(orderId));
+ int oid = i + 1;
+ OrderId orderId = new OrderId(oid, custid);
+ Order order = new Order("OREDR" + oid + "_update");
+ try {
+ orderRegion.put(orderId, order);
+ orderKeyValues.put(orderId, order);
+ assertTrue(orderRegion.containsKey(orderId));
+ assertEquals(order,orderRegion.get(orderId));
- }
- catch (Exception e) {
- com.gemstone.gemfire.test.dunit.Assert.fail(
- "updateOrderPartitionedRegion : failed while doing put operation in OrderPartitionedRegion ",
- e);
- }
- LogWriterUtils.getLogWriter().info("Order :- { " + orderId + " : " + order + " }");
}
+ catch (Exception e) {
+ com.gemstone.gemfire.test.dunit.Assert.fail(
+ "updateOrderPartitionedRegion : failed while doing put operation in OrderPartitionedRegion ",
+ e);
+ }
+ LogWriterUtils.getLogWriter().info("Order :- { " + orderId + " : " + order + " }");
}
return orderKeyValues;
}
@@ -3278,28 +2444,24 @@ public class WANTestBase extends DistributedTestCase{
Map shipmentKeyValue = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
- for (int j = 1; j <= 1; j++) {
- int oid = (i * 1) + j;
- OrderId orderId = new OrderId(oid, custid);
- for (int k = 1; k <= 1; k++) {
- int sid = (oid * 1) + k;
- ShipmentId shipmentId = new ShipmentId(sid, orderId);
- Shipment shipment = new Shipment("Shipment" + sid);
- try {
- shipmentRegion.put(shipmentId, shipment);
- assertTrue(shipmentRegion.containsKey(shipmentId));
- assertEquals(shipment,shipmentRegion.get(shipmentId));
- shipmentKeyValue.put(shipmentId, shipment);
- }
- catch (Exception e) {
- com.gemstone.gemfire.test.dunit.Assert.fail(
- "putShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ",
- e);
- }
- LogWriterUtils.getLogWriter().info(
- "Shipment :- { " + shipmentId + " : " + shipment + " }");
- }
+ int oid = i + 1;
+ OrderId orderId = new OrderId(oid, custid);
+ int sid = oid + 1;
+ ShipmentId shipmentId = new ShipmentId(sid, orderId);
+ Shipment shipment = new Shipment("Shipment" + sid);
+ try {
+ shipmentRegion.put(shipmentId, shipment);
+ assertTrue(shipmentRegion.containsKey(shipmentId));
+ assertEquals(shipment,shipmentRegion.get(shipmentId));
+ shipmentKeyValue.put(shipmentId, shipment);
}
+ catch (Exception e) {
+ com.gemstone.gemfire.test.dunit.Assert.fail(
+ "putShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ",
+ e);
+ }
+ LogWriterUtils.getLogWriter().info(
+ "Shipment :- { " + shipmentId + " : " + shipment + " }");
}
return shipmentKeyValue;
}
@@ -3313,18 +2475,14 @@ public class WANTestBase extends DistributedTestCase{
CustId custid = new CustId(i);
Customer customer = new Customer("Customer" + custid, "Address" + custid);
customerRegion.put(custid, customer);
- for (int j = 1; j <= 1; j++) {
- int oid = (i * 1) + j;
- OrderId orderId = new OrderId(oid, custid);
- Order order = new Order("Order"+orderId);
- orderRegion.put(orderId, order);
- for (int k = 1; k <= 1; k++) {
- int sid = (oid * 1) + k;
- ShipmentId shipmentId = new ShipmentId(sid, orderId);
- Shipment shipment = new Shipment("Shipment" + sid);
- shipmentRegion.put(shipmentId, shipment);
- }
- }
+ int oid = i + 1;
+ OrderId orderId = new OrderId(oid, custid);
+ Order order = new Order("Order"+orderId);
+ orderRegion.put(orderId, order);
+ int sid = oid + 1;
+ ShipmentId shipmentId = new ShipmentId(sid, orderId);
+ Shipment shipment = new Shipment("Shipment" + sid);
+ shipmentRegion.put(shipmentId, shipment);
}
}
@@ -3356,28 +2514,24 @@ public class WANTestBase extends DistributedTestCase{
Map shipmentKeyValue = new HashMap();
for (int i = 1; i <= numPuts; i++) {
CustId custid = new CustId(i);
- for (int j = 1; j <= 1; j++) {
- int oid = (i * 1) + j;
- OrderId orderId = new OrderId(oid, custid);
- for (int k = 1; k <= 1; k++) {
- int sid = (oid * 1) + k;
- ShipmentId shipmentId = new ShipmentId(sid, orderId);
- Shipment shipment = new Shipment("Shipment" + sid + "_update");
- try {
- shipmentRegion.put(shipmentId, shipment);
- assertTrue(shipmentRegion.containsKey(shipmentId));
- assertEquals(shipment,shipmentRegion.get(shipmentId));
- shipmentKeyValue.put(shipmentId, shipment);
- }
- catch (Exception e) {
- com.gemstone.gemfire.test.dunit.Assert.fail(
- "updateShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ",
- e);
- }
- LogWriterUtils.getLogWriter().info(
- "Shipment :- { " + shipmentId + " : " + shipment + " }");
- }
+ int oid = i + 1;
+ OrderId orderId = new OrderId(oid, custid);
+ int sid = oid + 1;
+ ShipmentId shipmentId = new ShipmentId(sid, orderId);
+ Shipment shipment = new Shipment("Shipment" + sid + "_update");
+ try {
+ shipmentRegion.put(shipmentId, shipment);
+ assertTrue(shipmentRegion.containsKey(shipmentId));
+ assertEquals(shipment,shipmentRegion.get(shipmentId));
+ shipmentKeyValue.put(shipmentId, shipment);
+ }
+ catch (Exception e) {
+ com.gemstone.gemfire.test.dunit.Assert.fail(
+ "updateShipmentPartitionedRegion : failed while doing put operation in ShipmentPartitionedRegion ",
+ e);
}
+ LogWriterUtils.getLogWriter().info(
+ "Shipment :- { " + shipmentId + " : " + shipment + " }");
}
return shipmentKeyValue;
}
@@ -3421,7 +2575,7 @@ public class WANTestBase extends DistributedTestCase{
}
- public static void doTxPuts(String regionName, int numPuts) {
+ public static void doTxPuts(String regionName) {
Region r = cache.getRegion(Region.SEPARATOR + regionName);
assertNotNull(r);
CacheTransactionManager mgr = cache.getCacheTransactionManager();
@@ -3434,7 +2588,6 @@ public class WANTestBase extends DistributedTestCase{
}
public static void doNextPuts(String regionName, int start, int numPuts) {
- //waitForSitesToUpdate();
IgnoredException exp = IgnoredException.addIgnoredException(CacheClosedException.class
.getName());
try {
@@ -3494,30 +2647,16 @@ public class WANTestBase extends DistributedTestCase{
}
if (sender.isParallel()) {
- int totalSize = 0;
final Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
-
- WaitCriterion wc = new WaitCriterion() {
+ Awaitility.await().atMost(120,TimeUnit.SECONDS).until(() -> {
int size = 0;
- public boolean done() {
- for (RegionQueue q : queues) {
- ConcurrentParallelGatewaySenderQueue prQ = (ConcurrentParallelGatewaySenderQueue)q;
- size += prQ.localSize();
- }
- if (size == numQueueEntries) {
- return true;
- }
- return false;
+ for (RegionQueue q : queues) {
+ ConcurrentParallelGatewaySenderQueue prQ = (ConcurrentParallelGatewaySenderQueue)q;
+ size += prQ.localSize();
}
-
- public String description() {
- return " Expected local queue entries: " + numQueueEntries
- + " but actual entries: " + size;
- }
-
- };
-
- Wait.waitForCriterion(wc, 120000, 500, true);
+ assertEquals(" Expected local queue entries: " + numQueueEntries
+ + " but actual entries: " + size, numQueueEntries, size);
+ });
}
}
@@ -3545,38 +2684,6 @@ public class WANTestBase extends DistributedTestCase{
return -1;
}
- public static void doUpdates(String regionName, int numUpdates) {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- for (int i = 0; i < numUpdates; i++) {
- String s = "K"+i;
- r.put(i, s);
- }
- }
-
- public static void doUpdateOnSameKey(String regionName, int key,
- int numUpdates) {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- for (int i = 0; i < numUpdates; i++) {
- String s = "V_" + i;
- r.put(key, s);
- }
- }
-
- public static void doRandomUpdates(String regionName, int numUpdates) {
- Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- Set<Integer> generatedKeys = new HashSet<Integer>();
- while(generatedKeys.size() != numUpdates) {
- generatedKeys.add((new Random()).nextInt(r.size()));
- }
- for (Integer i: generatedKeys) {
- String s = "K"+i;
- r.put(i, s);
- }
- }
-
public static void doMultiThreadedPuts(String regionName, int numPuts) {
final AtomicInteger ai = new AtomicInteger(-1);
final ExecutorService execService = Executors.newFixedThreadPool(5,
@@ -3614,68 +2721,26 @@ public class WANTestBase extends DistributedTestCase{
validateRegionSize(regionName, regionSize, 30000);
}
- public static void validateRegionSize(String regionName, final int regionSize, long waitTime) {
+ public static void validateRegionSize(String regionName, final int regionSize, long waitTimeInMilliSec) {
IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
.getName());
IgnoredException exp1 = IgnoredException.addIgnoredException(CacheClosedException.class
.getName());
try {
-
final Region r = cache.getRegion(Region.SEPARATOR + regionName);
assertNotNull(r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (r.keySet().size() == regionSize) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected region entries: " + regionSize
- + " but actual entries: " + r.keySet().size()
- + " present region keyset " + r.keySet();
- }
- };
- Wait.waitForCriterion(wc, waitTime, 500, true);
+ if ( regionSize != r.keySet().size()) {
+ Awaitility.await().atMost(waitTimeInMilliSec, TimeUnit.MILLISECONDS).pollInterval(500, TimeUnit.MILLISECONDS)
+ .until(() ->
+ assertEquals("Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size()
+ + " present region keyset " + r.keySet(), regionSize, r.keySet().size()));
+ }
} finally {
exp.remove();
exp1.remove();
}
}
- /**
- * Validate whether all the attributes set on AsyncEventQueueFactory are set
- * on the sender underneath the AsyncEventQueue.
- */
- public static void validateAsyncEventQueueAttributes(String asyncChannelId,
- int maxQueueMemory, int batchSize, int batchTimeInterval,
- boolean isPersistent, String diskStoreName, boolean isDiskSynchronous,
- boolean batchConflationEnabled) {
-
- AsyncEventQueue theChannel = null;
-
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncChannelId.equals(asyncChannel.getId())) {
- theChannel = asyncChannel;
- }
- }
-
- GatewaySender theSender = ((AsyncEventQueueImpl)theChannel).getSender();
- assertEquals("maxQueueMemory", maxQueueMemory, theSender
- .getMaximumQueueMemory());
- assertEquals("batchSize", batchSize, theSender.getBatchSize());
- assertEquals("batchTimeInterval", batchTimeInterval, theSender
- .getBatchTimeInterval());
- assertEquals("isPersistent", isPersistent, theSender.isPersistenceEnabled());
- assertEquals("diskStoreName", diskStoreName, theSender.getDiskStoreName());
- assertEquals("isDiskSynchronous", isDiskSynchronous, theSender
- .isDiskSynchronous());
- assertEquals("batchConflation", batchConflationEnabled, theSender
- .isBatchConflationEnabled());
- }
-
public static void validateAsyncEventListener(String asyncQueueId, final int expectedSize) {
AsyncEventListener theListener = null;
@@ -3688,153 +2753,43 @@ public class WANTestBase extends DistributedTestCase{
final Map eventsMap = ((MyAsyncEventListener) theListener).getEventsMap();
assertNotNull(eventsMap);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (eventsMap.size() == expectedSize) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected map entries: " + expectedSize
- + " but actual entries: " + eventsMap.size();
- }
- };
- Wait.waitForCriterion(wc, 60000, 500, true); //TODO:Yogs
- }
-
- public static void validateCustomAsyncEventListener(String asyncQueueId,
- final int expectedSize) {
- AsyncEventListener theListener = null;
-
- Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncQueue : asyncEventQueues) {
- if (asyncQueueId.equals(asyncQueue.getId())) {
- theListener = asyncQueue.getAsyncEventListener();
- }
- }
-
- final Map eventsMap = ((CustomAsyncEventListener) theListener).getEventsMap();
- assertNotNull(eventsMap);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (eventsMap.size() == expectedSize) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected map entries: " + expectedSize
- + " but actual entries: " + eventsMap.size();
- }
- };
- Wait.waitForCriterion(wc, 60000, 500, true); // TODO:Yogs
-
- Iterator<AsyncEvent> itr = eventsMap.values().iterator();
- while (itr.hasNext()) {
- AsyncEvent event = itr.next();
- assertTrue("possibleDuplicate should be true for event: " + event, event.getPossibleDuplicate());
- }
+ Awaitility.await().atMost(60,TimeUnit.SECONDS).until(() -> assertEquals("Expected map entries: " + expectedSize
+ + " but actual entries: " + eventsMap.size(), expectedSize, eventsMap.size()));
}
public static void waitForAsyncQueueToGetEmpty(String asyncQueueId) {
AsyncEventQueue theAsyncEventQueue = null;
- Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncChannel : asyncEventChannels) {
- if (asyncQueueId.equals(asyncChannel.getId())) {
- theAsyncEventQueue = asyncChannel;
- }
- }
-
- final GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue)
- .getSender();
-
- if (sender.isParallel()) {
- final Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
- .getQueues();
-
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- int size = 0;
- for (RegionQueue q : queues) {
- size += q.size();
- }
- if (size == 0) {
- return true;
- }
- return false;
- }
-
- public String description() {
- int size = 0;
- for (RegionQueue q : queues) {
- size += q.size();
- }
- return "Expected queue size to be : " + 0 + " but actual entries: "
- + size;
- }
- };
- Wait.waitForCriterion(wc, 60000, 500, true);
-
- } else {
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
- .getQueues();
- int size = 0;
- for (RegionQueue q : queues) {
- size += q.size();
- }
- if (size == 0) {
- return true;
- }
- return false;
- }
-
- public String description() {
- Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
- .getQueues();
- int size = 0;
- for (RegionQueue q : queues) {
- size += q.size();
- }
- return "Expected queue size to be : " + 0 + " but actual entries: "
- + size;
- }
- };
- Wait.waitForCriterion(wc, 60000, 500, true);
- }
- }
-
- public static void verifyAsyncEventListenerForPossibleDuplicates(
- String asyncEventQueueId, Set<Integer> bucketIds, int batchSize) {
- AsyncEventListener theListener = null;
-
- Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncQueue : asyncEventQueues) {
- if (asyncEventQueueId.equals(asyncQueue.getId())) {
- theListener = asyncQueue.getAsyncEventListener();
+ Set<AsyncEventQueue> asyncEventChannels = cache.getAsyncEventQueues();
+ for (AsyncEventQueue asyncChannel : asyncEventChannels) {
+ if (asyncQueueId.equals(asyncChannel.getId())) {
+ theAsyncEventQueue = asyncChannel;
}
}
- final Map<Integer, List<GatewaySenderEventImpl>> bucketToEventsMap = ((MyAsyncEventListener2)theListener)
- .getBucketToEventsMap();
- assertNotNull(bucketToEventsMap);
- assertTrue(bucketIds.size() > 1);
+ final GatewaySender sender = ((AsyncEventQueueImpl)theAsyncEventQueue)
+ .getSender();
- for (int bucketId : bucketIds) {
- List<GatewaySenderEventImpl> eventsForBucket = bucketToEventsMap
- .get(bucketId);
- LogWriterUtils.getLogWriter().info(
- "Events for bucket: " + bucketId + " is " + eventsForBucket);
- assertNotNull(eventsForBucket);
- for (int i = 0; i < batchSize; i++) {
- GatewaySenderEventImpl senderEvent = eventsForBucket.get(i);
- assertTrue(senderEvent.getPossibleDuplicate());
- }
+ if (sender.isParallel()) {
+ final Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
+ .getQueues();
+ Awaitility.await().atMost(60,TimeUnit.SECONDS).until(() -> {
+ int size = 0;
+ for (RegionQueue q : queues) {
+ size += q.size();
+ }
+ assertEquals("Expected queue size to be : " + 0 + " but actual entries: " + size, 0, size);
+ });
+ } else {
+ Awaitility.await().atMost(60,TimeUnit.SECONDS).until(() -> {
+ Set<RegionQueue> queues = ((AbstractGatewaySender)sender)
+ .getQueues();
+ int size = 0;
+ for (RegionQueue q : queues) {
+ size += q.size();
+ }
+ assertEquals("Expected queue size to be : " + 0 + " but actual entries: " + size, 0, size);
+ });
}
}
@@ -3854,81 +2809,21 @@ public class WANTestBase extends DistributedTestCase{
return eventsMap.size();
}
- public static int getAsyncEventQueueSize(String asyncEventQueueId) {
- AsyncEventQueue theQueue = null;
-
- Set<AsyncEventQueue> asyncEventQueues = cache.getAsyncEventQueues();
- for (AsyncEventQueue asyncQueue : asyncEventQueues) {
- if (asyncEventQueueId.equals(asyncQueue.getId())) {
- theQueue = asyncQueue;
- }
- }
- assertNotNull(theQueue);
- return theQueue.size();
- }
-
-
public static void validateRegionSize_PDX(String regionName, final int regionSize) {
final Region r = cache.getRegion(Region.SEPARATOR + regionName);
assertNotNull(r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (r.keySet().size() >= regionSize) {
- return true;
- }
- return false;
- }
-
- public String description() {
-
- return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size() + " present region keyset " + r.keySet() ;
- }
- };
- Wait.waitForCriterion(wc, 200000, 500, true);
+ Awaitility.await().atMost(200,TimeUnit.SECONDS).until(() -> assertEquals("Expected region entries: " + regionSize +
+ " but actual entries: " + r.keySet().size() + " present region keyset " + r.keySet(),
+ true,(regionSize <= r.keySet().size())));
for(int i = 0 ; i < regionSize; i++){
LogWriterUtils.getLogWriter().info("For Key : Key_"+i + " : Values : " + r.get("Key_" + i));
assertEquals(new SimpleClass(i, (byte)i), r.get("Key_" + i));
}
}
- public static void validateRegionSize_PDX2(String regionName, final int regionSize) {
- final Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (r.keySet().size() == regionSize) {
- return true;
- }
- return false;
- }
-
- public String description() {
-
- return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size() + " present region keyset " + r.keySet() ;
- }
- };
- Wait.waitForCriterion(wc, 200000, 500, true);
- for(int i = 0 ; i < regionSize; i++){
- LogWriterUtils.getLogWriter().info("For Key : Key_"+i + " : Values : " + r.get("Key_" + i));
- assertEquals(new SimpleClass1(false, (short) i, "" + i, i,"" +i ,""+ i,i, i), r.get("Key_" + i));
- }
- }
public static void validateQueueSizeStat(String id, final int queueSize) {
final AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(id);
-
- Wait.waitForCriterion(new WaitCriterion() {
-
- @Override
- public boolean done() {
- return sender.getEventQueueSize() == queueSize;
- }
-
- @Override
- public String description() {
- // TODO Auto-generated method stub
- return null;
- }
- }, 30000, 50, false);
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> assertEquals(queueSize, sender.getEventQueueSize()));
assertEquals(queueSize, sender.getEventQueueSize());
}
/**
@@ -3987,97 +2882,21 @@ public class WANTestBase extends DistributedTestCase{
public static void validateRegionContents(String regionName, final Map keyValues) {
final Region r = cache.getRegion(Region.SEPARATOR + regionName);
assertNotNull(r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- for(Object key: keyValues.keySet()) {
- if (!r.get(key).equals(keyValues.get(key))) {
- LogWriterUtils.getLogWriter().info(
- "The values are for key " + " " + key + " " + r.get(key)
- + " in the map " + keyValues.get(key));
- return false;
- }
- }
- return true;
- }
-
- public String description() {
- return "Expected region entries doesn't match";
- }
- };
- Wait.waitForCriterion(wc, 120000, 500, true);
- }
-
- public static void CheckContent(String regionName, final int regionSize) {
- final Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- for (long i = 0; i < regionSize; i++) {
- assertEquals(i, r.get(i));
- }
- }
-
- public static void validateRegionContentsForPR(String regionName,
- final int regionSize) {
- final Region r = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(r);
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (r.keySet().size() == regionSize) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Expected region entries: " + regionSize + " but actual entries: " + r.keySet().size();
- }
- };
- Wait.waitForCriterion(wc, 120000, 500, true);
- }
-
- public static void verifyPrimaryStatus(final Boolean isPrimary) {
- final Set<GatewaySender> senders = cache.getGatewaySenders();
- assertEquals(senders.size(), 1);
- final AbstractGatewaySender sender = (AbstractGatewaySender)senders.iterator().next();
-
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (sender.isPrimary() == isPrimary.booleanValue()) {
- return true;
+ Awaitility.await().atMost(120,TimeUnit.SECONDS).until(() -> {
+ boolean matchFlag = true;
+ for(Object key: keyValues.keySet()) {
+ if (!r.get(key).equals(keyValues.get(key))) {
+ LogWriterUtils.getLogWriter().info(
+ "The values are for key " + " " + key + " " + r.get(key)
+ + " in the map " + keyValues.get(key));
+ matchFlag = false;
}
- return false;
}
-
- public String description() {
- return "Expected sender to be : " + isPrimary.booleanValue() + " but actually it is : " + sender.isPrimary();
- }
- };
- Wait.waitForCriterion(wc, 120000, 500, true);
+ assertEquals("Expected region entries doesn't match", true, matchFlag);
+ });
}
- public static Boolean getPrimaryStatus(){
- Set<GatewaySender> senders = cache.getGatewaySenders();
- assertEquals(senders.size(), 1);
- final AbstractGatewaySender sender = (AbstractGatewaySender)senders.iterator().next();
- WaitCriterion wc = new WaitCriterion() {
- public boolean done() {
- if (sender.isPrimary()) {
- return true;
- }
- return false;
- }
-
- public String description() {
- return "Checking Primary Status";
- }
- };
- Wait.waitForCriterion(wc, 10000, 500, false);
- return sender.isPrimary();
- }
- public static Set<Integer> getAllPrimaryBucketsOnTheNode(String regionName) {
- PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
- return region.getDataStore().getAllLocalPrimaryBucketIds();
- }
public static void doHeavyPuts(String regionName, int numPuts) {
Region r = cache.getRegion(Region.SEPARATOR + regionName);
@@ -4089,24 +2908,6 @@ public class WANTestBase extends DistributedTestCase{
}
}
- public static void addListenerAndKillPrimary(){
- Set<GatewaySender> senders = ((GemFireCacheImpl)cache).getAllGatewaySenders();
- assertEquals(senders.size(), 1);
- AbstractGatewaySender sender = (AbstractGatewaySender)senders.iterator().next();
- Region queue = cache.getRegion(Region.SEPARATOR+sender.getId()+"_SERIAL_GATEWAY_SENDER_QUEUE");
- assertNotNull(queue);
- CacheListenerAdapter cl = new CacheListenerAdapter() {
- public void afterCreate(EntryEvent event) {
- if((Long)event.getKey() > 900){
- cache.getLogger().fine(" Gateway sender is killed by a test");
- cache.close();
- cache.getDistributedSystem().disconnect();
- }
- }
- };
- queue.getAttributesMutator().addCacheListener(cl);
- }
-
public static void addCacheListenerAndDestroyRegion(String regionName){
final Region region = cache.getRegion(Region.SEPARATOR + regionName);
assertNotNull(region);
@@ -4121,22 +2922,6 @@ public class WANTestBase extends DistributedTestCase{
region.getAttributesMutator().addCacheListener(cl);
}
- public static void addCacheListenerAndCloseCache(String regionName){
- final Region region = cache.getRegion(Region.SEPARATOR + regionName);
- assertNotNull(region);
- CacheListenerAdapter cl = new CacheListenerAdapter() {
- @Override
- public void afterCreate(EntryEvent event) {
- if((Long)event.getKey() == 900){
- cache.getLogger().fine(" Gateway sender is killed by a test");
- cache.close();
- cache.getDistributedSystem().disconnect();
-
<TRUNCATED>