You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2016/01/20 03:22:29 UTC
[23/51] [partial] incubator-geode git commit: WAN and CQ code drop
under the Pivotal SGA
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java
new file mode 100644
index 0000000..1a0007c
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml70GatewayDUnitTest.java
@@ -0,0 +1,227 @@
+package com.gemstone.gemfire.cache;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueueFactory;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.cache30.CacheXml70DUnitTest;
+import com.gemstone.gemfire.cache30.CacheXmlTestCase;
+import com.gemstone.gemfire.cache30.MyGatewayEventFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
+import com.gemstone.gemfire.internal.cache.xmlcache.ParallelGatewaySenderCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.RegionAttributesCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.SerialGatewaySenderCreation;
+
+/**
+ * WAN configuration tests for GemFire cache.xml schema version 7.0:
+ * async event queues with gateway event filters, gateway receivers, and
+ * serial/parallel gateway senders. Each test builds a declarative
+ * CacheCreation, round-trips it through XML via testXml(), and compares
+ * the artifacts recreated from XML against the originals.
+ */
+public class CacheXml70GatewayDUnitTest extends CacheXmlTestCase {
+
+ /** JUnit 3 style constructor; the name selects the test method to run. */
+ public CacheXml70GatewayDUnitTest(String name) {
+ super(name);
+ }
+
+ /** The cache.xml schema version this test class exercises. */
+ protected String getGemFireVersion() {
+ return CacheXml.VERSION_7_0;
+ }
+
+ /**
+ * Added to test the scenario of defect #50600.
+ * An async event queue declared with a gateway event filter must survive
+ * the XML round trip performed by testXml().
+ */
+ public void testAsyncEventQueueWithGatewayEventFilter() {
+ getSystem();
+ CacheCreation cache = new CacheCreation();
+
+ String id = "WBCLChannel";
+ AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
+ factory.setBatchSize(100);
+ factory.setBatchTimeInterval(500);
+ factory.setBatchConflationEnabled(true);
+ factory.setMaximumQueueMemory(200);
+ factory.setDiskSynchronous(true);
+ factory.setParallel(false);
+ factory.setDispatcherThreads(33);
+ // The filter is the crux of defect #50600: it must be written to XML
+ // and restored with the queue.
+ factory.addGatewayEventFilter(new MyGatewayEventFilter());
+
+ AsyncEventListener eventListener = new CacheXml70DUnitTest.MyAsyncEventListener();
+ AsyncEventQueue asyncEventQueue = factory.create(id, eventListener);
+
+ RegionAttributesCreation attrs = new RegionAttributesCreation();
+ attrs.addAsyncEventQueueId(asyncEventQueue.getId());
+ cache.createRegion("UserRegion", attrs);
+
+ // Write the declarative cache to XML and rebuild a live cache from it.
+ testXml(cache);
+ Cache c = getCache();
+ assertNotNull(c);
+
+ Set<AsyncEventQueue> asyncEventQueuesOnCache = c.getAsyncEventQueues();
+ assertTrue("Size of asyncEventQueues should be greater than 0", asyncEventQueuesOnCache.size() > 0);
+
+ for (AsyncEventQueue asyncEventQueueOnCache : asyncEventQueuesOnCache) {
+ CacheXml70DUnitTest.validateAsyncEventQueue(asyncEventQueue, asyncEventQueueOnCache);
+ }
+ }
+
+ /**
+ * Declares a GatewayReceiver (port range, ping interval, socket buffer
+ * size, two transport filters), round-trips it through cache XML and
+ * validates the recreated receiver against the original.
+ */
+ public void testGatewayReceiver() throws CacheException{
+ getSystem();
+ CacheCreation cache = new CacheCreation();
+
+ GatewayReceiverFactory gatewayReceiverFactory = cache.createGatewayReceiverFactory();
+ gatewayReceiverFactory.setBindAddress("");
+ gatewayReceiverFactory.setStartPort(54321);
+ gatewayReceiverFactory.setEndPort(54331);
+ gatewayReceiverFactory.setMaximumTimeBetweenPings(2000);
+ gatewayReceiverFactory.setSocketBufferSize(1500);
+ GatewayTransportFilter myStreamfilter1 = new MyGatewayTransportFilter1();
+ gatewayReceiverFactory.addGatewayTransportFilter(myStreamfilter1);
+ GatewayTransportFilter myStreamfilter2 = new MyGatewayTransportFilter2();
+ gatewayReceiverFactory.addGatewayTransportFilter(myStreamfilter2);
+ GatewayReceiver receiver1 = gatewayReceiverFactory.create();
+ try {
+ receiver1.start();
+ }
+ catch (IOException e) {
+ // NOTE(review): the IOException detail is dropped here; chaining e
+ // into the failure message would make start failures easier to diagnose.
+ fail("Could not start GatewayReceiver");
+ }
+ testXml(cache);
+ Cache c = getCache();
+ assertNotNull(c);
+ Set<GatewayReceiver> receivers = c.getGatewayReceivers();
+ for(GatewayReceiver receiver : receivers){
+ validateGatewayReceiver(receiver1, receiver);
+ }
+ }
+
+ /**
+ * Declares a fully-configured parallel gateway sender, round-trips it
+ * through cache XML, and checks that every recreated sender is parallel
+ * and matches the original attributes.
+ */
+ public void testParallelGatewaySender() throws CacheException{
+ getSystem();
+ CacheCreation cache = new CacheCreation();
+
+ GatewaySenderFactory gatewaySenderFactory = cache.createGatewaySenderFactory();
+ gatewaySenderFactory.setParallel(true);
+ gatewaySenderFactory.setDispatcherThreads(13);
+ gatewaySenderFactory.setManualStart(true);
+ gatewaySenderFactory.setSocketBufferSize(1234);
+ gatewaySenderFactory.setSocketReadTimeout(1050);
+ gatewaySenderFactory.setBatchConflationEnabled(false);
+ gatewaySenderFactory.setBatchSize(88);
+ gatewaySenderFactory.setBatchTimeInterval(9);
+ gatewaySenderFactory.setPersistenceEnabled(true);
+ gatewaySenderFactory.setDiskStoreName("LNSender");
+ gatewaySenderFactory.setDiskSynchronous(true);
+ gatewaySenderFactory.setMaximumQueueMemory(211);
+ gatewaySenderFactory.setAlertThreshold(35);
+
+ GatewayEventFilter myeventfilter1 = new MyGatewayEventFilter1();
+ gatewaySenderFactory.addGatewayEventFilter(myeventfilter1);
+ GatewayTransportFilter myStreamfilter1 = new MyGatewayTransportFilter1();
+ gatewaySenderFactory.addGatewayTransportFilter(myStreamfilter1);
+ GatewayTransportFilter myStreamfilter2 = new MyGatewayTransportFilter2();
+ gatewaySenderFactory.addGatewayTransportFilter(myStreamfilter2);
+ // "LN" is the sender id; 2 is the remote distributed-system id.
+ GatewaySender parallelGatewaySender = gatewaySenderFactory.create("LN", 2);
+
+ testXml(cache);
+ Cache c = getCache();
+ assertNotNull(c);
+ Set<GatewaySender> sendersOnCache = c.getGatewaySenders();
+ for(GatewaySender sender : sendersOnCache){
+ assertEquals(true, sender.isParallel());
+ validateGatewaySender(parallelGatewaySender, sender);
+ }
+ }
+
+ /**
+ * Declares a fully-configured serial gateway sender attached to a region,
+ * round-trips it through cache XML, and checks that every recreated
+ * sender is serial and matches the original attributes.
+ */
+ public void testSerialGatewaySender() throws CacheException{
+ getSystem();
+ CacheCreation cache = new CacheCreation();
+ GatewaySenderFactory gatewaySenderFactory = cache.createGatewaySenderFactory();
+ gatewaySenderFactory.setParallel(false);
+ gatewaySenderFactory.setManualStart(true);
+ gatewaySenderFactory.setSocketBufferSize(124);
+ gatewaySenderFactory.setSocketReadTimeout(1000);
+ gatewaySenderFactory.setBatchConflationEnabled(false);
+ gatewaySenderFactory.setBatchSize(100);
+ gatewaySenderFactory.setBatchTimeInterval(10);
+ gatewaySenderFactory.setPersistenceEnabled(true);
+ gatewaySenderFactory.setDiskStoreName("LNSender");
+ gatewaySenderFactory.setDiskSynchronous(true);
+ gatewaySenderFactory.setMaximumQueueMemory(200);
+ gatewaySenderFactory.setAlertThreshold(30);
+
+ GatewayEventFilter myeventfilter1 = new MyGatewayEventFilter1();
+ gatewaySenderFactory.addGatewayEventFilter(myeventfilter1);
+ GatewayTransportFilter myStreamfilter1 = new MyGatewayTransportFilter1();
+ gatewaySenderFactory.addGatewayTransportFilter(myStreamfilter1);
+ GatewayTransportFilter myStreamfilter2 = new MyGatewayTransportFilter2();
+ gatewaySenderFactory.addGatewayTransportFilter(myStreamfilter2);
+ // "LN" is the sender id; 2 is the remote distributed-system id.
+ GatewaySender serialGatewaySender = gatewaySenderFactory.create("LN", 2);
+
+ RegionAttributesCreation attrs = new RegionAttributesCreation();
+ attrs.addGatewaySenderId(serialGatewaySender.getId());
+ cache.createRegion("UserRegion", attrs);
+
+ testXml(cache);
+ Cache c = getCache();
+ assertNotNull(c);
+ Set<GatewaySender> sendersOnCache = c.getGatewaySenders();
+ for(GatewaySender sender : sendersOnCache){
+ assertEquals(false, sender.isParallel());
+ validateGatewaySender(serialGatewaySender, sender);
+ }
+ }
+
+ /**
+ * Pass-through event filter used so a filter element appears in the
+ * generated XML; implements Declarable so it can be re-instantiated when
+ * the cache is rebuilt from cache.xml.
+ */
+ public static class MyGatewayEventFilter implements GatewayEventFilter, Declarable {
+ public void afterAcknowledgement(GatewayQueueEvent event) {
+ }
+ // Allow every event to be enqueued.
+ public boolean beforeEnqueue(GatewayQueueEvent event) {
+ return true;
+ }
+ // Allow every event to be transmitted.
+ public boolean beforeTransmit(GatewayQueueEvent event) {
+ return true;
+ }
+ public void close() {
+ }
+ public void init(Properties properties) {
+ }
+ }
+
+ /**
+ * Asserts the declared receiver and the receiver recreated from XML have
+ * matching attributes; transport filters are compared by count only.
+ */
+ static void validateGatewayReceiver(GatewayReceiver receiver1, GatewayReceiver gatewayReceiver) {
+ assertEquals(receiver1.getHost(), gatewayReceiver.getHost());
+ assertEquals(receiver1.getStartPort(), gatewayReceiver.getStartPort());
+ assertEquals(receiver1.getEndPort(), gatewayReceiver.getEndPort());
+ assertEquals(receiver1.getMaximumTimeBetweenPings(), gatewayReceiver.getMaximumTimeBetweenPings());
+ assertEquals(receiver1.getSocketBufferSize(), gatewayReceiver.getSocketBufferSize());
+ assertEquals(receiver1.getGatewayTransportFilters().size(), gatewayReceiver.getGatewayTransportFilters().size());
+ }
+
+ /**
+ * Asserts the declared sender and the sender recreated from XML have
+ * matching attributes (filters compared by count only), and that the
+ * declarative sender is the expected *GatewaySenderCreation subtype.
+ */
+ static void validateGatewaySender(GatewaySender sender1, GatewaySender gatewaySender) {
+ assertEquals(sender1.getId(), gatewaySender.getId());
+ assertEquals(sender1.getRemoteDSId(), gatewaySender.getRemoteDSId());
+ assertEquals(sender1.isParallel(), gatewaySender.isParallel());
+ assertEquals(sender1.isBatchConflationEnabled(), gatewaySender.isBatchConflationEnabled());
+ assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize());
+ assertEquals(sender1.getBatchTimeInterval(), gatewaySender.getBatchTimeInterval());
+ assertEquals(sender1.isPersistenceEnabled(), gatewaySender.isPersistenceEnabled());
+ assertEquals(sender1.getDiskStoreName(),gatewaySender.getDiskStoreName());
+ assertEquals(sender1.isDiskSynchronous(),gatewaySender.isDiskSynchronous());
+ assertEquals(sender1.getMaximumQueueMemory(), gatewaySender.getMaximumQueueMemory());
+ assertEquals(sender1.getAlertThreshold(), gatewaySender.getAlertThreshold());
+ assertEquals(sender1.getGatewayEventFilters().size(), gatewaySender.getGatewayEventFilters().size());
+ assertEquals(sender1.getGatewayTransportFilters().size(), gatewaySender.getGatewayTransportFilters().size());
+
+ boolean isParallel = sender1.isParallel();
+ if (isParallel) {
+ assertTrue("sender should be instanceof Creation", sender1 instanceof ParallelGatewaySenderCreation);
+ } else {
+ assertTrue("sender should be instanceof Creation", sender1 instanceof SerialGatewaySenderCreation);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
new file mode 100644
index 0000000..54a84d6
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/cache/CacheXml80GatewayDUnitTest.java
@@ -0,0 +1,61 @@
+package com.gemstone.gemfire.cache;
+
+import java.io.IOException;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.cache30.CacheXmlTestCase;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter1;
+import com.gemstone.gemfire.cache30.MyGatewayTransportFilter2;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheCreation;
+import com.gemstone.gemfire.internal.cache.xmlcache.CacheXml;
+
+/**
+ * WAN cache.xml tests for GemFire XML schema version 8.0; covers the
+ * manual-start attribute on GatewayReceiver introduced with this schema.
+ * Reuses the receiver validation from CacheXml70GatewayDUnitTest.
+ */
+public class CacheXml80GatewayDUnitTest extends CacheXmlTestCase {
+
+ /** JUnit 3 style constructor; the name selects the test method to run. */
+ public CacheXml80GatewayDUnitTest(String name) {
+ super(name);
+ }
+
+ /** The cache.xml schema version this test class exercises. */
+ protected String getGemFireVersion() {
+ return CacheXml.VERSION_8_0;
+ }
+
+ /**
+ * Declares a GatewayReceiver with manual-start=true, round-trips it
+ * through cache XML, and validates the recreated receiver including the
+ * manual-start flag (new in 8.0).
+ */
+ public void testGatewayReceiverWithManualStartTRUE() throws CacheException{
+ //getSystem();
+ CacheCreation cache = new CacheCreation();
+
+ GatewayReceiverFactory gatewayReceiverFactory = cache.createGatewayReceiverFactory();
+ gatewayReceiverFactory.setBindAddress("");
+ gatewayReceiverFactory.setStartPort(54321);
+ gatewayReceiverFactory.setEndPort(54331);
+ gatewayReceiverFactory.setMaximumTimeBetweenPings(2000);
+ gatewayReceiverFactory.setSocketBufferSize(1500);
+ gatewayReceiverFactory.setManualStart(true);
+ GatewayTransportFilter myStreamfilter1 = new MyGatewayTransportFilter1();
+ gatewayReceiverFactory.addGatewayTransportFilter(myStreamfilter1);
+ GatewayTransportFilter myStreamfilter2 = new MyGatewayTransportFilter2();
+ gatewayReceiverFactory.addGatewayTransportFilter(myStreamfilter2);
+ GatewayReceiver receiver1 = gatewayReceiverFactory.create();
+ try {
+ receiver1.start();
+ }
+ catch (IOException e) {
+ // NOTE(review): the IOException detail is dropped here; chaining e
+ // into the failure message would make start failures easier to diagnose.
+ fail("Could not start GatewayReceiver");
+ }
+ testXml(cache);
+ Cache c = getCache();
+ assertNotNull(c);
+ Set<GatewayReceiver> receivers = c.getGatewayReceivers();
+ for(GatewayReceiver receiver : receivers){
+ validateGatewayReceiver(receiver1, receiver);
+ }
+ }
+
+ /**
+ * Delegates the 7.0-era attribute checks to CacheXml70GatewayDUnitTest,
+ * then additionally asserts the 8.0 manual-start flag matches.
+ */
+ protected void validateGatewayReceiver(GatewayReceiver receiver1,
+ GatewayReceiver gatewayReceiver){
+ CacheXml70GatewayDUnitTest.validateGatewayReceiver(receiver1, gatewayReceiver);
+ assertEquals(receiver1.isManualStart(), gatewayReceiver.isManualStart());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java
new file mode 100755
index 0000000..4ee46d1
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/codeAnalysis/AnalyzeWANSerializablesJUnitTest.java
@@ -0,0 +1,82 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.codeAnalysis;
+
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.codeAnalysis.decode.CompiledClass;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+import com.gemstone.gemfire.util.test.TestUtil;
+
+/**
+ * Runs the serializable-class analysis inherited from
+ * {@code AnalyzeSerializablesJUnitTest} against the compiled classes of the
+ * gemfire-wan module. This subclass only supplies the class-loading step;
+ * the analysis itself lives in the superclass (not shown in this file).
+ *
+ * @author bruces
+ *
+ */
+@Category(IntegrationTest.class)
+public class AnalyzeWANSerializablesJUnitTest extends AnalyzeSerializablesJUnitTest {
+
+ /**
+ * Populates the superclass's shared {@code classes} map with the compiled
+ * gemfire-wan classes, minus the excluded classes and open-bug entries,
+ * by locating the module's build output directory on the classpath.
+ */
+ @Before
+ public void loadClasses() throws Exception {
+ // The map is static and shared across test methods; load only once.
+ if (classes.size() > 0) {
+ return;
+ }
+ System.out.println("loadClasses starting");
+ List<String> excludedClasses = loadExcludedClasses(new File(TestUtil.getResourcePath(AnalyzeWANSerializablesJUnitTest.class, "excludedClasses.txt")));
+ List<String> openBugs = loadOpenBugs(new File(TestUtil.getResourcePath(AnalyzeWANSerializablesJUnitTest.class, "openBugs.txt")));
+ excludedClasses.addAll(openBugs);
+
+ String cp = System.getProperty("java.class.path");
+ System.out.println("java classpath is " + cp);
+ System.out.flush();
+ String[] entries = cp.split(File.pathSeparator);
+ // Suffix identifying the gemfire-wan Gradle build output on the classpath.
+ String buildDirName =
+ "gemfire-wan"+File.separatorChar
+ +"build"+File.separatorChar
+ +"classes"+File.separatorChar
+ +"main";
+ String buildDir = null;
+
+ // Scan classpath entries until one ends with the expected suffix.
+ for (int i=0; i<entries.length && buildDir==null; i++) {
+ System.out.println("examining '" + entries[i] + "'");
+ System.out.flush();
+ if (entries[i].endsWith(buildDirName)) {
+ buildDir = entries[i];
+ }
+ }
+ if (buildDir != null) {
+ System.out.println("loading class files from " + buildDir);
+ System.out.flush();
+ long start = System.currentTimeMillis();
+ loadClassesFromBuild(new File(buildDir), excludedClasses);
+ long finish = System.currentTimeMillis();
+ System.out.println("done loading " + classes.size() + " classes. elapsed time = "
+ + (finish-start)/1000 + " seconds");
+ }
+ else {
+ // Without the build directory the analysis cannot run at all.
+ fail("unable to find WAN classes");
+ }
+ }
+
+ /** Releases the (static) loaded-class map once the whole class has run. */
+ @AfterClass
+ public static void cleanup() {
+ if (classes != null) {
+ classes.clear();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java
new file mode 100644
index 0000000..cb34278
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/UpdateVersionDUnitTest.java
@@ -0,0 +1,955 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.gopivotal.com/patents.
+ *=========================================================================
+ */
+/**
+ *
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.EntryNotFoundException;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Region.Entry;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallbackAdapter;
+import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry;
+import com.gemstone.gemfire.internal.cache.partitioned.PRLocallyDestroyedException;
+import com.gemstone.gemfire.internal.cache.versions.VersionSource;
+import com.gemstone.gemfire.internal.cache.versions.VersionStamp;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory;
+
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.SerializableCallable;
+import dunit.SerializableRunnable;
+import dunit.VM;
+
+/**
+ * @author Shobhit Agarwal
+ * @since 7.0.1
+ */
+public class UpdateVersionDUnitTest extends DistributedTestCase {
+
+ // Name of the region created in every participating VM.
+ protected static final String regionName = "testRegion";
+ // Per-VM cache handle, populated by the static createCache helper.
+ protected static Cache cache;
+ // NOTE(review): not referenced in the visible portion of this file;
+ // presumably cleared elsewhere — confirm against the full source.
+ private static Set<ExpectedException>expectedExceptions = new HashSet<ExpectedException>();
+
+
+
+ /** JUnit 3 style constructor; the name selects the test method to run. */
+ public UpdateVersionDUnitTest(String name) {
+ super(name);
+ }
+
+ /** Closes the cache in this VM and in every DUnit VM after each test. */
+ public void tearDown2() throws Exception {
+ super.tearDown2();
+ closeCache();
+ invokeInEveryVM(new SerializableRunnable() { public void run() {
+ closeCache();
+ } });
+ }
+
+ /**
+ * Two WAN sites linked by a serial gateway sender over partitioned
+ * regions. Site 1 puts an entry, then applies an update carrying a
+ * hand-built "remote" version tag with a newer timestamp; the test
+ * asserts the updated timestamp reaches the entry on site 2.
+ * NOTE(review): vm1 is obtained but never used in this test.
+ */
+ public void testUpdateVersionAfterCreateWithSerialSender() {
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0); // server1 site1
+ VM vm1 = host.getVM(1); // server2 site1
+
+ VM vm2 = host.getVM(2); // server1 site2
+ VM vm3 = host.getVM(3); // server2 site2
+
+ final String key = "key-1";
+
+ // Site 1
+ Integer lnPort = (Integer)vm0.invoke(UpdateVersionDUnitTest.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+
+ vm0.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { lnPort});
+ // Serial sender (parallel=false) from DS 1 to remote DS 2.
+ vm0.invoke(UpdateVersionDUnitTest.class, "createSender", new Object[] { "ln1", 2, false, 10, 1, false, false, null, true });
+
+ vm0.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "ln1", 1, 1});
+ vm0.invoke(UpdateVersionDUnitTest.class, "startSender", new Object[] { "ln1" });
+ vm0.invoke(UpdateVersionDUnitTest.class, "waitForSenderRunningState", new Object[] { "ln1" });
+
+ //Site 2
+ Integer nyPort = (Integer)vm2.invoke(UpdateVersionDUnitTest.class, "createFirstRemoteLocator", new Object[] { 2, lnPort });
+ Integer nyRecPort = (Integer) vm2.invoke(UpdateVersionDUnitTest.class, "createReceiver", new Object[] { nyPort });
+
+ vm2.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "", 1, 1});
+ vm3.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { nyPort});
+ vm3.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "", 1, 1});
+
+ // On site 1: put the entry, then re-apply it with a forged version tag
+ // (same entry version, newer timestamp, flagged remote) and return the
+ // resulting stamp so the remote site can be compared against it.
+ final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Update a single entry and get its version") {
+
+ @Override
+ public Object call() throws CacheException {
+ Cache cache = CacheFactory.getAnyInstance();
+ Region region = cache.getRegion(regionName);
+ assertTrue(region instanceof PartitionedRegion);
+
+ region.put(key, "value-1");
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof EntrySnapshot);
+ RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+ VersionStamp stamp = regionEntry.getVersionStamp();
+
+ // Create a duplicate entry version tag from stamp with newer
+ // time-stamp.
+ VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
+ VersionTag tag = VersionTag.create(memberId);
+
+ int entryVersion = stamp.getEntryVersion()-1;
+ int dsid = stamp.getDistributedSystemId();
+ long time = System.currentTimeMillis();
+
+ tag.setEntryVersion(entryVersion);
+ tag.setDistributedSystemId(dsid);
+ tag.setVersionTimeStamp(time);
+ tag.setIsRemoteForTesting();
+
+ EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag,
+ entry.getKey(), "value-2");
+
+ ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
+
+ // Verify the new stamp
+ entry = region.getEntry(key);
+ assertTrue(entry instanceof EntrySnapshot);
+ regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+ stamp = regionEntry.getVersionStamp();
+ assertEquals(
+ "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
+ time, stamp.getVersionTimeStamp());
+ assertEquals(++entryVersion, stamp.getEntryVersion());
+ assertEquals(dsid, stamp.getDistributedSystemId());
+
+ return stamp.asVersionTag();
+ }
+ });
+
+ // On site 2: wait for the entry to arrive over the WAN, wait for its
+ // timestamp to match the forged tag, then return the remote stamp.
+ VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {
+
+ @Override
+ public Object call() throws Exception {
+
+ Cache cache = CacheFactory.getAnyInstance();
+ final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
+
+ // wait for entry to be received
+ WaitCriterion wc = new WaitCriterion() {
+ public boolean done() {
+ Entry<?,?> entry = null;
+ try {
+ entry = region.getDataStore().getEntryLocally(0, key, false, false, false);
+ } catch (EntryNotFoundException e) {
+ // expected
+ } catch (ForceReattemptException e) {
+ // expected
+ } catch (PRLocallyDestroyedException e) {
+ throw new RuntimeException("unexpected exception", e);
+ }
+ if (entry != null) {
+ getLogWriter().info("found entry " + entry);
+ }
+ return (entry != null);
+ }
+
+ public String description() {
+ return "Expected "+key+" to be received on remote WAN site";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+ wc = new WaitCriterion() {
+ public boolean done() {
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof EntrySnapshot);
+ RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+ return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
+ }
+ public String description() {
+ return "waiting for timestamp to be updated";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+ Entry entry = region.getEntry(key);
+ assertTrue("entry class is wrong: " + entry, entry instanceof EntrySnapshot);
+ RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+ VersionStamp stamp = regionEntry.getVersionStamp();
+
+ return stamp.asVersionTag();
+ }
+ });
+
+ assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
+ }
+
+ /**
+ * Same scenario as testUpdateVersionAfterCreateWithSerialSender but over
+ * replicated (distributed) regions instead of partitioned ones: a forged
+ * newer-timestamp version tag applied on site 1 must propagate its
+ * timestamp to the entry on site 2.
+ * NOTE(review): vm1 is obtained but never used in this test.
+ */
+ public void testUpdateVersionAfterCreateWithSerialSenderOnDR() {
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0); // server1 site1
+ VM vm1 = host.getVM(1); // server2 site1
+
+ VM vm2 = host.getVM(2); // server1 site2
+ VM vm3 = host.getVM(3); // server2 site2
+
+ final String key = "key-1";
+
+ // Site 1
+ Integer lnPort = (Integer)vm0.invoke(UpdateVersionDUnitTest.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+
+ vm0.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { lnPort});
+ // Serial sender (parallel=false) from DS 1 to remote DS 2.
+ vm0.invoke(UpdateVersionDUnitTest.class, "createSender", new Object[] { "ln1", 2, false, 10, 1, false, false, null, true });
+
+ vm0.invoke(UpdateVersionDUnitTest.class, "createReplicatedRegion", new Object[] {regionName, "ln1"});
+ vm0.invoke(UpdateVersionDUnitTest.class, "startSender", new Object[] { "ln1" });
+ vm0.invoke(UpdateVersionDUnitTest.class, "waitForSenderRunningState", new Object[] { "ln1" });
+
+ //Site 2
+ Integer nyPort = (Integer)vm2.invoke(UpdateVersionDUnitTest.class, "createFirstRemoteLocator", new Object[] { 2, lnPort });
+ Integer nyRecPort = (Integer) vm2.invoke(UpdateVersionDUnitTest.class, "createReceiver", new Object[] { nyPort });
+
+ vm2.invoke(UpdateVersionDUnitTest.class, "createReplicatedRegion", new Object[] {regionName, ""});
+ vm3.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { nyPort });
+ vm3.invoke(UpdateVersionDUnitTest.class, "createReplicatedRegion", new Object[] {regionName, ""});
+
+ // On site 1: put the entry, then re-apply it with a forged version tag
+ // (same entry version, newer timestamp, flagged remote) and return the
+ // resulting stamp so the remote site can be compared against it.
+ final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Update a single entry and get its version") {
+
+ @Override
+ public Object call() throws CacheException {
+ Cache cache = CacheFactory.getAnyInstance();
+ Region region = cache.getRegion(regionName);
+ assertTrue(region instanceof DistributedRegion);
+
+ region.put(key, "value-1");
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof NonTXEntry);
+ RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
+
+ VersionStamp stamp = regionEntry.getVersionStamp();
+
+ // Create a duplicate entry version tag from stamp with newer
+ // time-stamp.
+ VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
+ VersionTag tag = VersionTag.create(memberId);
+
+ int entryVersion = stamp.getEntryVersion()-1;
+ int dsid = stamp.getDistributedSystemId();
+ long time = System.currentTimeMillis();
+
+ tag.setEntryVersion(entryVersion);
+ tag.setDistributedSystemId(dsid);
+ tag.setVersionTimeStamp(time);
+ tag.setIsRemoteForTesting();
+
+ EntryEventImpl event = createNewEvent((DistributedRegion) region, tag,
+ entry.getKey(), "value-2");
+
+ ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
+
+ // Verify the new stamp
+ entry = region.getEntry(key);
+ assertTrue(entry instanceof NonTXEntry);
+ regionEntry = ((NonTXEntry) entry).getRegionEntry();
+
+ stamp = regionEntry.getVersionStamp();
+ assertEquals(
+ "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
+ time, stamp.getVersionTimeStamp());
+ assertEquals(entryVersion+1, stamp.getEntryVersion());
+ assertEquals(dsid, stamp.getDistributedSystemId());
+
+ return stamp.asVersionTag();
+ }
+ });
+
+ // On site 2: wait for the entry to arrive over the WAN, wait for its
+ // timestamp to match the forged tag, then return the remote stamp.
+ VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {
+
+ @Override
+ public Object call() throws Exception {
+
+ Cache cache = CacheFactory.getAnyInstance();
+ final Region region = cache.getRegion(regionName);
+
+ // wait for entry to be received
+ WaitCriterion wc = new WaitCriterion() {
+ public boolean done() {
+ return (region.getEntry(key) != null);
+ }
+
+ public String description() {
+ return "Expected key-1 to be received on remote WAN site";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+ wc = new WaitCriterion() {
+ public boolean done() {
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof NonTXEntry);
+ RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
+ return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
+ }
+ public String description() {
+ return "waiting for timestamp to be updated";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof NonTXEntry);
+ RegionEntry regionEntry = ((NonTXEntry) entry).getRegionEntry();
+
+ VersionStamp stamp = regionEntry.getVersionStamp();
+
+ return stamp.asVersionTag();
+ }
+ });
+
+ assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
+ }
+
+ /**
+ * WAN version-propagation test with a PARALLEL gateway sender: site 1
+ * (vm0) puts an entry, then applies an update event carrying a version
+ * tag with a newer timestamp; the test asserts the timestamp is applied
+ * locally and that the remote site (vm3) eventually converges to the
+ * same version timestamp.
+ */
+ public void testUpdateVersionAfterCreateWithParallelSender() {
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0); // server1 site1
+ VM vm1 = host.getVM(1); // server2 site1 (not used by this test)
+
+ VM vm2 = host.getVM(2); // server1 site2
+ VM vm3 = host.getVM(3); // server2 site2
+
+ // Site 1: locator, cache, parallel sender "ln1" -> ds 2, and a PR
+ // attached to that sender.
+ Integer lnPort = (Integer)vm0.invoke(UpdateVersionDUnitTest.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+
+ final String key = "key-1";
+
+ vm0.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { lnPort});
+ vm0.invoke(UpdateVersionDUnitTest.class, "createSender", new Object[] { "ln1", 2, true, 10, 1, false, false, null, true });
+
+ vm0.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "ln1", 1, 1});
+ vm0.invoke(UpdateVersionDUnitTest.class, "startSender", new Object[] { "ln1" });
+ vm0.invoke(UpdateVersionDUnitTest.class, "waitForSenderRunningState", new Object[] { "ln1" });
+
+ // Site 2: remote locator + receiver (vm2) and a second member (vm3)
+ // hosting the same PR with no senders.
+ Integer nyPort = (Integer)vm2.invoke(UpdateVersionDUnitTest.class, "createFirstRemoteLocator", new Object[] { 2, lnPort });
+ Integer nyRecPort = (Integer) vm2.invoke(UpdateVersionDUnitTest.class, "createReceiver", new Object[] { nyPort });
+
+ vm2.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "", 1, 1});
+
+ vm3.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { nyPort});
+ vm3.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "", 1, 1});
+
+ final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Put a single entry and get its version") {
+
+ @Override
+ public Object call() throws CacheException {
+ Cache cache = CacheFactory.getAnyInstance();
+ Region region = cache.getRegion(regionName);
+ assertTrue(region instanceof PartitionedRegion);
+
+ region.put(key, "value-1");
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof EntrySnapshot);
+ RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+ VersionStamp stamp = regionEntry.getVersionStamp();
+
+ // Create a duplicate entry version tag from stamp with newer
+ // time-stamp.
+ VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
+ VersionTag tag = VersionTag.create(memberId);
+
+ // Tag carries entryVersion one below the current stamp; the update
+ // below is expected to bump it back to the stamp's value (see the
+ // ++entryVersion assertion further down).
+ int entryVersion = stamp.getEntryVersion()-1;
+ int dsid = stamp.getDistributedSystemId();
+ long time = System.currentTimeMillis();
+
+ tag.setEntryVersion(entryVersion);
+ tag.setDistributedSystemId(dsid);
+ tag.setVersionTimeStamp(time);
+ tag.setIsRemoteForTesting(); // presumably so the tag is treated as remote-origin — confirm
+
+ EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag,
+ entry.getKey(), "value-2");
+
+ ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
+
+ // Verify the new stamp
+ entry = region.getEntry(key);
+ assertTrue(entry instanceof EntrySnapshot);
+ regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+ stamp = regionEntry.getVersionStamp();
+ assertEquals(
+ "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
+ time, stamp.getVersionTimeStamp());
+ assertEquals(++entryVersion, stamp.getEntryVersion());
+ assertEquals(dsid, stamp.getDistributedSystemId());
+
+ return stamp.asVersionTag();
+ }
+ });
+
+ VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {
+
+ @Override
+ public Object call() throws Exception {
+
+ Cache cache = CacheFactory.getAnyInstance();
+ final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
+
+ // wait for entry to be received over the WAN
+ WaitCriterion wc = new WaitCriterion() {
+ public boolean done() {
+ Entry<?,?> entry = null;
+ try {
+ entry = region.getDataStore().getEntryLocally(0, key, false, false, false);
+ } catch (EntryNotFoundException e) {
+ // expected
+ } catch (ForceReattemptException e) {
+ // expected
+ } catch (PRLocallyDestroyedException e) {
+ throw new RuntimeException("unexpected exception", e);
+ }
+ if (entry != null) {
+ getLogWriter().info("found entry " + entry);
+ }
+ return (entry != null);
+ }
+
+ public String description() {
+ return "Expected key-1 to be received on remote WAN site";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+ // Then wait for the propagated version timestamp to be applied.
+ wc = new WaitCriterion() {
+ public boolean done() {
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof EntrySnapshot);
+ RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+ return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
+ }
+ public String description() {
+ return "waiting for timestamp to be updated";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof EntrySnapshot);
+ RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+ VersionStamp stamp = regionEntry.getVersionStamp();
+
+ return stamp.asVersionTag();
+ }
+ });
+
+ assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
+ }
+
+ /**
+ * WAN version-propagation test with a CONCURRENT SERIAL gateway sender
+ * (2 dispatcher threads): site 1 (vm0) puts an entry, then applies an
+ * update event carrying a version tag with a newer timestamp; the test
+ * asserts the timestamp is applied locally and that the remote site
+ * (vm3) eventually converges to the same version timestamp.
+ */
+ public void testUpdateVersionAfterCreateWithConcurrentSerialSender() {
+
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0); // server1 site1
+ VM vm1 = host.getVM(1); // server2 site1 (not used by this test)
+
+ VM vm2 = host.getVM(2); // server1 site2
+ VM vm3 = host.getVM(3); // server2 site2
+
+ // Site 1: locator, cache, concurrent serial sender "ln1" -> ds 2
+ // (2 dispatcher threads), and a PR attached to that sender.
+ Integer lnPort = (Integer)vm0.invoke(UpdateVersionDUnitTest.class, "createFirstLocatorWithDSId", new Object[] { 1 });
+
+ final String key = "key-1";
+
+ vm0.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { lnPort });
+ vm0.invoke(UpdateVersionDUnitTest.class, "createConcurrentSender", new Object[] { "ln1", 2, false, 10, 2, false, false, null, true, 2 });
+
+ vm0.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "ln1", 1, 1});
+ vm0.invoke(UpdateVersionDUnitTest.class, "startSender", new Object[] { "ln1" });
+ vm0.invoke(UpdateVersionDUnitTest.class, "waitForSenderRunningState", new Object[] { "ln1" });
+
+ // Site 2: remote locator + receiver (vm2) and a second member (vm3)
+ // hosting the same PR with no senders.
+ Integer nyPort = (Integer)vm2.invoke(UpdateVersionDUnitTest.class, "createFirstRemoteLocator", new Object[] { 2, lnPort });
+ Integer nyRecPort = (Integer) vm2.invoke(UpdateVersionDUnitTest.class, "createReceiver", new Object[] { nyPort });
+
+ vm2.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "", 1, 1});
+
+ vm3.invoke(UpdateVersionDUnitTest.class, "createCache", new Object[] { nyPort });
+ vm3.invoke(UpdateVersionDUnitTest.class, "createPartitionedRegion", new Object[] {regionName, "", 1, 1});
+
+ final VersionTag tag = (VersionTag) vm0.invoke(new SerializableCallable("Put a single entry and get its version") {
+
+ @Override
+ public Object call() throws CacheException {
+ Cache cache = CacheFactory.getAnyInstance();
+ Region region = cache.getRegion(regionName);
+ assertTrue(region instanceof PartitionedRegion);
+
+ region.put(key, "value-1");
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof EntrySnapshot);
+ RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+ VersionStamp stamp = regionEntry.getVersionStamp();
+
+ // Create a duplicate entry version tag from stamp with newer
+ // time-stamp.
+ VersionSource memberId = (VersionSource) cache.getDistributedSystem().getDistributedMember();
+ VersionTag tag = VersionTag.create(memberId);
+
+ // Tag carries entryVersion one below the current stamp; the update
+ // below is expected to bump it back to the stamp's value (see the
+ // ++entryVersion assertion further down).
+ int entryVersion = stamp.getEntryVersion()-1;
+ int dsid = stamp.getDistributedSystemId();
+ long time = System.currentTimeMillis();
+
+ tag.setEntryVersion(entryVersion);
+ tag.setDistributedSystemId(dsid);
+ tag.setVersionTimeStamp(time);
+ tag.setIsRemoteForTesting(); // presumably so the tag is treated as remote-origin — confirm
+
+ EntryEventImpl event = createNewEvent((PartitionedRegion) region, tag,
+ entry.getKey(), "value-2");
+
+ ((LocalRegion) region).basicUpdate(event, false, true, 0L, false);
+
+ // Verify the new stamp
+ entry = region.getEntry(key);
+ assertTrue(entry instanceof EntrySnapshot);
+ regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+ stamp = regionEntry.getVersionStamp();
+ assertEquals(
+ "Time stamp did NOT get updated by UPDATE_VERSION operation on LocalRegion",
+ time, stamp.getVersionTimeStamp());
+ assertEquals(++entryVersion, stamp.getEntryVersion());
+ assertEquals(dsid, stamp.getDistributedSystemId());
+
+ return stamp.asVersionTag();
+ }
+ });
+
+ VersionTag remoteTag = (VersionTag) vm3.invoke(new SerializableCallable("Get timestamp from remote site") {
+
+ @Override
+ public Object call() throws Exception {
+
+ Cache cache = CacheFactory.getAnyInstance();
+ final PartitionedRegion region = (PartitionedRegion)cache.getRegion(regionName);
+
+ // wait for entry to be received over the WAN
+ WaitCriterion wc = new WaitCriterion() {
+ public boolean done() {
+ Entry<?,?> entry = null;
+ try {
+ entry = region.getDataStore().getEntryLocally(0, key, false, false, false);
+ } catch (EntryNotFoundException e) {
+ // expected
+ } catch (ForceReattemptException e) {
+ // expected
+ } catch (PRLocallyDestroyedException e) {
+ throw new RuntimeException("unexpected exception", e);
+ }
+ if (entry != null) {
+ getLogWriter().info("found entry " + entry);
+ }
+ return (entry != null);
+ }
+
+ public String description() {
+ return "Expected key-1 to be received on remote WAN site";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+ // Then wait for the propagated version timestamp to be applied.
+ wc = new WaitCriterion() {
+ public boolean done() {
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof EntrySnapshot);
+ RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+ return regionEntry.getVersionStamp().getVersionTimeStamp() == tag.getVersionTimeStamp();
+ }
+ public String description() {
+ return "waiting for timestamp to be updated";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, 30000, 500, true);
+
+ Entry entry = region.getEntry(key);
+ assertTrue(entry instanceof EntrySnapshot);
+ RegionEntry regionEntry = ((EntrySnapshot) entry).getRegionEntry();
+
+ VersionStamp stamp = regionEntry.getVersionStamp();
+
+ return stamp.asVersionTag();
+ }
+ });
+
+ assertEquals("Local and remote site have different timestamps", tag.getVersionTimeStamp(), remoteTag.getVersionTimeStamp());
+ }
+
+
+ /**
+ * Builds an UPDATE EntryEventImpl that carries the supplied version tag
+ * (via a version-tag holder) for the given key/value on the region, with
+ * callbacks enabled and a fresh event id from the region's system.
+ */
+ private EntryEventImpl createNewEvent(LocalRegion region, VersionTag tag, Object key, Object value) {
+ EntryEventImpl updateEvent = EntryEventImpl.createVersionTagHolder(tag);
+ updateEvent.setOperation(Operation.UPDATE);
+ updateEvent.setRegion(region);
+ if (region instanceof PartitionedRegion) {
+ // PR events need the bucket-aware KeyInfo computed by the region itself.
+ updateEvent.setKeyInfo(((PartitionedRegion)region).getKeyInfo(key));
+ } else {
+ updateEvent.setKeyInfo(new KeyInfo(key, value, null));
+ }
+ updateEvent.setNewValue(value);
+ updateEvent.setGenerateCallbacks(true);
+ updateEvent.distributedMember = region.getSystem().getDistributedMember();
+ updateEvent.setNewEventId(region.getSystem());
+ return updateEvent;
+ }
+
+ /*
+ * Helper Methods
+ */
+
+ /**
+ * Connects this VM to the distributed system reachable through the given
+ * locator port and creates the shared test Cache, then registers the
+ * benign log patterns that appear while the remote WAN site is not yet
+ * reachable.
+ */
+ private static void createCache(Integer locPort) {
+   UpdateVersionDUnitTest test = new UpdateVersionDUnitTest(testName);
+   Properties config = new Properties();
+   config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+   config.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + "]");
+   config.setProperty(DistributionConfig.LOG_LEVEL_NAME, getDUnitLogLevel());
+   config.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+   config.setProperty(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
+   cache = CacheFactory.create(test.getSystem(config));
+   // Expected while the remote locator/pool is still coming up.
+   ExpectedException expected = new ExpectedException("could not get remote locator information for remote site");
+   cache.getLogger().info(expected.getAddString());
+   expectedExceptions.add(expected);
+   expected = new ExpectedException("Pool ln1 is not available");
+   cache.getLogger().info(expected.getAddString());
+   expectedExceptions.add(expected);
+ }
+
+ /**
+ * Removes the registered expected-exception markers and tears down the
+ * cache and its distributed system for this VM; always nulls the static
+ * cache reference, even when the cache is absent or already closed.
+ */
+ private static void closeCache() {
+ if (cache != null && !cache.isClosed()) {
+ for (ExpectedException expectedException: expectedExceptions) {
+ cache.getLogger().info(expectedException.getRemoveString());
+ }
+ expectedExceptions.clear();
+ // NOTE(review): disconnect() is invoked before close(); disconnecting
+ // the distributed system typically closes the cache already, so the
+ // close() below is likely a no-op — confirm the intended teardown order.
+ cache.getDistributedSystem().disconnect();
+ cache.close();
+ }
+ cache = null;
+ }
+
+ /**
+ * Creates (but does not start) a gateway sender named {@code dsName}
+ * targeting {@code remoteDsId}. The original serial and parallel code
+ * paths were identical except for the parallel flag, so they are
+ * collapsed into one path using {@code setParallel(isParallel)} (the
+ * factory default is serial).
+ *
+ * @param dsName sender id, also used as the disk-store name
+ * @param remoteDsId distributed system id of the remote WAN site
+ * @param isParallel true for a parallel sender, false for serial
+ * @param maxMemory maximum queue memory
+ * @param batchSize events per dispatched batch
+ * @param isConflation whether batch conflation is enabled
+ * @param isPersistent whether the queue is persisted to disk
+ * @param filter optional gateway event filter (may be null)
+ * @param isManualStart whether the sender must be started explicitly
+ */
+ public static void createSender(String dsName, int remoteDsId,
+     boolean isParallel, Integer maxMemory, Integer batchSize,
+     boolean isConflation, boolean isPersistent, GatewayEventFilter filter,
+     boolean isManualStart) {
+   // Unique disk directory per VM so concurrently running test VMs do not collide.
+   File persistentDirectory = new File(dsName + "_disk_"
+       + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+   persistentDirectory.mkdir();
+   DiskStoreFactory dsf = cache.createDiskStoreFactory();
+   File[] dirs1 = new File[] { persistentDirectory };
+
+   GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+   gateway.setParallel(isParallel);
+   gateway.setMaximumQueueMemory(maxMemory);
+   gateway.setBatchSize(batchSize);
+   gateway.setManualStart(isManualStart);
+   ((InternalGatewaySenderFactory) gateway)
+       .setLocatorDiscoveryCallback(new MyLocatorCallback());
+   if (filter != null) {
+     gateway.addGatewayEventFilter(filter);
+   }
+   gateway.setBatchConflationEnabled(isConflation);
+   // A disk store is always created (as in the original); it only backs
+   // the queue when persistence is enabled.
+   DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+   if (isPersistent) {
+     gateway.setPersistenceEnabled(true);
+   }
+   gateway.setDiskStoreName(store.getName());
+   gateway.create(dsName, remoteDsId);
+ }
+
+
+ /**
+ * Creates a partitioned region wired to the comma-separated gateway
+ * sender ids (an empty string yields no sender ids).
+ */
+ public static void createPartitionedRegion(String regionName, String senderIds, Integer redundantCopies, Integer totalNumBuckets){
+   AttributesFactory regionAttrs = new AttributesFactory();
+   if (senderIds != null) {
+     for (StringTokenizer tokens = new StringTokenizer(senderIds, ","); tokens.hasMoreTokens();) {
+       regionAttrs.addGatewaySenderId(tokens.nextToken());
+     }
+   }
+   PartitionAttributesFactory partitionAttrs = new PartitionAttributesFactory();
+   partitionAttrs.setTotalNumBuckets(totalNumBuckets);
+   partitionAttrs.setRedundantCopies(redundantCopies);
+   partitionAttrs.setRecoveryDelay(0);
+   regionAttrs.setPartitionAttributes(partitionAttrs.create());
+   Region created = cache.createRegionFactory(regionAttrs.create()).create(regionName);
+   assertNotNull(created);
+ }
+
+ /**
+ * Creates a DISTRIBUTED_ACK replicated region wired to the
+ * comma-separated gateway sender ids. (Commented-out dead code from the
+ * original was removed.)
+ */
+ public static void createReplicatedRegion(String regionName, String senderIds){
+   AttributesFactory fact = new AttributesFactory();
+   if (senderIds != null) {
+     StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+     while (tokenizer.hasMoreTokens()) {
+       fact.addGatewaySenderId(tokenizer.nextToken());
+     }
+   }
+   fact.setDataPolicy(DataPolicy.REPLICATE);
+   fact.setScope(Scope.DISTRIBUTED_ACK);
+   Region r = cache.createRegionFactory(fact.create()).create(regionName);
+   assertNotNull(r);
+ }
+
+ /**
+ * Blocks until the sender with the given id reports isRunning(), failing
+ * the test after 300 seconds.
+ */
+ public static void waitForSenderRunningState(String senderId){
+   Set<GatewaySender> senders = cache.getGatewaySenders();
+   final GatewaySender sender = getGatewaySenderById(senders, senderId);
+
+   WaitCriterion wc = new WaitCriterion() {
+     public boolean done() {
+       // Simplified from if/return-true/return-false to a single boolean
+       // expression; sender is null when the id matched no sender.
+       return sender != null && sender.isRunning();
+     }
+
+     public String description() {
+       return "Expected sender isRunning state to be true but is false";
+     }
+   };
+   DistributedTestCase.waitForCriterion(wc, 300000, 500, true);
+ }
+
+ /**
+ * Starts the first locator of a remote WAN site (distributed system
+ * {@code dsId}), pointing its remote-locators at {@code remoteLocPort}.
+ *
+ * @return the port the new locator listens on
+ */
+ public static Integer createFirstRemoteLocator(int dsId, int remoteLocPort) {
+   UpdateVersionDUnitTest test = new UpdateVersionDUnitTest(testName);
+   int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+   Properties config = new Properties();
+   config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+   config.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, "" + dsId);
+   config.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + port + "]");
+   config.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
+   config.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, "localhost[" + remoteLocPort + "]");
+   config.setProperty(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
+   config.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+   test.getSystem(config);
+   return port;
+ }
+
+ /**
+ * Creates (but does not start) a multi-dispatcher gateway sender.
+ * BUGFIX: the original applied {@code concurrencyLevel} (dispatcher
+ * threads) only on the serial path, so a parallel "concurrent" sender
+ * silently ignored it; it is now applied on both paths, and the two
+ * otherwise-duplicated branches are collapsed into one.
+ *
+ * @param dsName sender id, also used as the disk-store name
+ * @param remoteDsId distributed system id of the remote WAN site
+ * @param isParallel true for a parallel sender, false for serial
+ * @param maxMemory maximum queue memory
+ * @param batchSize events per dispatched batch
+ * @param isConflation whether batch conflation is enabled
+ * @param isPersistent whether the queue is persisted to disk
+ * @param filter optional gateway event filter (may be null)
+ * @param isManulaStart [sic] whether the sender must be started explicitly
+ * @param concurrencyLevel number of dispatcher threads
+ */
+ public static void createConcurrentSender(String dsName, int remoteDsId,
+     boolean isParallel, Integer maxMemory,
+     Integer batchSize, boolean isConflation, boolean isPersistent,
+     GatewayEventFilter filter, boolean isManulaStart, int concurrencyLevel) {
+   // Unique disk directory per VM so concurrently running test VMs do not collide.
+   File persistentDirectory = new File(dsName + "_disk_"
+       + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+   persistentDirectory.mkdir();
+   DiskStoreFactory dsf = cache.createDiskStoreFactory();
+   File[] dirs1 = new File[] { persistentDirectory };
+
+   GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+   gateway.setParallel(isParallel);
+   gateway.setMaximumQueueMemory(maxMemory);
+   gateway.setBatchSize(batchSize);
+   gateway.setManualStart(isManulaStart);
+   ((InternalGatewaySenderFactory) gateway)
+       .setLocatorDiscoveryCallback(new MyLocatorCallback());
+   if (filter != null) {
+     gateway.addGatewayEventFilter(filter);
+   }
+   gateway.setBatchConflationEnabled(isConflation);
+   // A disk store is always created (as in the original); it only backs
+   // the queue when persistence is enabled.
+   DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+   if (isPersistent) {
+     gateway.setPersistenceEnabled(true);
+   }
+   gateway.setDiskStoreName(store.getName());
+   gateway.setDispatcherThreads(concurrencyLevel);
+   gateway.create(dsName, remoteDsId);
+ }
+
+ /**
+ * Creates a cache connected through the given locator and starts a
+ * gateway receiver on a random available port.
+ *
+ * @return the port the receiver listens on
+ */
+ public static int createReceiver(int locPort) {
+   UpdateVersionDUnitTest test = new UpdateVersionDUnitTest(testName);
+   Properties config = new Properties();
+   config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+   config.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort
+       + "]");
+
+   cache = CacheFactory.create(test.getSystem(config));
+   int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+   GatewayReceiverFactory receiverFactory = cache.createGatewayReceiverFactory();
+   // Pin start and end port to the same value so the receiver binds
+   // exactly the port we return.
+   receiverFactory.setStartPort(port);
+   receiverFactory.setEndPort(port);
+   GatewayReceiver receiver = receiverFactory.create();
+   try {
+     receiver.start();
+   } catch (IOException e) {
+     e.printStackTrace();
+     fail("Test " + test.getName() + " failed to start GatewayRecevier on port " + port);
+   }
+   return port;
+ }
+
+ /**
+ * Starts the gateway sender with the given id. Uses the shared
+ * getGatewaySenderById lookup and fails the test with a clear message
+ * (instead of a NullPointerException) when no such sender exists.
+ */
+ public static void startSender(String senderId){
+   GatewaySender sender = getGatewaySenderById(cache.getGatewaySenders(), senderId);
+   assertNotNull("No gateway sender found with id " + senderId, sender);
+   sender.start();
+ }
+
+ /**
+ * LocatorDiscoveryCallback that records discovered/removed locators and
+ * lets tests block until a particular locator appears in either set. All
+ * access to the two sets is synchronized on this instance, and the
+ * record methods notifyAll() to wake waiters.
+ */
+ protected static class MyLocatorCallback extends
+     LocatorDiscoveryCallbackAdapter {
+
+   private final Set discoveredLocators = new HashSet();
+
+   private final Set removedLocators = new HashSet();
+
+   public synchronized void locatorsDiscovered(List locators) {
+     discoveredLocators.addAll(locators);
+     notifyAll();
+   }
+
+   public synchronized void locatorsRemoved(List locators) {
+     removedLocators.addAll(locators);
+     notifyAll();
+   }
+
+   /** Waits up to {@code time} ms for {@code locator} to be discovered. */
+   public boolean waitForDiscovery(InetSocketAddress locator, long time)
+       throws InterruptedException {
+     return waitFor(discoveredLocators, locator, time);
+   }
+
+   /** Waits up to {@code time} ms for {@code locator} to be removed. */
+   public boolean waitForRemove(InetSocketAddress locator, long time)
+       throws InterruptedException {
+     return waitFor(removedLocators, locator, time);
+   }
+
+   /**
+    * Waits until {@code set} contains {@code locator} or the timeout
+    * elapses. BUGFIX: the loop now only waits while remaining &gt; 0; the
+    * previous condition (remaining &gt;= 0) could invoke wait(0), which per
+    * Object.wait(long) blocks indefinitely instead of timing out.
+    */
+   private synchronized boolean waitFor(Set set, InetSocketAddress locator,
+       long time) throws InterruptedException {
+     long endTime = System.currentTimeMillis() + time;
+     long remaining = time;
+     while (!set.contains(locator) && remaining > 0) {
+       wait(remaining);
+       remaining = endTime - System.currentTimeMillis();
+     }
+     return set.contains(locator);
+   }
+
+   public synchronized Set getDiscovered() {
+     return new HashSet(discoveredLocators);
+   }
+
+   public synchronized Set getRemoved() {
+     return new HashSet(removedLocators);
+   }
+ }
+
+ /**
+ * Looks up a gateway sender by id in the given set.
+ *
+ * @return the matching sender, or null when no sender has that id
+ */
+ private static GatewaySender getGatewaySenderById(Set<GatewaySender> senders, String senderId) {
+   for (GatewaySender candidate : senders) {
+     if (candidate.getId().equals(senderId)) {
+       return candidate;
+     }
+   }
+   // Nothing in the set matched the supplied sender id.
+   return null;
+ }
+
+ /**
+ * Starts the first locator for a new distributed system with the given
+ * distributed-system id.
+ *
+ * @return the port the new locator listens on
+ */
+ public static Integer createFirstLocatorWithDSId(int dsId) {
+   UpdateVersionDUnitTest test = new UpdateVersionDUnitTest(testName);
+   int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+   Properties config = new Properties();
+   config.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+   config.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, "" + dsId);
+   config.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + port + "]");
+   config.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+   config.setProperty(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
+   config.setProperty(DistributionConfig.START_LOCATOR_NAME, "localhost[" + port + "],server=true,peer=true,hostname-for-clients=localhost");
+   test.getSystem(config);
+   return port;
+ }
+}
\ No newline at end of file