You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by hi...@apache.org on 2016/09/13 22:44:07 UTC
[19/61] [abbrv] incubator-geode git commit: GEODE-37 change package
name from com.gemstone.gemfire (for
./geode-wan/src/test/java/com/gemstone/gemfire)to org.apache.geode for(to
./geode-wan/src/test/java/org/apache/geode)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f39e2394/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
new file mode 100644
index 0000000..6727f1b
--- /dev/null
+++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -0,0 +1,3765 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.wan;
+
+import static com.gemstone.gemfire.distributed.ConfigurationProperties.*;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import com.jayway.awaitility.Awaitility;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.AttributesMutator;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.CacheListener;
+import com.gemstone.gemfire.cache.CacheTransactionManager;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter;
+import com.gemstone.gemfire.cache.client.internal.locator.wan.LocatorMembershipListener;
+import com.gemstone.gemfire.cache.persistence.PartitionOfflineException;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.distributed.Locator;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.InternalLocator;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.FileUtil;
+import com.gemstone.gemfire.internal.admin.remote.DistributionLocatorId;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.CacheConfig;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.CustomerIDPartitionResolver;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.execute.data.CustId;
+import com.gemstone.gemfire.internal.cache.execute.data.Customer;
+import com.gemstone.gemfire.internal.cache.execute.data.Order;
+import com.gemstone.gemfire.internal.cache.execute.data.OrderId;
+import com.gemstone.gemfire.internal.cache.execute.data.Shipment;
+import com.gemstone.gemfire.internal.cache.execute.data.ShipmentId;
+import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerStats;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
+import com.gemstone.gemfire.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
+import com.gemstone.gemfire.internal.cache.wan.serial.SerialGatewaySenderQueue;
+import com.gemstone.gemfire.pdx.SimpleClass;
+import com.gemstone.gemfire.pdx.SimpleClass1;
+import com.gemstone.gemfire.test.dunit.Assert;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.IgnoredException;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+import com.gemstone.gemfire.test.dunit.internal.JUnit4DistributedTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+import com.gemstone.gemfire.util.test.TestUtil;
+
+@Category(DistributedTest.class)
+public class WANTestBase extends JUnit4DistributedTestCase {
+
+ protected static Cache cache;
+ protected static Region region;
+
+ protected static PartitionedRegion customerRegion;
+ protected static PartitionedRegion orderRegion;
+ protected static PartitionedRegion shipmentRegion;
+
+ protected static final String customerRegionName = "CUSTOMER";
+ protected static final String orderRegionName = "ORDER";
+ protected static final String shipmentRegionName = "SHIPMENT";
+
+ protected static VM vm0;
+ protected static VM vm1;
+ protected static VM vm2;
+ protected static VM vm3;
+ protected static VM vm4;
+ protected static VM vm5;
+ protected static VM vm6;
+ protected static VM vm7;
+
+ protected static QueueListener listener1;
+ protected static QueueListener listener2;
+
+ protected static AsyncEventListener eventListener1 ;
+ protected static AsyncEventListener eventListener2 ;
+
+ private static final long MAX_WAIT = 10000;
+
+ protected static GatewayEventFilter eventFilter;
+
+ protected static List<Integer> dispatcherThreads =
+ new ArrayList<Integer>(Arrays.asList(1, 3, 5));
+ //this will be set for each test method run with one of the values from above list
+ protected static int numDispatcherThreadsForTheRun = 1;
+
+ public WANTestBase() {
+ super();
+ }
+
+ /**
+ * @deprecated Use the no arg constructor, or better yet, don't construct this class
+ */
+ @Deprecated
+ public WANTestBase(final String ignored) {
+ }
+
+ @Override
+ public final void preSetUp() throws Exception {
+ final Host host = Host.getHost(0);
+ vm0 = host.getVM(0);
+ vm1 = host.getVM(1);
+ vm2 = host.getVM(2);
+ vm3 = host.getVM(3);
+ vm4 = host.getVM(4);
+ vm5 = host.getVM(5);
+ vm6 = host.getVM(6);
+ vm7 = host.getVM(7);
+ //Need to set the test name after the VMs are created
+ }
+
+ @Override
+ public final void postSetUp() throws Exception {
+ //this is done to vary the number of dispatchers for sender
+ //during every test method run
+ shuffleNumDispatcherThreads();
+ Invoke.invokeInEveryVM(() -> setNumDispatcherThreadsForTheRun(dispatcherThreads.get(0)));
+ IgnoredException.addIgnoredException("Connection refused");
+ IgnoredException.addIgnoredException("Software caused connection abort");
+ IgnoredException.addIgnoredException("Connection reset");
+ postSetUpWANTestBase();
+ }
+
+ protected void postSetUpWANTestBase() throws Exception {
+ }
+
+ public static void shuffleNumDispatcherThreads() {
+ Collections.shuffle(dispatcherThreads);
+ }
+
+ public static void setNumDispatcherThreadsForTheRun(int numThreads) {
+ numDispatcherThreadsForTheRun = numThreads;
+ }
+
+ public static void stopOldLocator() {
+ if (Locator.hasLocator()) {
+ Locator.getLocator().stop();
+ }
+ }
+
+ public static void createLocator(int dsId, int port, Set<String> localLocatorsList, Set<String> remoteLocatorsList){
+ WANTestBase test = new WANTestBase();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
+ StringBuffer localLocatorBuffer = new StringBuffer(localLocatorsList.toString());
+ localLocatorBuffer.deleteCharAt(0);
+ localLocatorBuffer.deleteCharAt(localLocatorBuffer.lastIndexOf("]"));
+ String localLocator = localLocatorBuffer.toString();
+ localLocator = localLocator.replace(" ", "");
+
+ props.setProperty(LOCATORS, localLocator);
+ props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
+ StringBuffer remoteLocatorBuffer = new StringBuffer(remoteLocatorsList.toString());
+ remoteLocatorBuffer.deleteCharAt(0);
+ remoteLocatorBuffer.deleteCharAt(remoteLocatorBuffer.lastIndexOf("]"));
+ String remoteLocator = remoteLocatorBuffer.toString();
+ remoteLocator = remoteLocator.replace(" ", "");
+ props.setProperty(REMOTE_LOCATORS, remoteLocator);
+ test.getSystem(props);
+ }
+
+ public static Integer createFirstLocatorWithDSId(int dsId) {
+ stopOldLocator();
+ WANTestBase test = new WANTestBase();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
+ props.setProperty(LOCATORS, "localhost[" + port + "]");
+ props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
+ test.getSystem(props);
+ return port;
+ }
+
+ public static Integer createFirstPeerLocator(int dsId) {
+ stopOldLocator();
+ WANTestBase test = new WANTestBase();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
+ props.setProperty(LOCATORS, "localhost[" + port + "]");
+ props.setProperty(START_LOCATOR, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
+ test.getSystem(props);
+ return port;
+ }
+
+ public static Integer createSecondLocator(int dsId, int locatorPort) {
+ stopOldLocator();
+ WANTestBase test = new WANTestBase();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
+ props.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
+ props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
+ test.getSystem(props);
+ return port;
+ }
+
+ public static Integer createSecondPeerLocator(int dsId, int locatorPort) {
+ stopOldLocator();
+ WANTestBase test = new WANTestBase();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
+ props.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
+ props.setProperty(START_LOCATOR, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
+ test.getSystem(props);
+ return port;
+ }
+
+ public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
+ stopOldLocator();
+ WANTestBase test = new WANTestBase();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
+ props.setProperty(LOCATORS, "localhost[" + port + "]");
+ props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
+ props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
+ test.getSystem(props);
+ return port;
+ }
+
+ public static void bringBackLocatorOnOldPort(int dsId, int remoteLocPort, int oldPort) {
+ WANTestBase test = new WANTestBase();
+ Properties props = test.getDistributedSystemProperties();
+ props.put(LOG_LEVEL, "fine");
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
+ props.setProperty(LOCATORS, "localhost[" + oldPort + "]");
+ props.setProperty(START_LOCATOR, "localhost[" + oldPort + "],server=true,peer=true,hostname-for-clients=localhost");
+ props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
+ test.getSystem(props);
+ }
+
+
+ public static Integer createFirstRemotePeerLocator(int dsId, int remoteLocPort) {
+ stopOldLocator();
+ WANTestBase test = new WANTestBase();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
+ props.setProperty(LOCATORS, "localhost[" + port + "]");
+ props.setProperty(START_LOCATOR, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
+ props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
+ test.getSystem(props);
+ return port;
+ }
+
+ public static Integer createSecondRemoteLocator(int dsId, int localPort,
+ int remoteLocPort) {
+ stopOldLocator();
+ WANTestBase test = new WANTestBase();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
+ props.setProperty(LOCATORS, "localhost[" + localPort + "]");
+ props.setProperty(START_LOCATOR, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
+ props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
+ test.getSystem(props);
+ return port;
+ }
+
+ public static Integer createSecondRemoteLocatorWithAPI(int dsId, int localPort,
+ int remoteLocPort, String hostnameForClients)
+ throws IOException
+ {
+ stopOldLocator();
+ WANTestBase test = new WANTestBase();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
+ props.setProperty(LOCATORS, "localhost[" + localPort + "]");
+ props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
+ Locator locator = Locator.startLocatorAndDS(0, null, InetAddress.getByName("localhost"), props, true, true, hostnameForClients);
+ return locator.getPort();
+ }
+
+ public static Integer createSecondRemotePeerLocator(int dsId, int localPort,
+ int remoteLocPort) {
+ stopOldLocator();
+ WANTestBase test = new WANTestBase();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(DISTRIBUTED_SYSTEM_ID, ""+dsId);
+ props.setProperty(LOCATORS, "localhost[" + localPort + "]");
+ props.setProperty(START_LOCATOR, "localhost[" + port + "],server=false,peer=true,hostname-for-clients=localhost");
+ props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocPort + "]");
+ test.getSystem(props);
+ return port;
+ }
+
+ public static int createReceiverInSecuredCache() {
+ GatewayReceiverFactory fact = WANTestBase.cache.createGatewayReceiverFactory();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ fact.setStartPort(port);
+ fact.setEndPort(port);
+ fact.setManualStart(true);
+ GatewayReceiver receiver = fact.create();
+ try {
+ receiver.start();
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start GatewayReceiver on port " + port, e);
+ }
+ return port;
+ }
+
+ public static void createReplicatedRegion(String regionName, String senderIds, Boolean offHeap){
+ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class
+ .getName());
+ IgnoredException exp2 = IgnoredException.addIgnoredException(GatewaySenderException.class
+ .getName());
+ try {
+ AttributesFactory fact = new AttributesFactory();
+ if (senderIds != null) {
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ fact.setDataPolicy(DataPolicy.REPLICATE);
+ fact.setScope(Scope.DISTRIBUTED_ACK);
+ fact.setOffHeap(offHeap);
+ Region r = cache.createRegionFactory(fact.create()).create(regionName);
+ assertNotNull(r);
+ } finally {
+ exp.remove();
+ exp1.remove();
+ exp2.remove();
+ }
+ }
+
+ public static void createNormalRegion(String regionName, String senderIds){
+ AttributesFactory fact = new AttributesFactory();
+ if(senderIds!= null){
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()){
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ fact.setDataPolicy(DataPolicy.NORMAL);
+ fact.setScope(Scope.DISTRIBUTED_ACK);
+ Region r = cache.createRegionFactory(fact.create()).create(regionName);
+ assertNotNull(r);
+ }
+
+ public static void createPersistentReplicatedRegion(String regionName, String senderIds, Boolean offHeap){
+ AttributesFactory fact = new AttributesFactory();
+ if(senderIds!= null){
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()){
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ fact.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
+ fact.setOffHeap(offHeap);
+ Region r = cache.createRegionFactory(fact.create()).create(regionName);
+ assertNotNull(r);
+ }
+
+ public static void createReplicatedRegionWithAsyncEventQueue(
+ String regionName, String asyncQueueIds, Boolean offHeap) {
+ IgnoredException exp1 = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ try {
+ AttributesFactory fact = new AttributesFactory();
+ if (asyncQueueIds != null) {
+ StringTokenizer tokenizer = new StringTokenizer(asyncQueueIds, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String asyncQueueId = tokenizer.nextToken();
+ fact.addAsyncEventQueueId(asyncQueueId);
+ }
+ }
+ fact.setDataPolicy(DataPolicy.REPLICATE);
+ fact.setOffHeap(offHeap);
+ RegionFactory regionFactory = cache.createRegionFactory(fact.create());
+ Region r = regionFactory.create(regionName);
+ assertNotNull(r);
+ } finally {
+ exp1.remove();
+ }
+ }
+
+ public static void createReplicatedRegionWithSenderAndAsyncEventQueue(
+ String regionName, String senderIds, String asyncChannelId, Boolean offHeap) {
+ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ try {
+ AttributesFactory fact = new AttributesFactory();
+ if (senderIds != null) {
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ fact.setDataPolicy(DataPolicy.REPLICATE);
+ fact.setScope(Scope.DISTRIBUTED_ACK);
+ fact.setOffHeap(offHeap);
+ RegionFactory regionFactory = cache.createRegionFactory(fact.create());
+ regionFactory.addAsyncEventQueueId(asyncChannelId);
+ Region r = regionFactory.create(regionName);
+ assertNotNull(r);
+ } finally {
+ exp.remove();
+ }
+ }
+
+ public static void createReplicatedRegion(String regionName, String senderIds, Scope scope, DataPolicy policy, Boolean offHeap){
+ AttributesFactory fact = new AttributesFactory();
+ if(senderIds!= null){
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()){
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ fact.setDataPolicy(policy);
+ fact.setScope(scope);
+ fact.setOffHeap(offHeap);
+ Region r = cache.createRegionFactory(fact.create()).create(regionName);
+ assertNotNull(r);
+ }
+
+ public static void createAsyncEventQueue(
+ String asyncChannelId, boolean isParallel,
+ Integer maxMemory, Integer batchSize, boolean isConflation,
+ boolean isPersistent, String diskStoreName, boolean isDiskSynchronous) {
+
+ if (diskStoreName != null) {
+ File directory = new File(asyncChannelId + "_disk_"
+ + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+ directory.mkdir();
+ File[] dirs1 = new File[] { directory };
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
+ dsf.setDiskDirs(dirs1);
+ dsf.create(diskStoreName);
+ }
+
+ AsyncEventListener asyncEventListener = new MyAsyncEventListener();
+
+ AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+ factory.setBatchSize(batchSize);
+ factory.setPersistent(isPersistent);
+ factory.setDiskStoreName(diskStoreName);
+ factory.setDiskSynchronous(isDiskSynchronous);
+ factory.setBatchConflationEnabled(isConflation);
+ factory.setMaximumQueueMemory(maxMemory);
+ factory.setParallel(isParallel);
+ //set dispatcher threads
+ factory.setDispatcherThreads(numDispatcherThreadsForTheRun);
+ factory.create(asyncChannelId, asyncEventListener);
+ }
+
+ public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){
+ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class
+ .getName());
+ try {
+ AttributesFactory fact = new AttributesFactory();
+ if (senderIds != null) {
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+ pfact.setTotalNumBuckets(totalNumBuckets);
+ pfact.setRedundantCopies(redundantCopies);
+ pfact.setRecoveryDelay(0);
+ fact.setPartitionAttributes(pfact.create());
+ fact.setOffHeap(offHeap);
+ Region r = cache.createRegionFactory(fact.create()).create(regionName);
+ assertNotNull(r);
+ } finally {
+ exp.remove();
+ exp1.remove();
+ }
+ }
+
+ // TODO:OFFHEAP: add offheap flavor
+ public static void createPartitionedRegionWithPersistence(String regionName,
+ String senderIds, Integer redundantCopies, Integer totalNumBuckets) {
+ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class
+ .getName());
+ try {
+ AttributesFactory fact = new AttributesFactory();
+ if (senderIds != null) {
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+ pfact.setTotalNumBuckets(totalNumBuckets);
+ pfact.setRedundantCopies(redundantCopies);
+ pfact.setRecoveryDelay(0);
+ fact.setPartitionAttributes(pfact.create());
+ fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+ Region r = cache.createRegionFactory(fact.create()).create(regionName);
+ assertNotNull(r);
+ } finally {
+ exp.remove();
+ exp1.remove();
+ }
+ }
+ public static void createColocatedPartitionedRegion(String regionName,
+ String senderIds, Integer redundantCopies, Integer totalNumBuckets, String colocatedWith) {
+ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class
+ .getName());
+ try {
+ AttributesFactory fact = new AttributesFactory();
+ if (senderIds != null) {
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+ pfact.setTotalNumBuckets(totalNumBuckets);
+ pfact.setRedundantCopies(redundantCopies);
+ pfact.setRecoveryDelay(0);
+ pfact.setColocatedWith(colocatedWith);
+ fact.setPartitionAttributes(pfact.create());
+ Region r = cache.createRegionFactory(fact.create()).create(regionName);
+ assertNotNull(r);
+ } finally {
+ exp.remove();
+ exp1.remove();
+ }
+ }
+
+ public static void addSenderThroughAttributesMutator(String regionName,
+ String senderIds){
+ final Region r = cache.getRegion(Region.SEPARATOR + regionName);
+ assertNotNull(r);
+ AttributesMutator mutator = r.getAttributesMutator();
+ mutator.addGatewaySenderId(senderIds);
+ }
+
+ public static void addAsyncEventQueueThroughAttributesMutator(
+ String regionName, String queueId) {
+ final Region r = cache.getRegion(Region.SEPARATOR + regionName);
+ assertNotNull(r);
+ AttributesMutator mutator = r.getAttributesMutator();
+ mutator.addAsyncEventQueueId(queueId);
+ }
+
+ public static void createPartitionedRegionAsAccessor(
+ String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets){
+ AttributesFactory fact = new AttributesFactory();
+ if(senderIds!= null){
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()){
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+ pfact.setTotalNumBuckets(totalNumBuckets);
+ pfact.setRedundantCopies(redundantCopies);
+ pfact.setLocalMaxMemory(0);
+ fact.setPartitionAttributes(pfact.create());
+ Region r = cache.createRegionFactory(fact.create()).create(regionName);
+ assertNotNull(r);
+ }
+
+ public static void createPartitionedRegionWithSerialParallelSenderIds(String regionName, String serialSenderIds, String parallelSenderIds, String colocatedWith, Boolean offHeap){
+ AttributesFactory fact = new AttributesFactory();
+ if (serialSenderIds != null) {
+ StringTokenizer tokenizer = new StringTokenizer(serialSenderIds, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ if (parallelSenderIds != null) {
+ StringTokenizer tokenizer = new StringTokenizer(parallelSenderIds, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+ pfact.setColocatedWith(colocatedWith);
+ fact.setPartitionAttributes(pfact.create());
+ fact.setOffHeap(offHeap);
+ Region r = cache.createRegionFactory(fact.create()).create(regionName);
+ assertNotNull(r);
+ }
+
+ public static void createPersistentPartitionedRegion(
+ String regionName,
+ String senderIds,
+ Integer redundantCopies,
+ Integer totalNumBuckets,
+ Boolean offHeap){
+
+ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ IgnoredException exp1 = IgnoredException.addIgnoredException(PartitionOfflineException.class
+ .getName());
+ try {
+
+ AttributesFactory fact = new AttributesFactory();
+ if (senderIds != null) {
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+ pfact.setTotalNumBuckets(totalNumBuckets);
+ pfact.setRedundantCopies(redundantCopies);
+ fact.setPartitionAttributes(pfact.create());
+ fact.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+ fact.setOffHeap(offHeap);
+ Region r = cache.createRegionFactory(fact.create()).create(regionName);
+ assertNotNull(r);
+ } finally {
+ exp.remove();
+ exp1.remove();
+ }
+ }
+
+ public static void createCustomerOrderShipmentPartitionedRegion(
+ String senderIds, Integer redundantCopies,
+ Integer totalNumBuckets, Boolean offHeap) {
+ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ try {
+ AttributesFactory fact = new AttributesFactory();
+ if (senderIds != null) {
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+
+ PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(redundantCopies)
+ .setTotalNumBuckets(totalNumBuckets)
+ .setPartitionResolver(
+ new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
+ fact.setPartitionAttributes(paf.create());
+ fact.setOffHeap(offHeap);
+ customerRegion = (PartitionedRegion)cache.createRegionFactory(
+ fact.create()).create(customerRegionName);
+ assertNotNull(customerRegion);
+ LogWriterUtils.getLogWriter().info(
+ "Partitioned Region CUSTOMER created Successfully :"
+ + customerRegion.toString());
+
+ paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(redundantCopies)
+ .setTotalNumBuckets(totalNumBuckets)
+ .setColocatedWith(customerRegionName)
+ .setPartitionResolver(
+ new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
+ fact = new AttributesFactory();
+ if (senderIds != null) {
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ fact.setPartitionAttributes(paf.create());
+ fact.setOffHeap(offHeap);
+ orderRegion = (PartitionedRegion)cache.createRegionFactory(fact.create())
+ .create(orderRegionName);
+ assertNotNull(orderRegion);
+ LogWriterUtils.getLogWriter().info(
+ "Partitioned Region ORDER created Successfully :"
+ + orderRegion.toString());
+
+ paf = new PartitionAttributesFactory();
+ paf.setRedundantCopies(redundantCopies)
+ .setTotalNumBuckets(totalNumBuckets)
+ .setColocatedWith(orderRegionName)
+ .setPartitionResolver(
+ new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
+ fact = new AttributesFactory();
+ if (senderIds != null) {
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ fact.setPartitionAttributes(paf.create());
+ fact.setOffHeap(offHeap);
+ shipmentRegion = (PartitionedRegion)cache.createRegionFactory(
+ fact.create()).create(shipmentRegionName);
+ assertNotNull(shipmentRegion);
+ LogWriterUtils.getLogWriter().info(
+ "Partitioned Region SHIPMENT created Successfully :"
+ + shipmentRegion.toString());
+ } finally {
+ exp.remove();
+ }
+ }
+
+ public static void createColocatedPartitionedRegions(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){
+ AttributesFactory fact = new AttributesFactory();
+ if(senderIds!= null){
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()){
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+ pfact.setTotalNumBuckets(totalNumBuckets);
+ pfact.setRedundantCopies(redundantCopies);
+ fact.setPartitionAttributes(pfact.create());
+ fact.setOffHeap(offHeap);
+ Region r = cache.createRegionFactory(fact.create()).create(regionName);
+ assertNotNull(r);
+
+ pfact.setColocatedWith(r.getName());
+ fact.setPartitionAttributes(pfact.create());
+ fact.setOffHeap(offHeap);
+ Region r1 = cache.createRegionFactory(fact.create()).create(regionName+"_child1");
+ assertNotNull(r1);
+
+ Region r2 = cache.createRegionFactory(fact.create()).create(regionName+"_child2");
+ assertNotNull(r2);
+ }
+
+ public static void createColocatedPartitionedRegions2 (String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets, Boolean offHeap){
+ AttributesFactory fact = new AttributesFactory();
+ if(senderIds!= null){
+ StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+ while (tokenizer.hasMoreTokens()){
+ String senderId = tokenizer.nextToken();
+ fact.addGatewaySenderId(senderId);
+ }
+ }
+ PartitionAttributesFactory pfact = new PartitionAttributesFactory();
+ pfact.setTotalNumBuckets(totalNumBuckets);
+ pfact.setRedundantCopies(redundantCopies);
+ fact.setPartitionAttributes(pfact.create());
+ fact.setOffHeap(offHeap);
+ Region r = cache.createRegionFactory(fact.create()).create(regionName);
+ assertNotNull(r);
+
+
+ fact = new AttributesFactory();
+ pfact.setColocatedWith(r.getName());
+ fact.setPartitionAttributes(pfact.create());
+ fact.setOffHeap(offHeap);
+ Region r1 = cache.createRegionFactory(fact.create()).create(regionName+"_child1");
+ assertNotNull(r1);
+
+ Region r2 = cache.createRegionFactory(fact.create()).create(regionName+"_child2");
+ assertNotNull(r2);
+ }
+
+ public static void createCacheInVMs(Integer locatorPort, VM... vms) {
+ for (VM vm : vms) {
+ vm.invoke(() -> createCache(locatorPort));
+ }
+ }
+
+ public static void addListenerToSleepAfterCreateEvent(int milliSeconds, final String regionName) {
+ cache.getRegion(regionName).getAttributesMutator()
+ .addCacheListener(new CacheListenerAdapter<Object, Object>() {
+ @Override
+ public void afterCreate(final EntryEvent<Object, Object> event) {
+ try {
+ Thread.sleep(milliSeconds);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ });
+ }
+
+ private static CacheListener myListener;
+ public static void longPauseAfterNumEvents(int numEvents, int milliSeconds) {
+ myListener = new CacheListenerAdapter<Object, Object>() {
+ @Override
+ public void afterCreate(final EntryEvent<Object, Object> event) {
+ try {
+ if (event.getRegion().size() >= numEvents){
+ Thread.sleep(milliSeconds);
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ };
+ cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator()
+ .addCacheListener(myListener);
+ }
+
+ public static void removeCacheListener() {
+ cache.getRegion(getTestMethodName() + "_RR_1").getAttributesMutator()
+ .removeCacheListener(myListener);
+
+ }
+
+
+ public static void createCache(Integer locPort){
+ createCache(false, locPort);
+ }
+ public static void createManagementCache(Integer locPort){
+ createCache(true, locPort);
+ }
+
+ public static void createCacheConserveSockets(Boolean conserveSockets,Integer locPort){
+ WANTestBase test = new WANTestBase();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+ props.setProperty(CONSERVE_SOCKETS, conserveSockets.toString());
+ InternalDistributedSystem ds = test.getSystem(props);
+ cache = CacheFactory.create(ds);
+ }
+
+ protected static void createCache(boolean management, Integer locPort) {
+ WANTestBase test = new WANTestBase();
+ Properties props = test.getDistributedSystemProperties();
+ if (management) {
+ props.setProperty(JMX_MANAGER, "true");
+ props.setProperty(JMX_MANAGER_START, "false");
+ props.setProperty(JMX_MANAGER_PORT, "0");
+ props.setProperty(JMX_MANAGER_HTTP_PORT, "0");
+ }
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+ InternalDistributedSystem ds = test.getSystem(props);
+ cache = CacheFactory.create(ds);
+ }
+
+ protected static void createCacheWithSSL(Integer locPort) {
+ WANTestBase test = new WANTestBase();
+
+ boolean gatewaySslenabled = true;
+ String gatewaySslprotocols = "any";
+ String gatewaySslciphers = "any";
+ boolean gatewaySslRequireAuth = true;
+
+ Properties gemFireProps = test.getDistributedSystemProperties();
+ gemFireProps.put(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
+ gemFireProps.put(GATEWAY_SSL_ENABLED, String.valueOf(gatewaySslenabled));
+ gemFireProps.put(GATEWAY_SSL_PROTOCOLS, gatewaySslprotocols);
+ gemFireProps.put(GATEWAY_SSL_CIPHERS, gatewaySslciphers);
+ gemFireProps.put(GATEWAY_SSL_REQUIRE_AUTHENTICATION, String.valueOf(gatewaySslRequireAuth));
+
+ gemFireProps.put(GATEWAY_SSL_KEYSTORE_TYPE, "jks");
+ gemFireProps.put(GATEWAY_SSL_KEYSTORE,
+ TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/client.keystore"));
+ gemFireProps.put(GATEWAY_SSL_KEYSTORE_PASSWORD, "password");
+ gemFireProps.put(GATEWAY_SSL_TRUSTSTORE,
+ TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/client.truststore"));
+ gemFireProps.put(GATEWAY_SSL_TRUSTSTORE_PASSWORD, "password");
+
+ gemFireProps.setProperty(MCAST_PORT, "0");
+ gemFireProps.setProperty(LOCATORS, "localhost[" + locPort + "]");
+
+ LogWriterUtils.getLogWriter().info("Starting cache ds with following properties \n" + gemFireProps);
+
+ InternalDistributedSystem ds = test.getSystem(gemFireProps);
+ cache = CacheFactory.create(ds);
+ }
+
+ public static void createCache_PDX(Integer locPort){
+ WANTestBase test = new WANTestBase();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+ InternalDistributedSystem ds = test.getSystem(props);
+ CacheConfig cacheConfig = new CacheConfig();
+ cacheConfig.setPdxPersistent(true);
+ cacheConfig.setPdxDiskStore("PDX_TEST");
+ cache = GemFireCacheImpl.create(ds, false, cacheConfig);
+
+ File pdxDir = new File(CacheTestCase.getDiskDir(), "pdx");
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
+ File [] dirs1 = new File[] {pdxDir};
+ dsf.setDiskDirs(dirs1).setMaxOplogSize(1).create("PDX_TEST");
+ }
+
+ public static void createCache(Integer locPort1, Integer locPort2){
+ WANTestBase test = new WANTestBase();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost[" + locPort1
+ + "],localhost[" + locPort2 + "]");
+ InternalDistributedSystem ds = test.getSystem(props);
+ cache = CacheFactory.create(ds);
+ }
+
+ public static void createCacheWithoutLocator(Integer mCastPort){
+ WANTestBase test = new WANTestBase();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "" + mCastPort);
+ InternalDistributedSystem ds = test.getSystem(props);
+ cache = CacheFactory.create(ds);
+ }
+
+ /**
+ * Method that creates a bridge server
+ *
+ * @return Integer Port on which the server is started.
+ */
+ public static Integer createCacheServer() {
+ CacheServer server1 = cache.addCacheServer();
+ assertNotNull(server1);
+ server1.setPort(0);
+ try {
+ server1.start();
+ }
+ catch (IOException e) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start the Server", e);
+ }
+ assertTrue(server1.isRunning());
+
+ return new Integer(server1.getPort());
+ }
+
+ /**
+ * Returns a Map that contains the count for number of bridge server and number
+ * of Receivers.
+ *
+ * @return Map
+ */
+ public static Map getCacheServers() {
+ List cacheServers = cache.getCacheServers();
+
+ Map cacheServersMap = new HashMap();
+ Iterator itr = cacheServers.iterator();
+ int bridgeServerCounter = 0;
+ int receiverServerCounter = 0;
+ while (itr.hasNext()) {
+ CacheServerImpl cacheServer = (CacheServerImpl) itr.next();
+ if (cacheServer.getAcceptor().isGatewayReceiver()) {
+ receiverServerCounter++;
+ } else {
+ bridgeServerCounter++;
+ }
+ }
+ cacheServersMap.put("BridgeServer", bridgeServerCounter);
+ cacheServersMap.put("ReceiverServer", receiverServerCounter);
+ return cacheServersMap;
+ }
+
+ public static void startSenderInVMs(String senderId, VM... vms) {
+ for (VM vm : vms) {
+ vm.invoke(() -> startSender(senderId));
+ }
+ }
+
+ public static void startSenderInVMsAsync(String senderId, VM... vms) {
+ List<AsyncInvocation> tasks = new LinkedList<>();
+ for (VM vm : vms) {
+ tasks.add(vm.invokeAsync(() -> startSender(senderId)));
+ }
+ for (AsyncInvocation invocation : tasks) {
+ try {
+ invocation.join(30000); // TODO: these might be AsyncInvocation orphans
+ }
+ catch (InterruptedException e) {
+ fail("Starting senders was interrupted");
+ }
+ }
+ }
+
+
+ public static void startSender(String senderId) {
+ final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
+
+ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ IgnoredException exp1 = IgnoredException.addIgnoredException(InterruptedException.class
+ .getName());
+ try {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ sender.start();
+ } finally {
+ exp.remove();
+ exp1.remove();
+ exln.remove();
+ }
+
+ }
+
+ public static void enableConflation(String senderId) {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ AbstractGatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = (AbstractGatewaySender)s;
+ break;
+ }
+ }
+ sender.test_setBatchConflationEnabled(true);
+ }
+
+ public static Map getSenderToReceiverConnectionInfo(String senderId){
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for(GatewaySender s : senders){
+ if(s.getId().equals(senderId)){
+ sender = s;
+ break;
+ }
+ }
+ Map connectionInfo = null;
+ if (!sender.isParallel() && ((AbstractGatewaySender) sender).isPrimary()) {
+ connectionInfo = new HashMap();
+ GatewaySenderEventDispatcher dispatcher =
+ ((AbstractGatewaySender)sender).getEventProcessor().getDispatcher();
+ if (dispatcher instanceof GatewaySenderEventRemoteDispatcher) {
+ ServerLocation serverLocation =
+ ((GatewaySenderEventRemoteDispatcher) dispatcher).getConnection(false).getServer();
+ connectionInfo.put("serverHost", serverLocation.getHostName());
+ connectionInfo.put("serverPort", serverLocation.getPort());
+
+ }
+ }
+ return connectionInfo;
+ }
+ public static List<Integer> getSenderStats(String senderId, final int expectedQueueSize){
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ AbstractGatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = (AbstractGatewaySender)s;
+ break;
+ }
+ }
+ final GatewaySenderStats statistics = sender.getStatistics();
+ if (expectedQueueSize != -1) {
+ final RegionQueue regionQueue;
+ regionQueue = sender.getQueues().toArray(
+ new RegionQueue[1])[0];
+ Awaitility.await().atMost(120,TimeUnit.SECONDS).until(() -> assertEquals("Expected queue entries: " +
+ expectedQueueSize + " but actual entries: " + regionQueue.size(), expectedQueueSize,regionQueue.size()));
+ }
+ ArrayList<Integer> stats = new ArrayList<Integer>();
+ stats.add(statistics.getEventQueueSize());
+ stats.add(statistics.getEventsReceived());
+ stats.add(statistics.getEventsQueued());
+ stats.add(statistics.getEventsDistributed());
+ stats.add(statistics.getBatchesDistributed());
+ stats.add(statistics.getBatchesRedistributed());
+ stats.add(statistics.getEventsFiltered());
+ stats.add(statistics.getEventsNotQueuedConflated());
+ stats.add(statistics.getEventsConflatedFromBatches());
+ return stats;
+ }
+
+ public static void checkQueueStats(String senderId, final int queueSize,
+ final int eventsReceived, final int eventsQueued,
+ final int eventsDistributed) {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+
+ final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics();
+ assertEquals(queueSize, statistics.getEventQueueSize());
+ assertEquals(eventsReceived, statistics.getEventsReceived());
+ assertEquals(eventsQueued, statistics.getEventsQueued());
+ assert(statistics.getEventsDistributed() >= eventsDistributed);
+ }
+
+ public static void checkGatewayReceiverStats(int processBatches,
+ int eventsReceived, int creates) {
+ Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
+ CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
+ .getAcceptor().getStats();
+
+ assertTrue(stats instanceof GatewayReceiverStats);
+ GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats;
+ assertTrue(gatewayReceiverStats.getProcessBatchRequests() >= processBatches);
+ assertEquals(eventsReceived, gatewayReceiverStats.getEventsReceived());
+ assertEquals(creates, gatewayReceiverStats.getCreateRequest());
+ }
+
+ public static void checkMinimumGatewayReceiverStats(int processBatches,
+ int eventsReceived) {
+ Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
+ CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
+ .getAcceptor().getStats();
+
+ assertTrue(stats instanceof GatewayReceiverStats);
+ GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats;
+ assertTrue(gatewayReceiverStats.getProcessBatchRequests() >= processBatches);
+ assertTrue(gatewayReceiverStats.getEventsReceived()>= eventsReceived);
+ }
+
+ public static void checkExceptionStats(int exceptionsOccurred) {
+ Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
+ CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
+ .getAcceptor().getStats();
+
+ assertTrue(stats instanceof GatewayReceiverStats);
+ GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats;
+ if (exceptionsOccurred == 0) {
+ assertEquals(exceptionsOccurred, gatewayReceiverStats
+ .getExceptionsOccured());
+ }
+ else {
+ assertTrue(gatewayReceiverStats.getExceptionsOccured() >= exceptionsOccurred);
+ }
+ }
+
+ public static void checkGatewayReceiverStatsHA(int processBatches,
+ int eventsReceived, int creates) {
+ Set<GatewayReceiver> gatewayReceivers = cache.getGatewayReceivers();
+ GatewayReceiver receiver = gatewayReceivers.iterator().next();
+ CacheServerStats stats = ((CacheServerImpl)receiver.getServer())
+ .getAcceptor().getStats();
+
+ assertTrue(stats instanceof GatewayReceiverStats);
+ GatewayReceiverStats gatewayReceiverStats = (GatewayReceiverStats)stats;
+ assertTrue(gatewayReceiverStats.getProcessBatchRequests() >= processBatches);
+ assertTrue(gatewayReceiverStats.getEventsReceived() >= eventsReceived);
+ assertTrue(gatewayReceiverStats.getCreateRequest() >= creates);
+ }
+
+ public static void checkEventFilteredStats(String senderId, final int eventsFiltered) {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics();
+ assertEquals(eventsFiltered, statistics.getEventsFiltered());
+ }
+
+ public static void checkConflatedStats(String senderId, final int eventsConflated) {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ final GatewaySenderStats statistics = ((AbstractGatewaySender)sender).getStatistics();
+ assertEquals(eventsConflated, statistics.getEventsNotQueuedConflated());
+ }
+
+ public static void checkStats_Failover(String senderId,
+ final int eventsReceived) {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ final GatewaySenderStats statistics = ((AbstractGatewaySender)sender)
+ .getStatistics();
+
+ assertEquals(eventsReceived, statistics.getEventsReceived());
+ assertEquals(eventsReceived, (statistics.getEventsQueued()
+ + statistics.getUnprocessedTokensAddedByPrimary() + statistics
+ .getUnprocessedEventsRemovedByPrimary()));
+ }
+
+ public static void checkBatchStats(String senderId, final int batches) {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ final GatewaySenderStats statistics = ((AbstractGatewaySender)sender)
+ .getStatistics();
+ assert (statistics.getBatchesDistributed() >= batches);
+ assertEquals(0, statistics.getBatchesRedistributed());
+ }
+
+ public static void checkBatchStats(String senderId,
+ final boolean batchesDistributed, final boolean batchesRedistributed) {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ final GatewaySenderStats statistics = ((AbstractGatewaySender)sender)
+ .getStatistics();
+ assertEquals(batchesDistributed, (statistics.getBatchesDistributed() > 0));
+ assertEquals(batchesRedistributed,
+ (statistics.getBatchesRedistributed() > 0));
+ }
+
+ public static void checkUnProcessedStats(String senderId, int events) {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ final GatewaySenderStats statistics = ((AbstractGatewaySender)sender)
+ .getStatistics();
+ assertEquals(events,
+ (statistics.getUnprocessedEventsAddedBySecondary() + statistics
+ .getUnprocessedTokensRemovedBySecondary()));
+ assertEquals(events,
+ (statistics.getUnprocessedEventsRemovedByPrimary() + statistics
+ .getUnprocessedTokensAddedByPrimary()));
+ }
+
+ public static void waitForSenderRunningState(String senderId){
+ final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
+ try {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ final GatewaySender sender = getGatewaySenderById(senders, senderId);
+ Awaitility.await().atMost(300,TimeUnit.SECONDS).until(() -> assertEquals("Expected sender isRunning state to "
+ + "be true but is false", true, (sender != null && sender.isRunning())));
+ } finally {
+ exln.remove();
+ }
+ }
+
+ public static void waitForSenderToBecomePrimary(String senderId){
+ Set<GatewaySender> senders = ((GemFireCacheImpl)cache).getAllGatewaySenders();
+ final GatewaySender sender = getGatewaySenderById(senders, senderId);
+ Awaitility.await().atMost(10,TimeUnit.SECONDS).until(() -> assertEquals("Expected sender primary state to "
+ + "be true but is false", true, (sender != null && ((AbstractGatewaySender)sender).isPrimary())));
+ }
+
+ private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
+ for(GatewaySender s : senders){
+ if(s.getId().equals(senderId)){
+ return s;
+ }
+ }
+ //if none of the senders matches with the supplied senderid, return null
+ return null;
+ }
+
+ public static HashMap checkQueue(){
+ HashMap listenerAttrs = new HashMap();
+ listenerAttrs.put("Create", listener1.createList);
+ listenerAttrs.put("Update", listener1.updateList);
+ listenerAttrs.put("Destroy", listener1.destroyList);
+ return listenerAttrs;
+ }
+
+ public static void checkQueueOnSecondary (final Map primaryUpdatesMap){
+ final HashMap secondaryUpdatesMap = new HashMap();
+ secondaryUpdatesMap.put("Create", listener1.createList);
+ secondaryUpdatesMap.put("Update", listener1.updateList);
+ secondaryUpdatesMap.put("Destroy", listener1.destroyList);
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
+ secondaryUpdatesMap.put("Create", listener1.createList);
+ secondaryUpdatesMap.put("Update", listener1.updateList);
+ secondaryUpdatesMap.put("Destroy", listener1.destroyList);
+ assertEquals("Expected secondary map to be " + primaryUpdatesMap + " but it is " + secondaryUpdatesMap,
+ true,secondaryUpdatesMap.equals(primaryUpdatesMap));
+ });
+ }
+
+ public static HashMap checkQueue2(){
+ HashMap listenerAttrs = new HashMap();
+ listenerAttrs.put("Create", listener2.createList);
+ listenerAttrs.put("Update", listener2.updateList);
+ listenerAttrs.put("Destroy", listener2.destroyList);
+ return listenerAttrs;
+ }
+
+ public static HashMap checkPR(String regionName){
+ PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
+ QueueListener listener = (QueueListener)region.getCacheListener();
+
+ HashMap listenerAttrs = new HashMap();
+ listenerAttrs.put("Create", listener.createList);
+ listenerAttrs.put("Update", listener.updateList);
+ listenerAttrs.put("Destroy", listener.destroyList);
+ return listenerAttrs;
+ }
+
+ public static HashMap checkBR(String regionName, int numBuckets){
+ PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
+ HashMap listenerAttrs = new HashMap();
+ for (int i = 0; i < numBuckets; i++) {
+ BucketRegion br = region.getBucketRegion(i);
+ QueueListener listener = (QueueListener)br.getCacheListener();
+ listenerAttrs.put("Create"+i, listener.createList);
+ listenerAttrs.put("Update"+i, listener.updateList);
+ listenerAttrs.put("Destroy"+i, listener.destroyList);
+ }
+ return listenerAttrs;
+ }
+
+ public static HashMap checkQueue_BR(String senderId, int numBuckets){
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for(GatewaySender s : senders){
+ if(s.getId().equals(senderId)){
+ sender = s;
+ break;
+ }
+ }
+ RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
+ .getQueues().toArray(new RegionQueue[1])[0];
+
+ PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
+ HashMap listenerAttrs = new HashMap();
+ for (int i = 0; i < numBuckets; i++) {
+ BucketRegion br = region.getBucketRegion(i);
+ if (br != null) {
+ QueueListener listener = (QueueListener)br.getCacheListener();
+ if (listener != null) {
+ listenerAttrs.put("Create"+i, listener.createList);
+ listenerAttrs.put("Update"+i, listener.updateList);
+ listenerAttrs.put("Destroy"+i, listener.destroyList);
+ }
+ }
+ }
+ return listenerAttrs;
+ }
+
+ public static void addListenerOnBucketRegion(String regionName, int numBuckets) {
+ WANTestBase test = new WANTestBase();
+ test.addCacheListenerOnBucketRegion(regionName, numBuckets);
+ }
+
+ private void addCacheListenerOnBucketRegion(String regionName, int numBuckets){
+ PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
+ for (int i = 0; i < numBuckets; i++) {
+ BucketRegion br = region.getBucketRegion(i);
+ AttributesMutator mutator = br.getAttributesMutator();
+ listener1 = new QueueListener();
+ mutator.addCacheListener(listener1);
+ }
+ }
+
+ public static void addListenerOnQueueBucketRegion(String senderId, int numBuckets) {
+ WANTestBase test = new WANTestBase();
+ test.addCacheListenerOnQueueBucketRegion(senderId, numBuckets);
+ }
+
+ private void addCacheListenerOnQueueBucketRegion(String senderId, int numBuckets){
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for(GatewaySender s : senders){
+ if(s.getId().equals(senderId)){
+ sender = s;
+ break;
+ }
+ }
+ RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
+ .getQueues().toArray(new RegionQueue[1])[0];
+
+ PartitionedRegion region = (PartitionedRegion)parallelQueue.getRegion();
+ for (int i = 0; i < numBuckets; i++) {
+ BucketRegion br = region.getBucketRegion(i);
+ if (br != null) {
+ AttributesMutator mutator = br.getAttributesMutator();
+ CacheListener listener = new QueueListener();
+ mutator.addCacheListener(listener);
+ }
+ }
+
+ }
+
+ public static void addQueueListener(String senderId, boolean isParallel){
+ WANTestBase test = new WANTestBase();
+ test.addCacheQueueListener(senderId, isParallel);
+ }
+
+ public static void addSecondQueueListener(String senderId, boolean isParallel){
+ WANTestBase test = new WANTestBase();
+ test.addSecondCacheQueueListener(senderId, isParallel);
+ }
+
+ public static void addListenerOnRegion(String regionName){
+ WANTestBase test = new WANTestBase();
+ test.addCacheListenerOnRegion(regionName);
+ }
+ private void addCacheListenerOnRegion(String regionName){
+ Region region = cache.getRegion(regionName);
+ AttributesMutator mutator = region.getAttributesMutator();
+ listener1 = new QueueListener();
+ mutator.addCacheListener(listener1);
+ }
+
+ private void addCacheQueueListener(String senderId, boolean isParallel) {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for(GatewaySender s : senders){
+ if(s.getId().equals(senderId)){
+ sender = s;
+ break;
+ }
+ }
+ listener1 = new QueueListener();
+ if (!isParallel) {
+ Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
+ for(RegionQueue q: queues) {
+ q.addCacheListener(listener1);
+ }
+ }
+ else {
+ RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
+ .getQueues().toArray(new RegionQueue[1])[0];
+ parallelQueue.addCacheListener(listener1);
+ }
+ }
+
+ private void addSecondCacheQueueListener(String senderId, boolean isParallel) {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ listener2 = new QueueListener();
+ if (!isParallel) {
+ Set<RegionQueue> queues = ((AbstractGatewaySender)sender).getQueues();
+ for(RegionQueue q: queues) {
+ q.addCacheListener(listener2);
+ }
+ }
+ else {
+ RegionQueue parallelQueue = ((AbstractGatewaySender)sender)
+ .getQueues().toArray(new RegionQueue[1])[0];
+ parallelQueue.addCacheListener(listener2);
+ }
+ }
+
+ public static void pauseSender(String senderId) {
+ final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
+ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ try {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ sender.pause();
+ ((AbstractGatewaySender) sender).getEventProcessor().waitForDispatcherToPause();
+
+ } finally {
+ exp.remove();
+ exln.remove();
+ }
+ }
+
+ public static void resumeSender(String senderId) {
+ final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
+ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ try {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ sender.resume();
+ } finally {
+ exp.remove();
+ exln.remove();
+ }
+ }
+
+ public static void stopSender(String senderId) {
+ final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
+ IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class
+ .getName());
+ try {
+ Set<GatewaySender> senders = cache.getGatewaySenders();
+ GatewaySender sender = null;
+ for (GatewaySender s : senders) {
+ if (s.getId().equals(senderId)) {
+ sender = s;
+ break;
+ }
+ }
+ AbstractGatewaySenderEventProcessor eventProcessor = null;
+ if (sender instanceof AbstractGatewaySender) {
+ eventProcessor = ((AbstractGatewaySender) sender).getEventProcessor();
+ }
+ sender.stop();
+
+ Set<RegionQueue> queues = null;
+ if (eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor) {
+ queues = ((ConcurrentSerialGatewaySenderEventProcessor)eventProcessor).getQueues();
+ for (RegionQueue queue: queues) {
+ if (queue instanceof SerialGatewaySenderQueue) {
+ assertFalse(((SerialGatewaySenderQueue) queue).isRemovalThreadAlive());
+ }
+ }
+ }
+ } finally {
+ exp.remove();
+ exln.remove();
+ }
+ }
+
+ public static void stopReceivers() {
+ Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
+ for (GatewayReceiver receiver : receivers) {
+ receiver.stop();
+ }
+ }
+
+ public static void startReceivers() {
+ Set<GatewayReceiver> receivers = cache.getGatewayReceivers();
+ for (GatewayReceiver receiver : receivers) {
+ try {
+ receiver.start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public static GatewaySenderFactory configureGateway(DiskStoreFactory dsf, File[] dirs1, String dsName,
+ boolean isParallel, Integer maxMemory,
+ Integer batchSize, boolean isConflation, boolean isPersistent,
+ GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy policy) {
+
+ InternalGatewaySenderFactory gateway = (InternalGatewaySenderFactory)cache.createGatewaySenderFactory();
+ gateway.setParallel(isParallel);
+ gateway.setMaximumQueueMemory(maxMemory);
+ gateway.setBatchSize(batchSize);
+ gateway.setBatchConflationEnabled(isConflation);
+ gateway.setManualStart(isManualStart);
+ gateway.setDispatcherThreads(numDispatchers);
+ gateway.setOrderPolicy(policy);
+ gateway.setLocatorDiscoveryCallback(new MyLocatorCallback());
+ if (filter != null) {
+ eventFilter = filter;
+ gateway.addGatewayEventFilter(filter);
+ }
+ if (isPersistent) {
+ gateway.setPersistenceEnabled(true);
+ gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
+ .getName());
+ } else {
+ DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+ gateway.setDiskStoreName(store.getName());
+ }
+ return gateway;
+ }
+
+ public static void createSender(String dsName, int remoteDsId,
+ boolean isParallel, Integer maxMemory,
+ Integer batchSize, boolean isConflation, boolean isPersistent,
+ GatewayEventFilter filter, boolean isManualStart) {
+ final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
+ try {
+ File persistentDirectory = new File(dsName + "_disk_"
+ + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+ persistentDirectory.mkdir();
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
+ File[] dirs1 = new File[] { persistentDirectory };
+ GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, numDispatcherThreadsForTheRun, GatewaySender.DEFAULT_ORDER_POLICY);
+ gateway.create(dsName, remoteDsId);
+
+ } finally {
+ exln.remove();
+ }
+ }
+
+ public static void createSenderWithMultipleDispatchers(String dsName, int remoteDsId,
+ boolean isParallel, Integer maxMemory,
+ Integer batchSize, boolean isConflation, boolean isPersistent,
+ GatewayEventFilter filter, boolean isManualStart, int numDispatchers, OrderPolicy orderPolicy) {
+ final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
+ try {
+ File persistentDirectory = new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+ persistentDirectory.mkdir();
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
+ File[] dirs1 = new File[] { persistentDirectory };
+ GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName,isParallel, maxMemory, batchSize, isConflation, isPersistent, filter,
+ isManualStart, numDispatchers, orderPolicy);
+ gateway.create(dsName, remoteDsId);
+
+ } finally {
+ exln.remove();
+ }
+ }
+
+ public static void createSenderWithoutDiskStore(String dsName, int remoteDsId, Integer maxMemory,
+ Integer batchSize, boolean isConflation, boolean isManualStart) {
+
+ GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+ gateway.setParallel(true);
+ gateway.setMaximumQueueMemory(maxMemory);
+ gateway.setBatchSize(batchSize);
+ gateway.setManualStart(isManualStart);
+ //set dispatcher threads
+ gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
+ gateway.setBatchConflationEnabled(isConflation);
+ gateway.create(dsName, remoteDsId);
+ }
+
+ public static void createConcurrentSender(String dsName, int remoteDsId,
+ boolean isParallel, Integer maxMemory, Integer batchSize,
+ boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
+ boolean isManualStart, int concurrencyLevel, OrderPolicy policy) {
+
+ File persistentDirectory = new File(dsName + "_disk_"
+ + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+ persistentDirectory.mkdir();
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
+ File[] dirs1 = new File[] { persistentDirectory };
+ GatewaySenderFactory gateway = configureGateway(dsf, dirs1, dsName, isParallel, maxMemory, batchSize, isConflation, isPersistent, filter, isManualStart, concurrencyLevel, policy);
+ gateway.create(dsName, remoteDsId);
+ }
+
+ public static void createSenderForValidations(String dsName, int remoteDsId,
+ boolean isParallel, Integer alertThreshold,
+ boolean isConflation, boolean isPersistent,
+ List<GatewayEventFilter> eventFilters,
+ List<GatewayTransportFilter> transportFilters, boolean isManualStart,
+ boolean isDiskSync) {
+ IgnoredException exp1 = IgnoredException.addIgnoredException(RegionDestroyedException.class
+ .getName());
+ try {
+ File persistentDirectory = new File(dsName + "_disk_"
+ + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+ persistentDirectory.mkdir();
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
+ File[] dirs1 = new File[] { persistentDirectory };
+
+ if (isParallel) {
+ GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+ gateway.setParallel(true);
+ gateway.setAlertThreshold(alertThreshold);
+ ((InternalGatewaySenderFactory)gateway)
+ .setLocatorDiscoveryCallback(new MyLocatorCallback());
+ if (eventFilters != null) {
+ for (GatewayEventFilter filter : eventFilters) {
+ gateway.addGatewayEventFilter(filter);
+ }
+ }
+ if (transportFilters != null) {
+ for (GatewayTransportFilter filter : transportFilters) {
+ gateway.addGatewayTransportFilter(filter);
+ }
+ }
+ if (isPersistent) {
+ gateway.setPersistenceEnabled(true);
+ gateway.setDiskStoreName(dsf.setDiskDirs(dirs1)
+ .create(dsName + "_Parallel").getName());
+ }
+ else {
+ DiskStore store = dsf.setDiskDirs(dirs1).create(dsName + "_Parallel");
+ gateway.setDiskStoreName(store.getName());
+ }
+ gateway.setDiskSynchronous(isDiskSync);
+ gateway.setBatchConflationEnabled(isConflation);
+ gateway.setManualStart(isManualStart);
+ //set dispatcher threads
+ gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
+ gateway.create(dsName, remoteDsId);
+
+ }
+ else {
+ GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+ gateway.setAlertThreshold(alertThreshold);
+ gateway.setManualStart(isManualStart);
+ //set dispatcher threads
+ gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
+ ((InternalGatewaySenderFactory)gateway)
+ .setLocatorDiscoveryCallback(new MyLocatorCallback());
+ if (eventFilters != null) {
+ for (GatewayEventFilter filter : eventFilters) {
+ gateway.addGatewayEventFilter(filter);
+ }
+ }
+ if (transportFilters != null) {
+ for (GatewayTransportFilter filter : transportFilters) {
+ gateway.addGatewayTransportFilter(filter);
+ }
+ }
+ gateway.setBatchConflationEnabled(isConflation);
+ if (isPersistent) {
+ gateway.setPersistenceEnabled(true);
+ gateway.setDiskStoreName(dsf.setDiskDirs(dirs1)
+ .create(dsName + "_Serial").getName());
+ }
+ else {
+ DiskStore store = dsf.setDiskDirs(dirs1).create(dsName + "_Serial");
+ gateway.setDiskStoreName(store.getName());
+ }
+ gateway.setDiskSynchronous(isDiskSync);
+ gateway.create(dsName, remoteDsId);
+ }
+ } finally {
+ exp1.remove();
+ }
+ }
+
+ public static String createSenderWithDiskStore(String dsName, int remoteDsId,
+ boolean isParallel, Integer maxMemory,
+ Integer batchSize, boolean isConflation, boolean isPersistent,
+ GatewayEventFilter filter, String dsStore, boolean isManualStart) {
+ File persistentDirectory = null;
+ if (dsStore == null) {
+ persistentDirectory = new File(dsName + "_disk_"
+ + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+ }
+ else {
+ persistentDirectory = new File(dsStore);
+ }
+ LogWriterUtils.getLogWriter().info("The ds is : " + persistentDirectory.getName());
+
+ persistentDirectory.mkdir();
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
+ File [] dirs1 = new File[] {persistentDirectory};
+
+ if(isParallel) {
+ GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+ gateway.setParallel(true);
+ gateway.setMaximumQueueMemory(maxMemory);
+ gateway.setBatchSize(batchSize);
+ gateway.setManualStart(isManualStart);
+ //set dispatcher threads
+ gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
+ ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
+ if (filter != null) {
+ gateway.addGatewayEventFilter(filter);
+ }
+ if(isPersistent) {
+ gateway.setPersistenceEnabled(true);
+ String dsname = dsf.setDiskDirs(dirs1).create(dsName).getName();
+ gateway.setDiskStoreName(dsname);
+ LogWriterUtils.getLogWriter().info("The DiskStoreName is : " + dsname);
+ }
+ else {
+ DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+ gateway.setDiskStoreName(store.getName());
+ LogWriterUtils.getLogWriter().info("The ds is : " + store.getName());
+ }
+ gateway.setBatchConflationEnabled(isConflation);
+ gateway.create(dsName, remoteDsId);
+
+ }else {
+ GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+ gateway.setMaximumQueueMemory(maxMemory);
+ gateway.setBatchSize(batchSize);
+ gateway.setManualStart(isManualStart);
+ //set dispatcher threads
+ gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
+ ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
+ if (filter != null) {
+ gateway.addGatewayEventFilter(filter);
+ }
+ gateway.setBatchConflationEnabled(isConflation);
+ if(isPersistent) {
+ gateway.setPersistenceEnabled(true);
+ gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
+ }
+ else {
+ DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+
+ gateway.setDiskStoreName(store.getName());
+ }
+ gateway.create(dsName, remoteDsId);
+ }
+ return persistentDirectory.getName();
+ }
+
+
+ public static void createSenderWithListener(String dsName, int remoteDsName,
+ boolean isParallel, Integer maxMemory,
+ Integer batchSize, boolean isConflation, boolean isPersistent,
+ GatewayEventFilter filter, boolean attachTwoListeners, boolean isManualStart) {
+ File persistentDirectory = new File(dsName + "_disk_"
+ + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+ persistentDirectory.mkdir();
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
+ File[] dirs1 = new File[] { persistentDirectory };
+
+ if (isParallel) {
+ GatewaySenderFactory gateway = cache
+ .createGatewaySenderFactory();
+ gateway.setParallel(true);
+ gateway.setMaximumQueueMemory(maxMemory);
+ gateway.setBatchSize(batchSize);
+ gateway.setManualStart(isManualStart);
+ //set dispatcher threads
+ gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
+ ((InternalGatewaySenderFactory)gateway)
+ .setLocatorDiscoveryCallback(new MyLocatorCallback());
+ if (filter != null) {
+ gateway.addGatewayEventFilter(filter);
+ }
+ if (isPersistent) {
+ gateway.setPersistenceEnabled(true);
+ gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
+ .getName());
+ } else {
+ DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+ gateway.setDiskStoreName(store.getName());
+ }
+ gateway.setBatchConflationEnabled(isConflation);
+ gateway.create(dsName, remoteDsName);
+
+ } else {
+ GatewaySenderFactory gateway = cache
+ .createGatewaySenderFactory();
+ gateway.setMaximumQueueMemory(maxMemory);
+ gateway.setBatchSize(batchSize);
+ gateway.setManualStart(isManualStart);
+ //set dispatcher threads
+ gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
+ ((InternalGatewaySenderFactory)gateway)
+ .setLocatorDiscoveryCallback(new MyLocatorCallback());
+ if (filter != null) {
+ gateway.addGatewayEventFilter(filter);
+ }
+ gateway.setBatchConflationEnabled(isConflation);
+ if (isPersistent) {
+ gateway.setPersistenceEnabled(true);
+ gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName)
+ .getName());
+ } else {
+ DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+ gateway.setDiskStoreName(store.getName());
+ }
+
+ eventListener1 = new MyGatewaySenderEventListener();
+ ((InternalGatewaySenderFactory)gateway).addAsyncEventListener(eventListener1);
+ if (attachTwoListeners) {
+ eventListener2 = new MyGatewaySenderEventListener2();
+ ((InternalGatewaySenderFactory)gateway).addAsyncEventListener(eventListener2);
+ }
+ ((InternalGatewaySenderFactory)gateway).create(dsName);
+ }
+ }
+
+ public static void createReceiverInVMs(VM... vms) {
+ for (VM vm : vms) {
+ vm.invoke(() -> createReceiver());
+ }
+ }
+
+ public static int createReceiver() {
+ GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ fact.setStartPort(port);
+ fact.setEndPort(port);
+ fact.setManualStart(true);
+ GatewayReceiver receiver = fact.create();
+ try {
+ receiver.start();
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail("Test " + getTestMethodName()
+ + " failed to start GatewayReceiver on port " + port, e);
+ }
+ return port;
+ }
+
+ public static void createReceiverWithBindAddress(int locPort) {
+ WANTestBase test = new WANTestBase();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
+ props.setProperty(LOCATORS, "localhost[" + locPort
+ + "]");
+
+ InternalDistributedSystem ds = test.getSystem(props);
+ cache = CacheFactory.create(ds);
+ GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ fact.setStartPort(port);
+ fact.setEndPort(port);
+ fact.setManualStart(true);
+ fact.setBindAddress("200.112.204.10");
+ GatewayReceiver receiver = fact.create();
+ try {
+ receiver.start();
+ fail("Expected GatewayReceiver Exception");
+ }
+ catch (GatewayReceiverException gRE){
+ LogWriterUtils.getLogWriter().fine("Got the GatewayReceiverException", gRE);
+ assertTrue(gRE.getMessage().contains("Failed to create server socket on"));
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ fail("Test " + test.getName()
+ + " failed to start GatewayReceiver on port " + port);
+ }
+ }
+ public static int createReceiverWithSSL(int locPort) {
+ WANTestBase test = new WANTestBase();
+
+ Properties gemFireProps = test.getDistributedSystemProperties();
+
+ gemFireProps.put(LOG_LEVEL, LogWriterUtils.getDUnitLogLevel());
+ gemFireProps.put(GATEWAY_SSL_ENABLED, "true");
+ gemFireProps.put(GATEWAY_SSL_PROTOCOLS, "any");
+ gemFireProps.put(GATEWAY_SSL_CIPHERS, "any");
+ gemFireProps.put(GATEWAY_SSL_REQUIRE_AUTHENTICATION, "true");
+
+ gemFireProps.put(GATEWAY_SSL_KEYSTORE_TYPE, "jks");
+ gemFireProps.put(GATEWAY_SSL_KEYSTORE,
+ TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/cacheserver.keystore"));
+ gemFireProps.put(GATEWAY_SSL_KEYSTORE_PASSWORD, "password");
+ gemFireProps.put(GATEWAY_SSL_TRUSTSTORE,
+ TestUtil.getResourcePath(WANTestBase.class, "/com/gemstone/gemfire/cache/client/internal/cacheserver.truststore"));
+ gemFireProps.put(GATEWAY_SSL_TRUSTSTORE_PASSWORD, "password");
+
+ gemFireProps.setProperty(MCAST_PORT, "0");
+ gemFireProps.setProperty(LOCATORS, "localhost[" + locPort + "]");
+
+ LogWriterUtils.getLogWriter().info("Starting cache ds with following properties \n" + gemFireProps);
+
+ InternalDistributedSystem ds = test.getSystem(gemFireProps);
+ cache = CacheFactory.create(ds);
+ GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+ int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ fact.setStartPort(port);
+ fact.setEndPort(port);
+ fact.setManualStart(true);
+ GatewayReceiver receiver = fact.create();
+ try {
+ receiver.start();
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ fail("Test " + test.getName()
+ + " failed to start GatewayReceiver on port " + port);
+ }
+ return port;
+ }
+
+ public static void createReceiverAndServer(int locPort) {
+ WANTestBase test = new WANTestBase();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost[" + locPort
+ + "]");
+
+ InternalDistributedSystem ds = test.getSystem(props);
+ cache = CacheFactory.create(ds);
+ GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+ int receiverPort = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+ fact.setStartPort(receiverPort);
+ fact.setEndPort(receiverPort);
+ fact.setManualStart(true);
+ GatewayReceiver receiver = fact.create();
+ try {
+ receiver.start();
+ }
+ catch (IOException e) {
+ e.printStackTrace();
+ fail("Test " + test.getName()
+ + " failed to start GatewayReceiver on port " + receiverPort);
+ }
+ CacheServer server = cache.addCacheServer();
+ int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ server.setPort(serverPort);
+ server.setHostnameForClients("localhost");
+ try {
+ server.start();
+ } catch (IOException e) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start server ", e);
+ }
+ }
+
+ public static int createServer(int locPort) {
+ WANTestBase test = new WANTestBase();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "localhost[" + locPort
+ + "]");
+ InternalDistributedSystem ds = test.getSystem(props);
+ cache = CacheFactory.create(ds);
+
+ CacheServer server = cache.addCacheServer();
+ int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+ server.setPort(port);
+ server.setHostnameForClients("localhost");
+ try {
+ server.start();
+ } catch (IOException e) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start server ", e);
+ }
+ return port;
+ }
+
+ public static void createClientWithLocator(int port0,String host,
+ String regionName) {
+ WANTestBase test = new WANTestBase();
+ Properties props = test.getDistributedSystemProperties();
+ props.setProperty(MCAST_PORT, "0");
+ props.setProperty(LOCATORS, "");
+
+ InternalDistributedSystem ds = test.getSystem(props);
+ cache = CacheFactory.create(ds);
+
+ assertNotNull(cache);
+ CacheServerTestUtil.disableShufflingOfEndpoints();
+ Pool p;
+ try {
+ p = PoolManager.createFactory().addLocator(host, port0)
+ .setPingInterval(250).setSubscriptionEnabled(true)
+ .setSubscriptionRedundancy(-1).setReadTimeout(2000)
+ .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10)
+ .setRetryAttempts(3).create(regionName);
+ } finally {
+ CacheServerTestUtil.enableShufflingOfEndpoints();
+ }
+
+ AttributesFactory factory = new AttributesFactory();
+ factory.setPoolName(p.getName());
+ factory.setDataPolicy(DataPolicy.NORMAL);
+ RegionAttributes attrs = factory.create();
+ region = cache.createRegion(regionName, attrs);
+ region.registerInterest("ALL_KEYS");
+ assertNotNull(region);
+ LogWriterUtils.getLogWriter().info(
+
<TRUNCATED>