You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/03/18 00:11:45 UTC
[1/5] incubator-geode git commit: GEODE-1111 Connection Pooling needs
more tests
Repository: incubator-geode
Updated Branches:
refs/heads/feature/GEODE-1050 f45f0f549 -> 893dc86b9
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4ed2fd37/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
new file mode 100755
index 0000000..41d48aa
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
@@ -0,0 +1,5871 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache;
+
+import static org.junit.runners.MethodSorters.NAME_ASCENDING;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import junit.framework.AssertionFailedError;
+
+import org.junit.FixMethodOrder;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.cache.client.NoAvailableServersException;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.Endpoint;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.cache30.CertifiableTestCacheListener;
+import com.gemstone.gemfire.cache30.TestCacheLoader;
+import com.gemstone.gemfire.cache30.TestCacheWriter;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.EntryExpiryTask;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PoolStats;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifierStats;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.internal.logging.LocalLogWriter;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.ThreadUtils;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+
+/**
+ * This class tests the client connection pool in GemFire.
+ * It does so by creating a cache server with a cache and a pre-defined region and
+ * a data loader. The client creates the same region with a pool
+ * (this happens in the controller VM). The client then spins up
+ * 10 different threads and issues gets on keys. The server data loader returns the
+ * data to the client.
+ * Test uses Groboutils TestRunnable objects to achieve multi threading behavior
+ * in the test.
+ *
+ */
+@FixMethodOrder(NAME_ASCENDING)
+public class ConnectionPoolDUnitTest extends CacheTestCase {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The port on which the bridge server was started in this VM */
+ private static int bridgeServerPort;
+
+ // NOTE(review): port and port2 are not used in the visible part of this
+ // file; presumably later tests or subclasses use them -- confirm.
+ protected static int port = 0;
+ protected static int port2 = 0;
+
+ // Static listener counters; test003Bug36684 resets them to 0 in each client VM.
+ protected static int numberOfAfterInvalidates;
+ protected static int numberOfAfterCreates;
+ protected static int numberOfAfterUpdates;
+
+ // Event type tags that ControlListener stores in EventWrapper.type.
+ protected final static int TYPE_CREATE = 0;
+ protected final static int TYPE_UPDATE = 1;
+ protected final static int TYPE_INVALIDATE = 2;
+ protected final static int TYPE_DESTROY = 3;
+
+ /**
+ * Creates the test case with the given name, which is passed unchanged
+ * to the superclass constructor.
+ *
+ * @param name the test name
+ */
+ public ConnectionPoolDUnitTest(String name) {
+ super(name);
+ }
+
+ /**
+  * Runs the superclass set-up and then calls {@code getSystem()} in the
+  * controller VM and in every dunit VM, so that all VMs are connected
+  * before any test creates a pool.
+  *
+  * @throws Exception if the superclass set-up fails
+  */
+ @Override
+ public void setUp() throws Exception {
+   super.setUp();
+   // avoid IllegalStateException from HandShake by connecting all vms to
+   // system before creating pool
+   getSystem();
+   Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
+     @Override
+     public void run() {
+       getSystem();
+     }
+   });
+ }
+
+ /**
+  * After the cache test case has been torn down, checks in every VM that no
+  * pool is still registered with the {@link PoolManager}, then runs the
+  * subclass hook {@link #postTearDownConnectionPoolDUnitTest()}.
+  */
+ @Override
+ protected final void postTearDownCacheTestCase() throws Exception {
+   final SerializableRunnable checkNoPoolsRemain = new SerializableRunnable() {
+     public void run() {
+       final Map remaining = PoolManager.getAll();
+       if (remaining.isEmpty()) {
+         return;
+       }
+       // log the leftovers before failing so the culprit shows up in the VM log
+       com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().warning("found pools remaining after teardown: " + remaining);
+       assertEquals(0, remaining.size());
+     }
+   };
+   Invoke.invokeInEveryVM(checkNoPoolsRemain);
+   postTearDownConnectionPoolDUnitTest();
+ }
+
+ /**
+ * Tear-down hook invoked at the end of {@link #postTearDownCacheTestCase()}.
+ * The default implementation does nothing; subclasses may override it.
+ */
+ protected void postTearDownConnectionPoolDUnitTest() throws Exception {
+ }
+
+ protected/*GemStoneAddition*/ static PoolImpl getPool(Region r) {
+ PoolImpl result = null;
+ String poolName = r.getAttributes().getPoolName();
+ if (poolName != null) {
+ result = (PoolImpl)PoolManager.find(poolName);
+ }
+ return result;
+ }
+ protected static TestCacheWriter getTestWriter(Region r) {
+ return (TestCacheWriter)r.getAttributes().getCacheWriter();
+ }
+ /**
+ * Create a bridge server on the given port without starting it.
+ *
+ * @since 5.0.2
+ */
+ protected void createBridgeServer(int port) throws IOException {
+ CacheServer bridge = getCache().addCacheServer();
+ bridge.setPort(port);
+ bridge.setMaxThreads(getMaxThreads());
+ bridgeServerPort = bridge.getPort();
+ }
+
+ /**
+ * Starts a bridge server on the given port. Delegates to
+ * {@link #startBridgeServer(int, int)} with a socket buffer size of -1,
+ * which leaves the server's socket buffer size unset.
+ *
+ * @param port the port for the new cache server
+ * @throws IOException propagated from the delegate overload
+ * @since 4.0
+ */
+ protected void startBridgeServer(int port)
+ throws IOException {
+ startBridgeServer(port, -1);
+ }
+
+ /**
+ * Starts a bridge server on the given port with the given socket buffer
+ * size, using {@link CacheServer#DEFAULT_LOAD_POLL_INTERVAL} as the load
+ * poll interval.
+ *
+ * @param port the port for the new cache server
+ * @param socketBufferSize the socket buffer size, or -1 to leave it unset
+ */
+ protected void startBridgeServer(int port, int socketBufferSize) throws IOException {
+ startBridgeServer(port, socketBufferSize, CacheServer.DEFAULT_LOAD_POLL_INTERVAL);
+ }
+
+ protected void startBridgeServer(int port, int socketBufferSize, long loadPollInterval)
+ throws IOException {
+
+ Cache cache = getCache();
+ CacheServer bridge = cache.addCacheServer();
+ bridge.setPort(port);
+ if (socketBufferSize != -1) {
+ bridge.setSocketBufferSize(socketBufferSize);
+ }
+ bridge.setMaxThreads(getMaxThreads());
+ bridge.setLoadPollInterval(loadPollInterval);
+ bridge.start();
+ bridgeServerPort = bridge.getPort();
+ }
+
+ /**
+ * Returns the max-threads setting applied to every cache server this test
+ * creates. By default returns 0, which turns off the selector and gives a
+ * thread per connection. Test subclasses can override to run with selector.
+ *
+ * @return the value passed to {@code CacheServer.setMaxThreads}
+ * @since 5.1
+ */
+ protected int getMaxThreads() {
+ return 0;
+ }
+
+ /**
+ * Stops the bridge server that serves up the given cache.
+ *
+ * @since 4.0
+ */
+ void stopBridgeServer(Cache cache) {
+ CacheServer bridge =
+ cache.getCacheServers().iterator().next();
+ bridge.stop();
+ assertFalse(bridge.isRunning());
+ }
+
+ void stopBridgeServers(Cache cache) {
+ CacheServer bridge = null;
+ for (Iterator bsI = cache.getCacheServers().iterator();bsI.hasNext(); ) {
+ bridge = (CacheServer) bsI.next();
+ bridge.stop();
+ assertFalse(bridge.isRunning());
+ }
+ }
+
+ private void restartBridgeServers(Cache cache) throws IOException
+ {
+ CacheServer bridge = null;
+ for (Iterator bsI = cache.getCacheServers().iterator();bsI.hasNext(); ) {
+ bridge = (CacheServer) bsI.next();
+ bridge.start();
+ assertTrue(bridge.isRunning());
+ }
+ }
+
+ protected InternalDistributedSystem createLonerDS() {
+ disconnectFromDS();
+ InternalDistributedSystem ds = getLonerSystem();
+ assertEquals(0, ds.getDistributionManager().getOtherDistributionManagerIds().size());
+ return ds;
+ }
+
+
+
+ /**
+ * Returns region attributes for a <code>LOCAL</code> region
+ */
+ protected RegionAttributes getRegionAttributes() {
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false); // test validation expects this behavior
+ return factory.create();
+ }
+
+ private static String createBridgeClientConnection(String host, int[] ports) {
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < ports.length; i++) {
+ if (i > 0) sb.append(",");
+ sb.append("name" + i + "=");
+ sb.append(host + ":" + ports[i]);
+ }
+ return sb.toString();
+ }
+
+ /**
+  * Immutable snapshot of an {@link EntryEvent} together with a type tag.
+  * The key, new value and callback argument are read from the event once,
+  * at construction time.
+  *
+  * Declared static because it uses nothing from the enclosing test
+  * instance; a non-static inner class would hold a hidden reference to it.
+  */
+ private static class EventWrapper {
+   public final EntryEvent event;
+   public final Object key;
+   public final Object val;
+   public final Object arg;
+   public final int type;
+
+   /**
+    * @param ee the event to wrap
+    * @param type the event type tag (one of the {@code TYPE_*} constants
+    *        when created by ControlListener)
+    */
+   public EventWrapper(EntryEvent ee, int type) {
+     this.event = ee;
+     this.key = ee.getKey();
+     this.val = ee.getNewValue();
+     this.arg = ee.getCallbackArgument();
+     this.type = type;
+   }
+
+   @Override
+   public String toString() {
+     return "EventWrapper: event=" + event + ", type=" + type;
+   }
+ }
+
+ /**
+  * Cache listener that records every create, update, invalidate and destroy
+  * event as an {@code EventWrapper} in {@link #events} and wakes up any
+  * thread waiting in {@link #waitWhileNotEnoughEvents(long, int)}.
+  * All access to {@link #events} inside this class is guarded by
+  * {@link #CONTROL_LOCK}.
+  */
+ protected class ControlListener extends CacheListenerAdapter {
+   public final LinkedList events = new LinkedList();
+   public final Object CONTROL_LOCK = new Object();
+
+   /**
+    * Waits until at least {@code eventCount} events have been recorded or
+    * roughly {@code sleepMs} milliseconds have passed, whichever is first.
+    * Waiting stops early once less than 10 ms of the budget remain.
+    *
+    * @param sleepMs the maximum time to wait, in milliseconds
+    * @param eventCount the number of recorded events to wait for
+    * @return {@code true} if at least one event has been recorded when the
+    *         wait ends. Note that this is NOT the same as "eventCount events
+    *         were seen"; callers that need the exact count must check
+    *         {@link #events} themselves.
+    */
+   public boolean waitWhileNotEnoughEvents(long sleepMs, int eventCount) {
+     long maxMillis = System.currentTimeMillis() + sleepMs;
+     synchronized (this.CONTROL_LOCK) {
+       try {
+         while (this.events.size() < eventCount) {
+           long waitMillis = maxMillis - System.currentTimeMillis();
+           if (waitMillis < 10) {
+             break;
+           }
+           this.CONTROL_LOCK.wait(waitMillis);
+         }
+       } catch (InterruptedException abort) {
+         // restore the interrupt flag so code further up the stack can see it
+         Thread.currentThread().interrupt();
+         fail("interrupted");
+       }
+       return !this.events.isEmpty();
+     }
+   }
+
+   @Override
+   public void afterCreate(EntryEvent e) {
+     record(e, TYPE_CREATE);
+   }
+
+   @Override
+   public void afterUpdate(EntryEvent e) {
+     record(e, TYPE_UPDATE);
+   }
+
+   @Override
+   public void afterInvalidate(EntryEvent e) {
+     record(e, TYPE_INVALIDATE);
+   }
+
+   @Override
+   public void afterDestroy(EntryEvent e) {
+     record(e, TYPE_DESTROY);
+   }
+
+   /** Appends the event with the given type tag and notifies all waiters. */
+   private void record(EntryEvent e, int type) {
+     synchronized (this.CONTROL_LOCK) {
+       this.events.add(new EventWrapper(e, type));
+       this.CONTROL_LOCK.notifyAll();
+     }
+   }
+ }
+
+
+
+
+ /**
+  * Creates a stub {@link EntryEvent} whose {@link CacheEvent#getRegion()}
+  * returns the provided region and whose {@link CacheEvent#getOperation()}
+  * returns {@link com.gemstone.gemfire.cache.Operation#LOCAL_LOAD_CREATE}.
+  * Every other accessor returns a fixed value: {@code null} for objects,
+  * {@code true} for {@code isLoad}, {@code isOldValueAvailable} and
+  * {@code isCallbackArgumentAvailable}, and {@code false} for the rest.
+  *
+  * @param r the region the fake event reports
+  * @return fake entry event
+  */
+ protected static EntryEvent createFakeyEntryEvent(final Region r) {
+   return new EntryEvent() {
+     // --- the two accessors that carry real information ---
+     public Region getRegion() {
+       return r;
+     }
+     public Operation getOperation() {
+       // fake out pool to exit early
+       return Operation.LOCAL_LOAD_CREATE;
+     }
+
+     // --- key, values and callback argument: all absent ---
+     public Object getKey() {
+       return null;
+     }
+     public Object getOldValue() {
+       return null;
+     }
+     public Object getNewValue() {
+       return null;
+     }
+     public SerializedCacheValue getSerializedOldValue() {
+       return null;
+     }
+     public SerializedCacheValue getSerializedNewValue() {
+       return null;
+     }
+     public Object getCallbackArgument() {
+       return null;
+     }
+     public boolean isOldValueAvailable() {
+       return true;
+     }
+     public boolean isCallbackArgumentAvailable() {
+       return true;
+     }
+
+     // --- origin information ---
+     public TransactionId getTransactionId() {
+       return null;
+     }
+     public DistributedMember getDistributedMember() {
+       return null;
+     }
+     public ClientProxyMembershipID getContext() {
+       return null;
+     }
+     public boolean hasClientOrigin() {
+       return false;
+     }
+     public boolean isBridgeEvent() {
+       return hasClientOrigin();
+     }
+     public boolean isOriginRemote() {
+       return false;
+     }
+
+     // --- operation flags ---
+     public boolean isLoad() {
+       return true;
+     }
+     public boolean isLocalLoad() {
+       return false;
+     }
+     public boolean isNetLoad() {
+       return false;
+     }
+     public boolean isNetSearch() {
+       return false;
+     }
+     public boolean isExpiration() {
+       return false;
+     }
+     public boolean isDistributed() {
+       return false;
+     }
+   };
+ }
+
+ /**
+  * Waits for the pool to be connected to {@code expectedServer} servers and
+  * then, for up to two minutes, for every endpoint to report exactly
+  * {@code expectedConsPerServer} connections. Asserts the balance once more
+  * after the wait.
+  *
+  * @param pool the pool to inspect
+  * @param expectedServer the expected connected server count
+  * @param expectedConsPerServer the expected connection count per endpoint
+  */
+ public void verifyBalanced(final PoolImpl pool, int expectedServer,
+     final int expectedConsPerServer) {
+   verifyServerCount(pool, expectedServer);
+   final WaitCriterion untilBalanced = new WaitCriterion() {
+     public String description() {
+       return "expected " + expectedConsPerServer
+           + " but endpoints=" + outOfBalanceReport(pool);
+     }
+     public boolean done() {
+       return balanced(pool, expectedConsPerServer);
+     }
+   };
+   Wait.waitForCriterion(untilBalanced, 2 * 60 * 1000, 200, true);
+   // build the report first, then re-check, in the same order as the assertion arguments
+   final String failureMessage = "expected " + expectedConsPerServer
+       + " but endpoints=" + outOfBalanceReport(pool);
+   final boolean nowBalanced = balanced(pool, expectedConsPerServer);
+   assertEquals(failureMessage, true, nowBalanced);
+ }
+ protected boolean balanced(PoolImpl pool, int expectedConsPerServer) {
+ Iterator it = pool.getEndpointMap().values().iterator();
+ while (it.hasNext()) {
+ Endpoint ep = (Endpoint)it.next();
+ if (ep.getStats().getConnections() != expectedConsPerServer) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ protected String outOfBalanceReport(PoolImpl pool) {
+ StringBuffer result = new StringBuffer();
+ Iterator it = pool.getEndpointMap().values().iterator();
+ result.append("<");
+ while (it.hasNext()) {
+ Endpoint ep = (Endpoint)it.next();
+ result.append("ep=" + ep);
+ result.append(" conCount=" + ep.getStats().getConnections());
+ if (it.hasNext()) {
+ result.append(", ");
+ }
+ }
+ result.append(">");
+ return result.toString();
+ }
+
+ /**
+  * Waits up to 30 seconds for the pool's set of blacklisted servers to
+  * become empty, then asserts that it is empty.
+  *
+  * @param pool the pool whose blacklist is watched
+  */
+ public void waitForBlacklistToClear(final PoolImpl pool) {
+   WaitCriterion ev = new WaitCriterion() {
+     public boolean done() {
+       return pool.getBlacklistedServers().size() == 0;
+     }
+     public String description() {
+       // describe what is still blacklisted instead of returning null, so
+       // that a timeout failure says which servers were stuck
+       return "unexpected blacklistedServers=" + pool.getBlacklistedServers();
+     }
+   };
+   Wait.waitForCriterion(ev, 30 * 1000, 200, true);
+   assertEquals("unexpected blacklistedServers=" + pool.getBlacklistedServers(),
+       0, pool.getBlacklistedServers().size());
+ }
+
+ /**
+  * Waits up to five minutes for the pool's connected server count to equal
+  * {@code expectedCount}.
+  *
+  * @param pool the pool to inspect
+  * @param expectedCount the connected server count to wait for
+  */
+ public void verifyServerCount(final PoolImpl pool, final int expectedCount) {
+   getCache().getLogger().info("verifyServerCount expects=" + expectedCount);
+   final WaitCriterion untilCountMatches = new WaitCriterion() {
+     // last mismatch seen; reported if the wait times out
+     String excuse;
+     public String description() {
+       return excuse;
+     }
+     public boolean done() {
+       final int actual = pool.getConnectedServerCount();
+       final boolean matches = (actual == expectedCount);
+       if (!matches) {
+         excuse = "Found only " + actual + " servers, expected " + expectedCount;
+       }
+       return matches;
+     }
+   };
+   Wait.waitForCriterion(untilCountMatches, 5 * 60 * 1000, 200, true);
+ }
+
+ /**
+  * Tests that the callback argument is sent to the server.
+  * A client in vm1 creates and then updates ten entries, passing a distinct
+  * callback argument for each kind of operation; the cache writer on the
+  * server in vm0 asserts that it receives the matching argument.
+  */
+ public void test001CallbackArg() throws CacheException {
+   final String name = this.getName();
+   final Host host = Host.getHost(0);
+   VM vm0 = host.getVM(0);
+   VM vm1 = host.getVM(1);
+
+   final Object createCallbackArg = "CREATE CALLBACK ARG";
+   final Object updateCallbackArg = "PUT CALLBACK ARG";
+
+   vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+     public void run2() throws CacheException {
+
+       CacheWriter cw = new TestCacheWriter() {
+         public final void beforeUpdate2(EntryEvent event)
+             throws CacheWriterException {
+           Object beca = event.getCallbackArgument();
+           assertEquals(updateCallbackArg, beca);
+         }
+
+         public void beforeCreate2(EntryEvent event)
+             throws CacheWriterException {
+           Object beca = event.getCallbackArgument();
+           assertEquals(createCallbackArg, beca);
+         }
+       };
+       AttributesFactory factory = getBridgeServerRegionAttributes(null, cw);
+       createRegion(name, factory.create());
+       try {
+         startBridgeServer(0);
+
+       } catch (Exception ex) {
+         com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+       }
+
+     }
+   });
+   final int port =
+       vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+   final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+   SerializableRunnable create =
+       new CacheSerializableRunnable("Create region") {
+     public void run2() throws CacheException {
+       getLonerSystem();
+       getCache();
+       AttributesFactory factory = new AttributesFactory();
+       factory.setScope(Scope.LOCAL);
+       factory.setConcurrencyChecksEnabled(false);
+
+       // use the host name resolved once in the controller, as the other
+       // tests in this class do, instead of resolving it again in the client VM
+       ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+       createRegion(name, factory.create());
+     }
+   };
+
+   vm1.invoke(create);
+   vm1.invoke(new CacheSerializableRunnable("Add entries") {
+     public void run2() throws CacheException {
+       Region region = getRootRegion().getSubregion(name);
+       for (int i = 0; i < 10; i++) {
+         region.create(Integer.valueOf(i), "old" + i, createCallbackArg);
+       }
+       for (int i = 0; i < 10; i++) {
+         region.put(Integer.valueOf(i), "new" + i, updateCallbackArg);
+       }
+     }
+   });
+
+   vm0.invoke(new CacheSerializableRunnable("Check cache writer") {
+     public void run2() throws CacheException {
+       Region region = getRootRegion().getSubregion(name);
+       TestCacheWriter writer = getTestWriter(region);
+       assertTrue(writer.wasInvoked());
+     }
+   });
+
+   SerializableRunnable close =
+       new CacheSerializableRunnable("Close Pool") {
+     public void run2() throws CacheException {
+       Region region = getRootRegion().getSubregion(name);
+       region.localDestroyRegion();
+     }
+   };
+
+   vm1.invoke(close);
+
+   vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+     public void run() {
+       stopBridgeServer(getCache());
+     }
+   });
+
+ }
+
+ /**
+  * Tests that consecutive creates each carry their own callback argument.
+  * The client in vm1 creates even keys with a callback argument and odd
+  * keys without one; the cache writer on the server in vm0 asserts that it
+  * sees the argument for even keys and {@code null} for odd keys.
+  */
+ public void test002CallbackArg2() throws CacheException {
+   final String name = this.getName();
+   final Host host = Host.getHost(0);
+   final VM serverVM = host.getVM(0);
+   final VM clientVM = host.getVM(1);
+
+   final Object createCallbackArg = "CREATE CALLBACK ARG";
+
+   serverVM.invoke(new CacheSerializableRunnable("Create Cache Server") {
+     public void run2() throws CacheException {
+       final CacheWriter checkingWriter = new TestCacheWriter() {
+         public void beforeCreate2(EntryEvent event)
+             throws CacheWriterException {
+           final Integer key = (Integer) event.getKey();
+           final Object beca = event.getCallbackArgument();
+           final boolean evenKey = (key.intValue() % 2 == 0);
+           if (evenKey) {
+             assertEquals(createCallbackArg, beca);
+           } else {
+             assertNull(beca);
+           }
+         }
+       };
+       final AttributesFactory serverAttrs = getBridgeServerRegionAttributes(null, checkingWriter);
+       createRegion(name, serverAttrs.create());
+       try {
+         startBridgeServer(0);
+       } catch (Exception ex) {
+         com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+       }
+     }
+   });
+   final int port =
+       serverVM.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+   final String host0 = NetworkUtils.getServerHostName(serverVM.getHost());
+
+   clientVM.invoke(new CacheSerializableRunnable("Create region") {
+     public void run2() throws CacheException {
+       getLonerSystem();
+       getCache();
+       final AttributesFactory clientAttrs = new AttributesFactory();
+       clientAttrs.setScope(Scope.LOCAL);
+       clientAttrs.setConcurrencyChecksEnabled(false);
+       ClientServerTestCase.configureConnectionPool(clientAttrs,host0,port,-1,true,-1,-1, null);
+       createRegion(name, clientAttrs.create());
+     }
+   });
+
+   clientVM.invoke(new CacheSerializableRunnable("Add entries") {
+     public void run2() throws CacheException {
+       final Region region = getRootRegion().getSubregion(name);
+       for (int i = 0; i < 10; i++) {
+         final Integer key = Integer.valueOf(i);
+         final String value = "old" + i;
+         if (i % 2 != 0) {
+           region.create(key, value);
+         } else {
+           region.create(key, value, createCallbackArg);
+         }
+       }
+     }
+   });
+
+   clientVM.invoke(new CacheSerializableRunnable("Close Pool") {
+     public void run2() throws CacheException {
+       getRootRegion().getSubregion(name).localDestroyRegion();
+     }
+   });
+
+   serverVM.invoke(new CacheSerializableRunnable("Check cache writer") {
+     public void run2() throws CacheException {
+       final TestCacheWriter writer = getTestWriter(getRootRegion().getSubregion(name));
+       assertTrue(writer.wasInvoked());
+     }
+   });
+
+   serverVM.invoke(new SerializableRunnable("Stop CacheServer") {
+     public void run() {
+       stopBridgeServer(getCache());
+     }
+   });
+ }
+
+
+ /**
+ * Tests for bug 36684 by having two bridge servers with cacheloaders that should always return
+ * a value and two clients, each with a pool configured with both server ports, reading values.
+ * If the bug exists, the clients will get null sometimes.
+ * The loader returns the requested key as the value, so every get of "key-N" must return "key-N".
+ * @throws InterruptedException declared by the test; no visible statement throws it directly
+ */
+ public void test003Bug36684() throws CacheException, InterruptedException {
+ final String name = this.getName();
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+
+ // Create the cache servers (vm0 and vm1); the region attributes come from
+ // getBridgeServerMirroredAckRegionAttributes, defined elsewhere in this file
+ SerializableRunnable createServer =
+ new CacheSerializableRunnable("Create Cache Server") {
+ public void run2() throws CacheException {
+ // loader echoes the key back as the value, so a get can never legitimately return null
+ CacheLoader cl = new CacheLoader() {
+ public Object load(LoaderHelper helper) {
+ return helper.getKey();
+ }
+ public void close() {
+
+ }
+ };
+ AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null);
+ createRegion(name, factory.create());
+ //pause(1000);
+ try {
+ startBridgeServer(0);
+ } catch (Exception ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+ }
+
+ }
+ };
+ getSystem().getLogWriter().info("before create server");
+ vm0.invoke(createServer);
+ vm1.invoke(createServer);
+
+ // Create cache server clients (vm2 and vm3)
+ final int numberOfKeys = 1000;
+ final String host0 = NetworkUtils.getServerHostName(host);
+ final int vm0Port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+ final int vm1Port = vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+ SerializableRunnable createClient =
+ new CacheSerializableRunnable("Create Cache Server Client") {
+ public void run2() throws CacheException {
+ // reset all static listener variables in case this is being rerun in a subclass
+ numberOfAfterInvalidates = 0;
+ numberOfAfterCreates = 0;
+ numberOfAfterUpdates = 0;
+ // create the region
+ getLonerSystem();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false); // test validation expects this behavior
+ // configure the connection pool with both server ports
+ ClientServerTestCase.configureConnectionPool(factory,host0,vm0Port,vm1Port,true,-1,-1, null);
+ createRegion(name, factory.create());
+ }
+ };
+ getSystem().getLogWriter().info("before create client");
+ vm2.invoke(createClient);
+ vm3.invoke(createClient);
+
+ // Have each client get every key and verify that the value equals the key
+ SerializableRunnable initializeClient =
+ new CacheSerializableRunnable("Initialize Client") {
+ public void run2() throws CacheException {
+// StringBuffer errors = new StringBuffer();
+ numberOfAfterInvalidates = 0;
+ numberOfAfterCreates = 0;
+ numberOfAfterUpdates = 0;
+ LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
+ for (int i=0; i<numberOfKeys; i++) {
+ String expected = "key-"+i;
+ String actual = (String) region.get("key-"+i);
+ assertEquals(expected, actual);
+ }
+ }
+ };
+
+ getSystem().getLogWriter().info("before initialize client");
+ // run both clients concurrently
+ AsyncInvocation inv2 = vm2.invokeAsync(initializeClient);
+ AsyncInvocation inv3 = vm3.invokeAsync(initializeClient);
+
+ ThreadUtils.join(inv2, 30 * 1000);
+ ThreadUtils.join(inv3, 30 * 1000);
+
+ // surface any assertion failure or exception raised inside the client VMs
+ if (inv2.exceptionOccurred()) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("Error occured in vm2", inv2.getException());
+ }
+ if(inv3.exceptionOccurred()) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("Error occured in vm3", inv3.getException());
+ }
+ }
+
+
+ /**
+ * Test for client connection loss with CacheLoader Exception on the server.
+ */
+ public void test004ForCacheLoaderException() throws CacheException, InterruptedException {
+ final String name = this.getName();
+ final Host host = Host.getHost(0);
+ VM server = host.getVM(0);
+ VM client = host.getVM(1);
+
+ // Create the cache servers with distributed, mirrored region
+ SerializableRunnable createServer =
+ new CacheSerializableRunnable("Create Cache Server") {
+ public void run2() throws CacheException {
+ CacheLoader cl = new CacheLoader() {
+ public Object load(LoaderHelper helper) {
+ System.out.println("### CALLING CACHE LOADER....");
+ throw new CacheLoaderException("Test for CahceLoaderException causing Client connection to disconnect.");
+ }
+ public void close() {
+ }
+ };
+ AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null);
+ createRegion(name, factory.create());
+ //pause(1000);
+ try {
+ startBridgeServer(0);
+ } catch (Exception ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+ }
+
+ }
+ };
+ getSystem().getLogWriter().info("before create server");
+
+ server.invoke(createServer);
+
+ // Create cache server clients
+ final int numberOfKeys = 10;
+ final String host0 = NetworkUtils.getServerHostName(host);
+ final int[] port = new int[] {server.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort())};
+ final String poolName = "myPool";
+
+ SerializableRunnable createClient =
+ new CacheSerializableRunnable("Create Cache Server Client") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ // create bridge writer
+ ClientServerTestCase.configureConnectionPoolWithName(factory,host0,port,true,-1, -1, null, poolName);
+ createRegion(name, factory.create());
+ }
+ };
+ getSystem().getLogWriter().info("before create client");
+ client.invoke(createClient);
+
+ // Initialize each client with entries (so that afterInvalidate is called)
+ SerializableRunnable invokeServerCacheLaoder =
+ new CacheSerializableRunnable("Initialize Client") {
+ public void run2() throws CacheException {
+ LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
+ PoolStats stats = ((PoolImpl)PoolManager.find(poolName)).getStats();
+ int oldConnects = stats.getConnects();
+ int oldDisConnects = stats.getDisConnects();
+ try {
+ for (int i=0; i<numberOfKeys; i++) {
+ String actual = (String) region.get("key-"+i);
+ }
+ } catch (Exception ex){
+ if (!(ex.getCause() instanceof CacheLoaderException)) {
+ fail ("UnExpected Exception, expected to receive CacheLoaderException from server, instead found: " + ex.getCause().getClass());
+ }
+ }
+ int newConnects = stats.getConnects();
+ int newDisConnects = stats.getDisConnects();
+ //System.out.println("#### new connects/disconnects :" + newConnects + ":" + newDisConnects);
+ if (newConnects != oldConnects && newDisConnects != oldDisConnects) {
+ fail ("New connection has created for Server side CacheLoaderException.");
+ }
+ }
+ };
+
+ getSystem().getLogWriter().info("before initialize client");
+ AsyncInvocation inv2 = client.invokeAsync(invokeServerCacheLaoder);
+
+ ThreadUtils.join(inv2, 30 * 1000);
+ SerializableRunnable stopServer = new SerializableRunnable("stop CacheServer") {
+ public void run() {
+ stopBridgeServer(getCache());
+ }
+ };
+ server.invoke(stopServer);
+
+ }
+
+
+ protected void validateDS() {
+ List l = InternalDistributedSystem.getExistingSystems();
+ if (l.size() > 1) {
+ getSystem().getLogWriter().info("validateDS: size="
+ + l.size()
+ + " isDedicatedAdminVM="
+ + DistributionManager.isDedicatedAdminVM
+ + " l=" + l);
+ }
+ assertFalse(DistributionManager.isDedicatedAdminVM);
+ assertEquals(1, l.size());
+ }
+
+
+ /**
+  * Tests the basic operations of the {@link Pool}: a client in vm1 loads
+  * values through the server's loader and then overwrites them, a second
+  * client in vm2 reads the overwritten values, and finally vm1 checks the
+  * pool's attach count and destroy behaviour around destroying its region.
+  *
+  * @since 3.5
+  */
+ public void test006Pool() throws CacheException {
+   final String name = this.getName();
+   final Host host = Host.getHost(0);
+   final VM serverVM = host.getVM(0);
+   final VM firstClient = host.getVM(1);
+   final VM secondClient = host.getVM(2);
+
+   serverVM.invoke(new CacheSerializableRunnable("Create Cache Server") {
+     public void run2() throws CacheException {
+       final AttributesFactory serverAttrs = new AttributesFactory();
+       serverAttrs.setScope(Scope.DISTRIBUTED_ACK);
+       serverAttrs.setConcurrencyChecksEnabled(false);
+       // loader answers every miss with the key's string form
+       serverAttrs.setCacheLoader(new CacheLoader() {
+         public Object load(LoaderHelper helper) {
+           return helper.getKey().toString();
+         }
+         public void close() {
+         }
+       });
+       createRegion(name, serverAttrs.create());
+       try {
+         startBridgeServer(0);
+       } catch (Exception ex) {
+         com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+       }
+     }
+   });
+   final int port =
+       serverVM.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+   final String host0 = NetworkUtils.getServerHostName(serverVM.getHost());
+
+   final SerializableRunnable create =
+       new CacheSerializableRunnable("Create region") {
+     public void run2() throws CacheException {
+       getLonerSystem();
+       getCache();
+       validateDS();
+       final AttributesFactory clientAttrs = new AttributesFactory();
+       clientAttrs.setScope(Scope.LOCAL);
+       clientAttrs.setConcurrencyChecksEnabled(false);
+       ClientServerTestCase.configureConnectionPool(clientAttrs,host0,port,-1,true,-1,-1, null);
+       createRegion(name, clientAttrs.create());
+     }
+   };
+   firstClient.invoke(create);
+
+   firstClient.invoke(new CacheSerializableRunnable("Get values") {
+     public void run2() throws CacheException {
+       final Region region = getRootRegion().getSubregion(name);
+       for (int i = 0; i < 10; i++) {
+         final Object loaded = region.get(Integer.valueOf(i));
+         assertEquals(String.valueOf(i), loaded);
+       }
+     }
+   });
+
+   firstClient.invoke(new CacheSerializableRunnable("Update values") {
+     public void run2() throws CacheException {
+       final Region region = getRootRegion().getSubregion(name);
+       for (int i = 0; i < 10; i++) {
+         final Integer boxed = Integer.valueOf(i);
+         region.put(boxed, Integer.valueOf(i));
+       }
+     }
+   });
+
+   secondClient.invoke(create);
+   secondClient.invoke(new CacheSerializableRunnable("Validate values") {
+     public void run2() throws CacheException {
+       final Region region = getRootRegion().getSubregion(name);
+       for (int i = 0; i < 10; i++) {
+         final Object stored = region.get(Integer.valueOf(i));
+         assertNotNull(stored);
+         assertTrue(stored instanceof Integer);
+         assertEquals(i, ((Integer) stored).intValue());
+       }
+     }
+   });
+
+   firstClient.invoke(new CacheSerializableRunnable("Close Pool") {
+     // do some special close validation here
+     public void run2() throws CacheException {
+       final Region region = getRootRegion().getSubregion(name);
+       final PoolImpl pool =
+           (PoolImpl) PoolManager.find(region.getAttributes().getPoolName());
+       assertEquals(false, pool.isDestroyed());
+       assertEquals(1, pool.getAttachCount());
+       // destroying a pool that still has a region attached must be refused
+       try {
+         pool.destroy();
+         fail("expected IllegalStateException");
+       } catch (IllegalStateException expected) {
+       }
+       region.localDestroyRegion();
+       assertEquals(false, pool.isDestroyed());
+       assertEquals(0, pool.getAttachCount());
+     }
+   });
+
+   serverVM.invoke(new SerializableRunnable("Stop CacheServer") {
+     public void run() {
+       stopBridgeServer(getCache());
+     }
+   });
+ }
+
+
+
+
+ /**
+ * Tests the BridgeServer failover (bug 31832) with a connection count of 1
+ * passed to {@code basicTestBridgeServerFailover}.
+ * All VMs are disconnected from the distributed system first.
+ */
+ public void test007BridgeServerFailoverCnx1() throws CacheException {
+ // NOTE(review): test008 does not do this disconnect; confirm whether the
+ // asymmetry is intentional
+ disconnectAllFromDS();
+ basicTestBridgeServerFailover(1);
+ }
+ /**
+  * Runs the BridgeServer failover scenario with connectionsPerServer set to 0.
+  */
+ public void test008BridgeServerFailoverCnx0() throws CacheException {
+   final int cnxCount = 0;
+   basicTestBridgeServerFailover(cnxCount);
+ }
+ private void basicTestBridgeServerFailover(final int cnxCount) throws CacheException {
+ final String name = this.getName();
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+
+ // Create two bridge servers
+ SerializableRunnable createCacheServer =
+ new CacheSerializableRunnable("Create Cache Server") {
+ public void run2() throws CacheException {
+ AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+ createRegion(name, factory.create());
+ //pause(1000);
+ try {
+ startBridgeServer(0);
+
+ } catch (Exception ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+ }
+
+ }
+ };
+
+ vm0.invoke(createCacheServer);
+ vm1.invoke(createCacheServer);
+
+ final int port0 =
+ vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+ final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+ final int port1 =
+ vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+// final String host1 = getServerHostName(vm1.getHost());
+
+ // Create one bridge client in this VM
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create region") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ getCache();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ ClientServerTestCase.configureConnectionPool(factory,host0,port0,port1,true,-1,cnxCount, null, 100);
+
+ Region region = createRegion(name, factory.create());
+
+ // force connections to form
+ region.put("keyInit", new Integer(0));
+ region.put("keyInit2", new Integer(0));
+ }
+ };
+
+ vm2.invoke(create);
+
+ // Launch async thread that puts objects into cache. This thread will execute until
+ // the test has ended (which is why the RegionDestroyedException and CacheClosedException
+ // are caught and ignored. If any other exception occurs, the test will fail. See
+ // the putAI.exceptionOccurred() assertion below.
+ AsyncInvocation putAI = vm2.invokeAsync(new CacheSerializableRunnable("Put objects") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ try {
+ for (int i=0; i<100000; i++) {
+ region.put("keyAI", new Integer(i));
+ try {Thread.sleep(100);} catch (InterruptedException ie) {
+ fail("interrupted");
+ }
+ }
+ } catch (NoAvailableServersException ignore) {
+ /*ignore*/
+ } catch (RegionDestroyedException e) { //will be thrown when the test ends
+ /*ignore*/
+ }
+ catch (CancelException e) { //will be thrown when the test ends
+ /*ignore*/
+ }
+ }
+ });
+
+
+ SerializableRunnable verify1Server =
+ new CacheSerializableRunnable("verify1Server") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ PoolImpl pool = getPool(region);
+ verifyServerCount(pool, 1);
+ }
+ };
+ SerializableRunnable verify2Servers =
+ new CacheSerializableRunnable("verify2Servers") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ PoolImpl pool = getPool(region);
+ verifyServerCount(pool, 2);
+ }
+ };
+
+ vm2.invoke(verify2Servers);
+
+ SerializableRunnable stopCacheServer =
+ new SerializableRunnable("Stop CacheServer") {
+ public void run() {
+ stopBridgeServer(getCache());
+ }
+ };
+
+ final String expected = "java.io.IOException";
+ final String addExpected =
+ "<ExpectedException action=add>" + expected + "</ExpectedException>";
+ final String removeExpected =
+ "<ExpectedException action=remove>" + expected + "</ExpectedException>";
+
+ vm2.invoke(new SerializableRunnable() {
+ public void run() {
+ LogWriter bgexecLogger =
+ new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
+ bgexecLogger.info(addExpected);
+ }
+ });
+ try { // make sure we removeExpected
+
+ // Bounce the non-current server (I know that VM1 contains the non-current server
+ // because ...
+ vm1.invoke(stopCacheServer);
+
+ vm2.invoke(verify1Server);
+
+ final int restartPort = port1;
+ vm1.invoke(
+ new SerializableRunnable("Restart CacheServer") {
+ public void run() {
+ try {
+ Region region = getRootRegion().getSubregion(name);
+ assertNotNull(region);
+ startBridgeServer(restartPort);
+ }
+ catch(Exception e) {
+ getSystem().getLogWriter().fine(new Exception(e));
+ com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start CacheServer", e);
+ }
+ }
+ });
+
+ // Pause long enough for the monitor to realize the server has been bounced
+ // and reconnect to it.
+ vm2.invoke(verify2Servers);
+
+ } finally {
+ vm2.invoke(new SerializableRunnable() {
+ public void run() {
+ LogWriter bgexecLogger =
+ new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
+ bgexecLogger.info(removeExpected);
+ }
+ });
+ }
+
+ // Stop the other cache server
+ vm0.invoke(stopCacheServer);
+
+ // Run awhile
+ vm2.invoke(verify1Server);
+
+ com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("FIXME: this thread does not terminate"); // FIXME
+// // Verify that no exception has occurred in the putter thread
+// DistributedTestCase.join(putAI, 5 * 60 * 1000, getLogWriter());
+// //assertTrue("Exception occurred while invoking " + putAI, !putAI.exceptionOccurred());
+// if (putAI.exceptionOccurred()) {
+// fail("While putting entries: ", putAI.getException());
+// }
+
+ // Close Pool
+ vm2.invoke(new CacheSerializableRunnable("Close Pool") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ region.localDestroyRegion();
+ }
+ });
+
+ // Stop the last cache server
+ vm1.invoke(stopCacheServer);
+ }
+
+
+ /**
+  * Make sure cnx lifetime expiration is working on thread local cnxs
+  * (runs the shared lifetime-expire scenario with threadLocal == true).
+  * @author darrel
+  */
+ public void test009LifetimeExpireOnTL() throws CacheException {
+   final boolean threadLocal = true;
+   basicTestLifetimeExpire(threadLocal);
+ }
+
+ /**
+  * Make sure cnx lifetime expiration is working on pool cnxs, i.e. when
+  * thread local cnxs are not used (runs the shared lifetime-expire scenario
+  * with threadLocal == false).
+  * @author darrel
+  */
+ public void test010LifetimeExpireOnPoolCnx() throws CacheException {
+   final boolean threadLocal = false;
+   basicTestLifetimeExpire(threadLocal);
+ }
+
+ protected static volatile boolean stopTestLifetimeExpire = false; // polled by the putter loops in basicTestLifetimeExpire; set to true to stop them, reset to false during cleanup
+
+ protected static volatile int baselineLifetimeCheck; // stats.getLoadConditioningCheck() snapshot taken by the first putter; re-based by the verify1Server step
+ protected static volatile int baselineLifetimeExtensions; // stats.getLoadConditioningExtensions() snapshot taken by the first putter
+ protected static volatile int baselineLifetimeConnect; // stats.getLoadConditioningConnect() snapshot taken by the first putter
+ protected static volatile int baselineLifetimeDisconnect; // stats.getLoadConditioningDisconnect() snapshot taken by the first putter
+
+ /**
+  * Shared body of the connection lifetime-expiration tests.
+  * <p>
+  * A cache server is started in VM 0. A second one is started in VM 1 and
+  * immediately stopped again, purely to reserve its port. A pooled client in
+  * VM 2 then runs two asynchronous putter loops while the test checks the
+  * pool's load-conditioning statistics: first with one server available,
+  * then, after the VM 1 server has been restarted on the reserved port, with
+  * two servers available.
+  * <p>
+  * Cleanup (stopping the putters, joining them, destroying the region and
+  * pool, stopping both servers) always runs from the finally blocks.
+  *
+  * @param threadLocal value forwarded to
+  *        ClientServerTestCase.configureConnectionPool (NOTE(review):
+  *        presumably selects thread-local connections; confirm against that
+  *        helper)
+  */
+ private void basicTestLifetimeExpire(final boolean threadLocal) throws CacheException {
+   final String name = this.getName();
+   final Host host = Host.getHost(0);
+   VM vm0 = host.getVM(0);
+   VM vm1 = host.getVM(1);
+   VM vm2 = host.getVM(2);
+
+   AsyncInvocation putAI = null;
+   AsyncInvocation putAI2 = null;
+
+   try {
+
+     // Create two bridge servers
+     SerializableRunnable createCacheServer =
+       new CacheSerializableRunnable("Create Cache Server") {
+         public void run2() throws CacheException {
+           AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+           factory.setCacheListener(new DelayListener(25));
+           createRegion(name, factory.create());
+           try {
+             startBridgeServer(0);
+           } catch (Exception ex) {
+             com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+           }
+         }
+       };
+
+     vm0.invoke(createCacheServer);
+
+     final int port0 =
+       vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+     final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+     vm1.invoke(createCacheServer);
+     final int port1 =
+       vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+     SerializableRunnable stopCacheServer =
+       new SerializableRunnable("Stop CacheServer") {
+         public void run() {
+           stopBridgeServer(getCache());
+         }
+       };
+     // the VM 1 server was only started to reserve a port; stop it again
+     vm1.invoke(stopCacheServer);
+
+     // Create one bridge client in VM 2
+     SerializableRunnable create =
+       new CacheSerializableRunnable("Create region") {
+         public void run2() throws CacheException {
+           getLonerSystem();
+           getCache();
+           AttributesFactory factory = new AttributesFactory();
+           factory.setScope(Scope.LOCAL);
+           factory.setConcurrencyChecksEnabled(false);
+           ClientServerTestCase.configureConnectionPool(factory,host0,port0,port1,false/*queue*/,-1,0, null, 100, 500, threadLocal, 500);
+
+           Region region = createRegion(name, factory.create());
+
+           // force connections to form
+           region.put("keyInit", new Integer(0));
+           region.put("keyInit2", new Integer(0));
+         }
+       };
+
+     vm2.invoke(create);
+
+     // Launch async threads that put objects into the cache. They run until
+     // stopTestLifetimeExpire is set. The first putter also records the
+     // baseline load-conditioning statistics used by the verify steps.
+     SerializableRunnable putter1 =
+       new CacheSerializableRunnable("Put objects") {
+         public void run2() throws CacheException {
+           Region region = getRootRegion().getSubregion(name);
+           PoolImpl pool = getPool(region);
+           PoolStats stats = pool.getStats();
+           baselineLifetimeCheck = stats.getLoadConditioningCheck();
+           baselineLifetimeExtensions = stats.getLoadConditioningExtensions();
+           baselineLifetimeConnect = stats.getLoadConditioningConnect();
+           baselineLifetimeDisconnect = stats.getLoadConditioningDisconnect();
+           try {
+             int count = 0;
+             while (!stopTestLifetimeExpire) {
+               count++;
+               region.put("keyAI1", new Integer(count));
+             }
+           } catch (NoAvailableServersException ex) {
+             // only tolerated once the test has asked the putters to stop
+             if (!stopTestLifetimeExpire) {
+               throw ex;
+             }
+           }
+         }
+       };
+     SerializableRunnable putter2 =
+       new CacheSerializableRunnable("Put objects") {
+         public void run2() throws CacheException {
+           Region region = getRootRegion().getSubregion(name);
+           try {
+             int count = 0;
+             while (!stopTestLifetimeExpire) {
+               count++;
+               region.put("keyAI2", new Integer(count));
+             }
+           } catch (NoAvailableServersException ex) {
+             // only tolerated once the test has asked the putters to stop
+             if (!stopTestLifetimeExpire) {
+               throw ex;
+             }
+           }
+         }
+       };
+     putAI = vm2.invokeAsync(putter1);
+     putAI2 = vm2.invokeAsync(putter2);
+
+     SerializableRunnable verify1Server =
+       new CacheSerializableRunnable("verify1Server") {
+         public void run2() throws CacheException {
+           Region region = getRootRegion().getSubregion(name);
+           PoolImpl pool = getPool(region);
+           final PoolStats stats = pool.getStats();
+           verifyServerCount(pool, 1);
+           WaitCriterion ev = new WaitCriterion() {
+             public boolean done() {
+               return stats.getLoadConditioningCheck() >= (10 + baselineLifetimeCheck);
+             }
+             public String description() {
+               // previously null, which left a timeout without any explanation
+               return "waiting for stats.getLoadConditioningCheck()="
+                 + stats.getLoadConditioningCheck()
+                 + " to be >= " + (10 + baselineLifetimeCheck);
+             }
+           };
+           Wait.waitForCriterion(ev, 30 * 1000, 200, true);
+
+           // make sure no replacements are happening.
+           // since we have 2 threads and 2 cnxs and 1 server
+           // when lifetimes are up we should only want to connect back to the
+           // server we are already connected to and thus just extend our lifetime
+           assertTrue("baselineLifetimeCheck=" + baselineLifetimeCheck
+             + " but stats.getLoadConditioningCheck()=" + stats.getLoadConditioningCheck(),
+             stats.getLoadConditioningCheck() >= (10+baselineLifetimeCheck));
+           // re-base the check count for the verify2Servers step
+           baselineLifetimeCheck = stats.getLoadConditioningCheck();
+           assertTrue(stats.getLoadConditioningExtensions() > baselineLifetimeExtensions);
+           assertTrue(stats.getLoadConditioningConnect() == baselineLifetimeConnect);
+           assertTrue(stats.getLoadConditioningDisconnect() == baselineLifetimeDisconnect);
+         }
+       };
+     SerializableRunnable verify2Servers =
+       new CacheSerializableRunnable("verify2Servers") {
+         public void run2() throws CacheException {
+           Region region = getRootRegion().getSubregion(name);
+           PoolImpl pool = getPool(region);
+           final PoolStats stats = pool.getStats();
+           verifyServerCount(pool, 2);
+           // make sure some replacements are happening.
+           // since we have 2 threads and 2 cnxs and 2 servers
+           // when lifetimes are up we should connect to the other server sometimes.
+
+           // TODO: does this WaitCriterion actually help?
+           WaitCriterion wc = new WaitCriterion() {
+             String excuse;
+             public boolean done() {
+               int actual = stats.getLoadConditioningCheck();
+               int expected = 10 + baselineLifetimeCheck;
+               if (actual >= expected) {
+                 return true;
+               }
+               excuse = "Bug 39209 expected " + actual + " to be >= " + expected;
+               return false;
+             }
+             public String description() {
+               return excuse;
+             }
+           };
+           Wait.waitForCriterion(wc, 60 * 1000, 1000, true);
+
+           assertTrue(stats.getLoadConditioningConnect() > baselineLifetimeConnect);
+           assertTrue(stats.getLoadConditioningDisconnect() > baselineLifetimeDisconnect);
+         }
+       };
+
+     vm2.invoke(verify1Server);
+     assertEquals(true, putAI.isAlive());
+     assertEquals(true, putAI2.isAlive());
+
+     {
+       final int restartPort = port1;
+       vm1.invoke(new SerializableRunnable("Restart CacheServer") {
+         public void run() {
+           try {
+             Region region = getRootRegion().getSubregion(name);
+             assertNotNull(region);
+             startBridgeServer(restartPort);
+           }
+           catch(Exception e) {
+             getSystem().getLogWriter().fine(new Exception(e));
+             com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start CacheServer", e);
+           }
+         }
+       });
+     }
+
+     vm2.invoke(verify2Servers);
+     assertEquals(true, putAI.isAlive());
+     assertEquals(true, putAI2.isAlive());
+   } finally {
+     vm2.invoke(new SerializableRunnable("Stop Putters") {
+       public void run() {
+         stopTestLifetimeExpire = true;
+       }
+     });
+
+     try {
+       if (putAI != null) {
+         // Verify that no exception has occurred in the putter thread
+         ThreadUtils.join(putAI, 30 * 1000);
+         if (putAI.exceptionOccurred()) {
+           com.gemstone.gemfire.test.dunit.Assert.fail("While putting entries: ", putAI.getException());
+         }
+       }
+
+       if (putAI2 != null) {
+         // Wait for the SECOND putter. The original joined putAI here again
+         // (copy-paste slip), so putAI2 was never waited for.
+         ThreadUtils.join(putAI2, 30 * 1000);
+         // FIXME (carried over from the original): the
+         // putAI2.exceptionOccurred() check is left disabled because this
+         // thread was reported not to terminate.
+       }
+
+     } finally {
+       // Despite the runnable's name, this resets the static flag in vm2.
+       vm2.invoke(new SerializableRunnable("Stop Putters") {
+         public void run() {
+           stopTestLifetimeExpire = false;
+         }
+       });
+       // Close Pool
+       vm2.invoke(new CacheSerializableRunnable("Close Pool") {
+         public void run2() throws CacheException {
+           Region region = getRootRegion().getSubregion(name);
+           String poolName = region.getAttributes().getPoolName();
+           region.localDestroyRegion();
+           PoolManager.find(poolName).destroy();
+         }
+       });
+
+       SerializableRunnable stopCacheServer =
+         new SerializableRunnable("Stop CacheServer") {
+           public void run() {
+             stopBridgeServer(getCache());
+           }
+         };
+       vm1.invoke(stopCacheServer);
+       vm0.invoke(stopCacheServer);
+     }
+   }
+ }
+
+ /**
+ * Tests the create operation of the {@link Pool}
+ *
+ * @since 3.5
+ */
+ public void test011PoolCreate() throws CacheException {
+ final String name = this.getName();
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = Host.getHost(0).getVM(2);
+
+ vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+ public void run2() throws CacheException {
+ AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+ createRegion(name, factory.create());
+ //pause(1000);
+ try {
+ startBridgeServer(0);
+ } catch (Exception ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+ }
+
+ }
+ });
+ final int port =
+ vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+ final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create region") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ getCache();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,false,-1,-1, null);
+ createRegion(name, factory.create());
+ }
+ };
+
+ vm1.invoke(create);
+ vm1.invoke(new CacheSerializableRunnable("Create values") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ region.create(new Integer(i), new Integer(i));
+ }
+ }
+ });
+
+ vm2.invoke(create);
+ vm2.invoke(new CacheSerializableRunnable("Validate values") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ Object value = region.get(new Integer(i));
+ assertNotNull(value);
+ assertTrue(value instanceof Integer);
+ assertEquals(i, ((Integer) value).intValue());
+ }
+ }
+ });
+
+ SerializableRunnable close =
+ new CacheSerializableRunnable("Close Pool") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ region.localDestroyRegion();
+ }
+ };
+
+ vm1.invoke(close);
+ vm2.invoke(close);
+
+ vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+ public void run() {
+ stopBridgeServer(getCache());
+ }
+ });
+ }
+
+ /**
+  * Tests the put operation of the {@link Pool}: a pooled client in VM 1 puts
+  * String, Order and byte[] values, and a second pooled client in VM 2 reads
+  * each kind back with get and validates it.
+  * <p>
+  * The byte[] values are encoded and decoded with an explicit UTF-8 charset
+  * so the round trip does not depend on the default charset of the two JVMs.
+  *
+  * @since 3.5
+  */
+ public void test012PoolPut() throws CacheException {
+   final String name = this.getName();
+   final Host host = Host.getHost(0);
+   VM vm0 = host.getVM(0);
+   VM vm1 = host.getVM(1);
+   VM vm2 = host.getVM(2);
+
+   vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+     public void run2() throws CacheException {
+       AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+       createRegion(name, factory.create());
+       try {
+         startBridgeServer(0);
+       } catch (Exception ex) {
+         com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+       }
+     }
+   });
+
+   final int port =
+     vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+   final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+   SerializableRunnable createPool =
+     new CacheSerializableRunnable("Create region") {
+       public void run2() throws CacheException {
+         getLonerSystem();
+         getCache();
+         AttributesFactory factory = new AttributesFactory();
+         factory.setScope(Scope.LOCAL);
+         factory.setConcurrencyChecksEnabled(false);
+         // attach the connection pool to the region attributes
+         ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,false,-1,-1, null);
+         createRegion(name, factory.create());
+       }
+     };
+
+   vm1.invoke(createPool);
+
+   vm1.invoke(new CacheSerializableRunnable("Put values") {
+     public void run2() throws CacheException {
+       Region region = getRootRegion().getSubregion(name);
+       for (int i = 0; i < 10; i++) {
+         // put string values
+         region.put("key-string-"+i, "value-"+i);
+
+         // put object values
+         Order order = new Order();
+         order.init(i);
+         region.put("key-object-"+i, order);
+
+         // put byte[] values (explicit charset, matched by the reader below)
+         region.put("key-bytes-"+i, ("value-"+i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+       }
+     }
+   });
+
+   vm2.invoke(createPool);
+
+   vm2.invoke(new CacheSerializableRunnable("Get / validate string values") {
+     public void run2() throws CacheException {
+       Region region = getRootRegion().getSubregion(name);
+       for (int i = 0; i < 10; i++) {
+         Object value = region.get("key-string-"+i);
+         assertNotNull(value);
+         assertTrue(value instanceof String);
+         assertEquals("value-"+i, value);
+       }
+     }
+   });
+
+   vm2.invoke(new CacheSerializableRunnable("Get / validate object values") {
+     public void run2() throws CacheException {
+       Region region = getRootRegion().getSubregion(name);
+       for (int i = 0; i < 10; i++) {
+         Object value = region.get("key-object-"+i);
+         assertNotNull(value);
+         assertTrue(value instanceof Order);
+         assertEquals(i, ((Order) value).getIndex());
+       }
+     }
+   });
+
+   vm2.invoke(new CacheSerializableRunnable("Get / validate byte[] values") {
+     public void run2() throws CacheException {
+       Region region = getRootRegion().getSubregion(name);
+       for (int i = 0; i < 10; i++) {
+         Object value = region.get("key-bytes-"+i);
+         assertNotNull(value);
+         assertTrue(value instanceof byte[]);
+         // decode with the same charset the writer used
+         assertEquals("value-"+i, new String((byte[]) value, java.nio.charset.StandardCharsets.UTF_8));
+       }
+     }
+   });
+
+   SerializableRunnable closePool =
+     new CacheSerializableRunnable("Close Pool") {
+       public void run2() throws CacheException {
+         Region region = getRootRegion().getSubregion(name);
+         region.localDestroyRegion();
+       }
+     };
+
+   vm1.invoke(closePool);
+   vm2.invoke(closePool);
+
+   vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+     public void run() {
+       stopBridgeServer(getCache());
+     }
+   });
+ }
+ /**
+ * Tests the put operation of the {@link Pool}
+ *
+ * @since 3.5
+ */
+ public void test013PoolPutNoDeserialize() throws CacheException {
+ final String name = this.getName();
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = Host.getHost(0).getVM(2);
+
+ vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+ public void run2() throws CacheException {
+ AttributesFactory factory = getBridgeServerRegionAttributes(null,null);
+ createRegion(name, factory.create());
+ //pause(1000);
+ try {
+ startBridgeServer(0);
+ } catch (Exception ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+ }
+
+ }
+ });
+
+ final int port =
+ vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+ final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+ SerializableRunnable createPool =
+ new CacheSerializableRunnable("Create region") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ getCache();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,false,-1,-1, null);
+ createRegion(name, factory.create());
+ }
+ };
+
+ vm1.invoke(createPool);
+
+ vm1.invoke(new CacheSerializableRunnable("Put values") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ // put string values
+ region.put("key-string-"+i, "value-"+i);
+
+ // put object values
+ Order order = new Order();
+ order.init(i);
+ region.put("key-object-"+i, order);
+
+ // put byte[] values
+ region.put("key-bytes-"+i, ("value-"+i).getBytes());
+ }
+ }
+ });
+
+ vm2.invoke(createPool);
+
+ vm2.invoke(new CacheSerializableRunnable("Get / validate string values") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ Object value = region.get("key-string-"+i);
+ assertNotNull(value);
+ assertTrue(value instanceof String);
+ assertEquals("value-"+i, value);
+ }
+ }
+ });
+
+ vm2.invoke(new CacheSerializableRunnable("Get / validate object values") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ Object value = region.get("key-object-"+i);
+ assertNotNull(value);
+ assertTrue(value instanceof Order);
+ assertEquals(i, ((Order) value).getIndex());
+ }
+ }
+ });
+
+ vm2.invoke(new CacheSerializableRunnable("Get / validate byte[] values") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ Object value = region.get("key-bytes-"+i);
+ assertNotNull(value);
+ assertTrue(value instanceof byte[]);
+ assertEquals("value-"+i, new String((byte[]) value));
+ }
+ }
+ });
+
+ SerializableRunnable closePool =
+ new CacheSerializableRunnable("Close Pool") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ region.localDestroyRegion();
+ }
+ };
+
+ vm1.invoke(closePool);
+ vm2.invoke(closePool);
+
+ vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+ public void run() {
+ stopBridgeServer(getCache());
+ }
+ });
+ Wait.pause(5 * 1000);
+ }
+
+ /**
+  * Tests that invalidates and destroys are propagated to {@link Pool}s.
+  * <p>
+  * Two pooled clients (VM 1 and VM 2) register interest in every key with
+  * registerInterestRegex(".*", false, false). VM 1 populates the region and
+  * records listener events while VM 2 updates, destroys and re-creates the
+  * entries. VM 1 is expected to see the updates as INVALIDATE events, the
+  * destroys as DESTROY events, and no events for the re-creates until it
+  * fetches the values itself (LOCAL_LOAD_CREATE).
+  *
+  * @since 3.5
+  */
+ public void test014InvalidateAndDestroyPropagation() throws CacheException {
+   final String name = this.getName();
+   final Host host = Host.getHost(0);
+   final VM serverVm = host.getVM(0);
+   final VM listenerVm = host.getVM(1);
+   final VM writerVm = host.getVM(2);
+   // fixed pause used between steps before the listener side is checked
+   final int settleMillis = 5 * 1000;
+
+   serverVm.invoke(new CacheSerializableRunnable("Create Cache Server") {
+     public void run2() throws CacheException {
+       AttributesFactory serverAttrs = getBridgeServerRegionAttributes(null, null);
+       createRegion(name, serverAttrs.create());
+       try {
+         startBridgeServer(0);
+       } catch (Exception ex) {
+         com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+       }
+     }
+   });
+   final int port =
+     serverVm.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+   final String host0 = NetworkUtils.getServerHostName(serverVm.getHost());
+
+   // Pooled client region with a CertifiableTestCacheListener and interest in all keys.
+   final SerializableRunnable createClientRegion =
+     new CacheSerializableRunnable("Create region") {
+       public void run2() throws CacheException {
+         getLonerSystem();
+         getCache();
+         AttributesFactory clientAttrs = new AttributesFactory();
+         clientAttrs.setScope(Scope.LOCAL);
+         clientAttrs.setConcurrencyChecksEnabled(false);
+         ClientServerTestCase.configureConnectionPool(clientAttrs,host0,port,-1,true,-1,-1, null);
+         CertifiableTestCacheListener listener = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+         clientAttrs.setCacheListener(listener);
+         Region clientRegion = createRegion(name, clientAttrs.create());
+         clientRegion.registerInterestRegex(".*", false, false);
+       }
+     };
+
+   listenerVm.invoke(createClientRegion);
+   listenerVm.invoke(new CacheSerializableRunnable("Populate region") {
+     public void run2() throws CacheException {
+       Region clientRegion = getRootRegion().getSubregion(name);
+       for (int k = 0; k < 10; k++) {
+         clientRegion.put(Integer.valueOf(k), "old" + k);
+       }
+     }
+   });
+   writerVm.invoke(createClientRegion);
+   Wait.pause(settleMillis);
+
+   listenerVm.invoke(new CacheSerializableRunnable("Turn on history") {
+     public void run2() throws CacheException {
+       Region clientRegion = getRootRegion().getSubregion(name);
+       CertifiableTestCacheListener listener = (CertifiableTestCacheListener) clientRegion.getAttributes().getCacheListener();
+       listener.enableEventHistory();
+     }
+   });
+   writerVm.invoke(new CacheSerializableRunnable("Update region") {
+     public void run2() throws CacheException {
+       Region clientRegion = getRootRegion().getSubregion(name);
+       for (int k = 0; k < 10; k++) {
+         clientRegion.put(Integer.valueOf(k), "new" + k, "callbackArg" + k);
+       }
+     }
+   });
+   Wait.pause(settleMillis);
+
+   listenerVm.invoke(new CacheSerializableRunnable("Verify invalidates") {
+     public void run2() throws CacheException {
+       Region clientRegion = getRootRegion().getSubregion(name);
+       CertifiableTestCacheListener listener = (CertifiableTestCacheListener) clientRegion.getAttributes().getCacheListener();
+       // every key must have been invalidated: entry present, value gone
+       for (int k = 0; k < 10; k++) {
+         Object key = Integer.valueOf(k);
+         listener.waitForInvalidated(key);
+         Region.Entry entry = clientRegion.getEntry(key);
+         assertNotNull(entry);
+         assertNull(entry.getValue());
+       }
+       // the recorded history must hold exactly one INVALIDATE per key, in key order
+       List history = listener.getEventHistory();
+       assertEquals(10, history.size());
+       for (int k = 0; k < 10; k++) {
+         EntryEvent event = (EntryEvent) history.get(k);
+         assertEquals(Integer.valueOf(k), event.getKey());
+         assertEquals("old" + k, event.getOldValue());
+         assertEquals(Operation.INVALIDATE, event.getOperation());
+         assertEquals("callbackArg" + k, event.getCallbackArgument());
+         assertTrue(event.isOriginRemote());
+       }
+     }
+   });
+
+   writerVm.invoke(new CacheSerializableRunnable("Validate original and destroy") {
+     public void run2() throws CacheException {
+       Region clientRegion = getRootRegion().getSubregion(name);
+       for (int k = 0; k < 10; k++) {
+         Object key = Integer.valueOf(k);
+         assertEquals("new" + k, clientRegion.getEntry(key).getValue());
+         clientRegion.destroy(key, "destroyCB"+k);
+       }
+     }
+   });
+   Wait.pause(settleMillis);
+
+   listenerVm.invoke(new CacheSerializableRunnable("Verify destroys") {
+     public void run2() throws CacheException {
+       Region clientRegion = getRootRegion().getSubregion(name);
+       CertifiableTestCacheListener listener = (CertifiableTestCacheListener) clientRegion.getAttributes().getCacheListener();
+       // every key must have been destroyed locally
+       for (int k = 0; k < 10; k++) {
+         Object key = Integer.valueOf(k);
+         listener.waitForDestroyed(key);
+         assertNull(clientRegion.getEntry(key));
+       }
+       // the recorded history must hold exactly one DESTROY per key, in key order
+       List history = listener.getEventHistory();
+       assertEquals(10, history.size());
+       for (int k = 0; k < 10; k++) {
+         EntryEvent event = (EntryEvent) history.get(k);
+         assertEquals(Integer.valueOf(k), event.getKey());
+         assertNull(event.getOldValue());
+         assertEquals(Operation.DESTROY, event.getOperation());
+         assertEquals("destroyCB"+k, event.getCallbackArgument());
+         assertTrue(event.isOriginRemote());
+       }
+     }
+   });
+   writerVm.invoke(new CacheSerializableRunnable("recreate") {
+     public void run2() throws CacheException {
+       Region clientRegion = getRootRegion().getSubregion(name);
+       for (int k = 0; k < 10; k++) {
+         clientRegion.create(Integer.valueOf(k), "create" + k);
+       }
+     }
+   });
+   Wait.pause(settleMillis);
+
+   listenerVm.invoke(new CacheSerializableRunnable("Verify creates") {
+     public void run2() throws CacheException {
+       Region clientRegion = getRootRegion().getSubregion(name);
+       CertifiableTestCacheListener listener = (CertifiableTestCacheListener) clientRegion.getAttributes().getCacheListener();
+       List history = listener.getEventHistory();
+       com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("history (should be empty): " + history);
+       assertEquals(0, history.size());
+       // now see if we can get it from the server
+       for (int k = 0; k < 10; k++) {
+         assertEquals("create"+k, clientRegion.get(Integer.valueOf(k), "loadCB"+k));
+       }
+       // each get above must have been recorded as a local load-create
+       history = listener.getEventHistory();
+       assertEquals(10, history.size());
+       for (int k = 0; k < 10; k++) {
+         EntryEvent event = (EntryEvent) history.get(k);
+         com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("processing " + event);
+         assertEquals(Integer.valueOf(k), event.getKey());
+         assertNull(event.getOldValue());
+         assertEquals("create"+k, event.getNewValue());
+         assertEquals(Operation.LOCAL_LOAD_CREATE, event.getOperation());
+         assertEquals("loadCB"+k, event.getCallbackArgument());
+         assertFalse(event.isOriginRemote());
+       }
+     }
+   });
+
+   // Tear down: destroy both client regions locally, then stop the server.
+   final SerializableRunnable closeClientRegion =
+     new CacheSerializableRunnable("Close Pool") {
+       public void run2() throws CacheException {
+         getRootRegion().getSubregion(name).localDestroyRegion();
+       }
+     };
+
+   listenerVm.invoke(closeClientRegion);
+   writerVm.invoke(closeClientRegion);
+
+   serverVm.invoke(new SerializableRunnable("Stop CacheServer") {
+     public void run() {
+       stopBridgeServer(getCache());
+     }
+   });
+ }
+ /**
+ * Tests that updates, destroys and creates performed by one {@link Pool}
+ * client are propagated through the cache server to a second client whose
+ * region is DataPolicy.EMPTY + InterestPolicy.ALL, and checks the events
+ * recorded by that client's listener.
+ *
+ * @since 5.0
+ */
+ public void test015InvalidateAndDestroyToEmptyAllPropagation() throws CacheException {
+ final String name = this.getName();
+ final Host host = Host.getHost(0);
+ // vm0 hosts the cache server; vm1 is the EMPTY client under test; vm2 is a normal client that drives the changes
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+
+ vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+ public void run2() throws CacheException {
+ AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+ createRegion(name, factory.create());
+ //pause(1000);
+ try {
+ // NOTE(review): port 0 presumably asks for an ephemeral port; the real port is read back below
+ startBridgeServer(0);
+
+ } catch (Exception ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+ }
+
+ }
+ });
+ final int port =
+ vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+ final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+ // Client region: LOCAL scope, pool-backed, DataPolicy.EMPTY with InterestPolicy.ALL, and a recording listener
+ SerializableRunnable createEmpty =
+ new CacheSerializableRunnable("Create region") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ getCache();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+ CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+ factory.setCacheListener(l);
+ factory.setDataPolicy(DataPolicy.EMPTY);
+ factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
+ Region rgn = createRegion(name, factory.create());
+ // registers interest in every key (regex ".*"); the two boolean flags are both false
+ rgn.registerInterestRegex(".*", false, false);
+ }
+ };
+ // Same client set-up as createEmpty, but without the EMPTY data policy and subscription attributes
+ SerializableRunnable createNormal =
+ new CacheSerializableRunnable("Create region") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ getCache();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+ CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+ factory.setCacheListener(l);
+ Region rgn = createRegion(name, factory.create());
+ rgn.registerInterestRegex(".*", false, false);
+ }
+ };
+
+ // Step 1: the EMPTY client writes the initial values "old0".."old9"
+ vm1.invoke(createEmpty);
+ vm1.invoke(new CacheSerializableRunnable("Populate region") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ region.put(new Integer(i), "old" + i);
+ }
+ }
+ });
+
+ // Step 2: start recording events on vm1 only now, so the puts above are not part of the history
+ vm2.invoke(createNormal);
+ vm1.invoke(new CacheSerializableRunnable("Turn on history") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ ctl.enableEventHistory();
+ }
+ });
+ // Step 3: the normal client overwrites every key, passing a callback argument
+ vm2.invoke(new CacheSerializableRunnable("Update region") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ region.put(new Integer(i), "new" + i, "callbackArg" + i);
+ }
+ }
+ });
+ Wait.pause(5 * 1000);
+
+ // Step 4: on the EMPTY client the updates must show up as ten remote INVALIDATE events, in key order
+ vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ ctl.waitForInvalidated(key);
+ Region.Entry entry = region.getEntry(key);
+ assertNull(entry); // we are empty!
+ }
+ {
+ // NOTE(review): getEventHistory() presumably drains the recorded events, since every later
+ // check in this test again expects exactly 10 -- confirm in CertifiableTestCacheListener
+ List l = ctl.getEventHistory();
+ assertEquals(10, l.size());
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ EntryEvent ee = (EntryEvent)l.get(i);
+ assertEquals(key, ee.getKey());
+ assertEquals(null, ee.getOldValue());
+ assertEquals(false, ee.isOldValueAvailable()); // failure
+ assertEquals(Operation.INVALIDATE, ee.getOperation());
+ assertEquals("callbackArg" + i, ee.getCallbackArgument());
+ assertEquals(true, ee.isOriginRemote());
+ }
+ }
+
+ }
+ });
+
+
+ // Step 5: the normal client checks it holds the new values, then destroys every key
+ vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ assertEquals("new" + i, region.getEntry(key).getValue());
+ region.destroy(key, "destroyCB"+i);
+ }
+ }
+ });
+ Wait.pause(5 * 1000);
+
+ // Step 6: the EMPTY client must record ten remote DESTROY events carrying the destroy callback arguments
+ vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ ctl.waitForDestroyed(key);
+ Region.Entry entry = region.getEntry(key);
+ assertNull(entry);
+ }
+ {
+ List l = ctl.getEventHistory();
+ assertEquals(10, l.size());
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ EntryEvent ee = (EntryEvent)l.get(i);
+ assertEquals(key, ee.getKey());
+ assertEquals(null, ee.getOldValue());
+ assertEquals(false, ee.isOldValueAvailable());
+ assertEquals(Operation.DESTROY, ee.getOperation());
+ assertEquals("destroyCB"+i, ee.getCallbackArgument());
+ assertEquals(true, ee.isOriginRemote());
+ }
+ }
+ }
+ });
+ // Step 7: the normal client creates the keys again
+ vm2.invoke(new CacheSerializableRunnable("recreate") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ region.create(key, "create" + i, "createCB"+i);
+ }
+ }
+ });
+ Wait.pause(5 * 1000);
+
+ // Step 8: the creates are expected to arrive at the EMPTY client as INVALIDATE events;
+ // the values can then still be fetched from the server with get()
+ vm1.invoke(new CacheSerializableRunnable("Verify creates") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ ctl.waitForInvalidated(key);
+ Region.Entry entry = region.getEntry(key);
+ assertNull(entry);
+ }
+ List l = ctl.getEventHistory();
+ assertEquals(10, l.size());
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ EntryEvent ee = (EntryEvent)l.get(i);
+ assertEquals(key, ee.getKey());
+ assertEquals(null, ee.getOldValue());
+ assertEquals(false, ee.isOldValueAvailable());
+ assertEquals(Operation.INVALIDATE, ee.getOperation());
+ assertEquals("createCB"+i, ee.getCallbackArgument());
+ assertEquals(true, ee.isOriginRemote());
+ }
+ // now see if we can get it from the server
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ assertEquals("create"+i, region.get(key, "loadCB"+i));
+ }
+ // each get() above is expected to have produced one local (not origin-remote) LOCAL_LOAD_CREATE event
+ l = ctl.getEventHistory();
+ assertEquals(10, l.size());
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ EntryEvent ee = (EntryEvent)l.get(i);
+ assertEquals(key, ee.getKey());
+ assertEquals(null, ee.getOldValue());
+ assertEquals("create"+i, ee.getNewValue());
+ assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation());
+ assertEquals("loadCB"+i, ee.getCallbackArgument());
+ assertEquals(false, ee.isOriginRemote());
+ }
+ }
+ });
+
+ // Clean-up: locally destroy the region on both clients, then stop the server
+ SerializableRunnable close =
+ new CacheSerializableRunnable("Close Pool") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ region.localDestroyRegion();
+ }
+ };
+
+ vm1.invoke(close);
+ vm2.invoke(close);
+
+ vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+ public void run() {
+ stopBridgeServer(getCache());
+ }
+ });
+ }
+
+ /**
+ * Tests that invalidates and destroys are propagated to {@link Pool}s
+ * correctly to DataPolicy.EMPTY + InterestPolicy.CACHE_CONTENT
+ *
+ * @since 5.0
+ */
+ public void test016InvalidateAndDestroyToEmptyCCPropagation() throws CacheException {
+ final String name = this.getName();
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+
+ vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+ public void run2() throws CacheException {
+ AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+ createRegion(name, factory.create());
+ try {
+ startBridgeServer(0);
+
+ } catch (Exception ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+ }
+
+ }
+ });
+ final int port =
+ vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+ final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+ SerializableRunnable createEmpty =
+ new CacheSerializableRunnable("Create region") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ getCache();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+ CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+ factory.setCacheListener(l);
+ factory.setDataPolicy(DataPolicy.EMPTY);
+ factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT));
+ Region rgn = createRegion(name, factory.create());
+ rgn.registerInterestRegex(".*", false, false);
+ }
+ };
+ SerializableRunnable createNormal =
+ new CacheSerializableRunnable("Create region") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ getCache();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+ CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+ factory.setCacheListener(l);
+ Region rgn = createRegion(name, factory.create());
+ rgn.registerInterestRegex(".*", false, false);
+ }
+ };
+
+ vm1.invoke(createEmpty);
+ vm1.invoke(new CacheSerializableRunnable("Populate region") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ region.put(new Integer(i), "old" + i);
+ }
+ }
+ });
+
+ vm2.invoke(createNormal);
+ vm1.invoke(new CacheSerializableRunnable("Turn on history") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ ctl.enableEventHistory();
+ }
+ });
+ vm2.invoke(new CacheSerializableRunnable("Update region") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ region.put(new Integer(i), "new" + i, "callbackArg" + i);
+ }
+ }
+ });
+ Wait.pause(5 * 1000);
+
+ vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ List l = ctl.getEventHistory();
+ assertEquals(0, l.size());
+ }
+ });
+
+
+ vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ assertEquals("new" + i, region.getEntry(key).getValue());
+ region.destroy(key, "destroyCB"+i);
+ }
+ }
+ });
+
+ vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ List l = ctl.getEventHistory();
+ assertEquals(0, l.size());
+ }
+ });
+ vm2.invoke(new CacheSerializableRunnable("recreate") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ region.create(key, "create" + i, "createCB"+i);
+ }
+ }
+
<TRUNCATED>
[2/5] incubator-geode git commit: GEODE-1111 Connection Pooling needs
more tests
Posted by kl...@apache.org.
GEODE-1111 Connection Pooling needs more tests
These tests were dependent on an Order class that is only available in
Pivotal's old test framework. I've created a small Order class that replaces
it and allows the tests to run as part of Geode's geode-core distributedTest
task.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4ed2fd37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4ed2fd37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4ed2fd37
Branch: refs/heads/feature/GEODE-1050
Commit: 4ed2fd374cf27ff704b09600eef263b71be9eabc
Parents: ac3d3b4
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Thu Mar 17 15:04:59 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Thu Mar 17 15:10:10 2016 -0700
----------------------------------------------------------------------
.../cache/ConnectionPoolAutoDUnitTest.java | 45 +
.../gemfire/cache/ConnectionPoolDUnitTest.java | 5871 ++++++++++++++++++
2 files changed, 5916 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4ed2fd37/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
new file mode 100755
index 0000000..ad110d7
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
@@ -0,0 +1,45 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache;
+
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+
+import static org.junit.runners.MethodSorters.*;
+import org.junit.FixMethodOrder;
+
+/**
+ * Variant of {@link ConnectionPoolDUnitTest} that sets
+ * {@code ClientServerTestCase.AUTO_LOAD_BALANCE} to {@code true} before each
+ * test, in this JVM and in every dunit VM, and resets it to {@code false} in
+ * the post-tear-down hook.
+ */
+@FixMethodOrder(NAME_ASCENDING)
+public class ConnectionPoolAutoDUnitTest extends ConnectionPoolDUnitTest {
+
+  public ConnectionPoolAutoDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Runs the inherited set-up, then switches auto load balancing on everywhere.
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    setAutoLoadBalance(true, "setupAutoMode");
+  }
+
+  /**
+   * Switches auto load balancing back off everywhere once a test has been torn down.
+   */
+  @Override
+  protected final void postTearDownConnectionPoolDUnitTest() throws Exception {
+    setAutoLoadBalance(false, "disableAutoMode");
+  }
+
+  /**
+   * Sets {@code ClientServerTestCase.AUTO_LOAD_BALANCE} in this JVM first and
+   * then in every dunit VM.
+   *
+   * @param enabled the value to assign to the flag
+   * @param runnableName the name given to the runnable sent to the other VMs
+   */
+  private void setAutoLoadBalance(final boolean enabled, String runnableName) {
+    ClientServerTestCase.AUTO_LOAD_BALANCE = enabled;
+    Invoke.invokeInEveryVM(new SerializableRunnable(runnableName) {
+      @Override
+      public void run() {
+        ClientServerTestCase.AUTO_LOAD_BALANCE = enabled;
+      }
+    });
+  }
+
+}
[5/5] incubator-geode git commit: Disable DistTXDebugDUnitTest
because Dist TX is not supposed to be on develop and it fails with no members
to host buckets
Posted by kl...@apache.org.
Disable DistTXDebugDUnitTest because Dist TX is not supposed to be on develop and it fails with no members to host buckets
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/893dc86b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/893dc86b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/893dc86b
Branch: refs/heads/feature/GEODE-1050
Commit: 893dc86b95aa38cae55ad421c98b61dbbc7e461f
Parents: ad390c9
Author: Kirk Lund <kl...@apache.org>
Authored: Thu Mar 17 16:11:05 2016 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Thu Mar 17 16:11:05 2016 -0700
----------------------------------------------------------------------
.../disttx/DistTXDebugDUnitDisabledTest.java | 1016 +++++++++++++++++
.../gemfire/disttx/DistTXDebugDUnitTest.java | 1017 ------------------
.../disttx/DistTXDistributedTestSuite.java | 42 -
.../disttx/DistTXPersistentDebugDUnitTest.java | 2 +-
4 files changed, 1017 insertions(+), 1060 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/893dc86b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitDisabledTest.java b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitDisabledTest.java
new file mode 100644
index 0000000..dbd5d3c
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitDisabledTest.java
@@ -0,0 +1,1016 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.disttx;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+
+import com.gemstone.gemfire.DataSerializable;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheTransactionManager;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EntryOperation;
+import com.gemstone.gemfire.cache.PartitionAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.PartitionResolver;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
+import com.gemstone.gemfire.internal.cache.execute.CustomerIDPartitionResolver;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+/**
+ * TODO: reenable this test and fix it when work on Dist TX resumes -- it fails with no members to host buckets
+ */
+public class DistTXDebugDUnitDisabledTest extends CacheTestCase {
+ VM accessor = null;
+ VM dataStore1 = null;
+ VM dataStore2 = null;
+ VM dataStore3 = null;
+
+ /**
+ * Creates the test case; the name is passed unchanged to the
+ * {@code CacheTestCase} constructor.
+ *
+ * @param name the test name
+ */
+ public DistTXDebugDUnitDisabledTest(String name) {
+ super(name);
+ }
+
+ /**
+ * Binds the four VM fields to VMs 0-3 of host 0 (three data stores, then
+ * the accessor) and finally calls the subclass hook
+ * {@link #postSetUpDistTXDebugDUnitTest()}.
+ */
+ @Override
+ public final void postSetUp() throws Exception {
+   final Host firstHost = Host.getHost(0);
+   this.dataStore1 = firstHost.getVM(0);
+   this.dataStore2 = firstHost.getVM(1);
+   this.dataStore3 = firstHost.getVM(2);
+   this.accessor = firstHost.getVM(3);
+
+   postSetUpDistTXDebugDUnitTest();
+ }
+
+ /**
+ * Hook called at the end of {@link #postSetUp()}, after the VM fields have
+ * been assigned. Empty here; subclasses may override it to add set-up.
+ */
+ protected void postSetUpDistTXDebugDUnitTest() throws Exception {
+ }
+
+ /**
+ * Clears the resource observer registered with
+ * {@code InternalResourceManager}, first in every dunit VM and then in
+ * this JVM.
+ */
+ @Override
+ public final void postTearDownCacheTestCase() throws Exception {
+   final SerializableRunnable clearObserver = new SerializableRunnable() {
+     @Override
+     public void run() {
+       InternalResourceManager.setResourceObserver(null);
+     }
+   };
+   Invoke.invokeInEveryVM(clearObserver);
+   InternalResourceManager.setResourceObserver(null);
+ }
+
+ /**
+ * Calls {@code getCache()} on a throw-away test instance named "temp".
+ * Static so that it can be invoked by reference in another VM.
+ */
+ public static void createCacheInVm() {
+   final DistTXDebugDUnitDisabledTest throwAway = new DistTXDebugDUnitDisabledTest("temp");
+   throwAway.getCache();
+ }
+
+ protected void createCacheInAllVms() {
+ dataStore1.invoke(() -> DistTXDebugDUnitDisabledTest.createCacheInVm());
+ dataStore2.invoke(() -> DistTXDebugDUnitDisabledTest.createCacheInVm());
+ dataStore3.invoke(() -> DistTXDebugDUnitDisabledTest.createCacheInVm());
+ accessor.invoke(() -> DistTXDebugDUnitDisabledTest.createCacheInVm());
+ }
+
+ /**
+ * Convenience overload of the seven-argument {@code createPR} that always
+ * passes {@code Boolean.TRUE} for the concurrency-checks flag.
+ */
+ public static void createPR(String partitionedRegionName, Integer redundancy,
+     Integer localMaxMemory, Integer totalNumBuckets, Object colocatedWith,
+     Boolean isPartitionResolver) {
+   // Concurrency checks are switched on here. NOTE(review): the original remark
+   // at this spot said "By default is false" -- confirm which default it refers to.
+   final Boolean concurrencyChecks = Boolean.TRUE;
+   createPR(partitionedRegionName, redundancy, localMaxMemory,
+       totalNumBuckets, colocatedWith, isPartitionResolver, concurrencyChecks);
+ }
+
+ /**
+ * Creates a partitioned region in the cache returned by
+ * {@code basicGetCache()} and logs the result.
+ *
+ * @param partitionedRegionName name of the region to create
+ * @param redundancy number of redundant copies; must not be null
+ * @param localMaxMemory local max memory, or null to leave it unset
+ * @param totalNumBuckets total bucket count, or null to leave it unset
+ * @param colocatedWith name (a String) of the region to colocate with, or null for none
+ * @param isPartitionResolver whether to install a {@code CustomerIDPartitionResolver}; must not be null
+ * @param concurrencyChecks value for the region's concurrency-checks setting; must not be null
+ */
+ public static void createPR(String partitionedRegionName, Integer redundancy,
+     Integer localMaxMemory, Integer totalNumBuckets, Object colocatedWith,
+     Boolean isPartitionResolver, Boolean concurrencyChecks) {
+   final PartitionAttributesFactory partitionFactory = new PartitionAttributesFactory();
+   partitionFactory.setRedundantCopies(redundancy.intValue());
+
+   // The optional settings are applied only when the caller supplied a value.
+   if (localMaxMemory != null) {
+     partitionFactory.setLocalMaxMemory(localMaxMemory.intValue());
+   }
+   if (totalNumBuckets != null) {
+     partitionFactory.setTotalNumBuckets(totalNumBuckets.intValue());
+   }
+   if (colocatedWith != null) {
+     partitionFactory.setColocatedWith((String) colocatedWith);
+   }
+   if (isPartitionResolver.booleanValue()) {
+     partitionFactory.setPartitionResolver(
+         new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
+   }
+
+   final PartitionAttributes partitionAttributes = partitionFactory.create();
+   final AttributesFactory regionFactory = new AttributesFactory();
+   regionFactory.setPartitionAttributes(partitionAttributes);
+   regionFactory.setConcurrencyChecksEnabled(concurrencyChecks);
+
+   assertNotNull(basicGetCache());
+   final Region created =
+       basicGetCache().createRegion(partitionedRegionName, regionFactory.create());
+   assertNotNull(created);
+   LogWriterUtils.getLogWriter().info(
+       "Partitioned Region " + partitionedRegionName
+           + " created Successfully :" + created.toString());
+ }
+
+ protected void createPartitionedRegion(Object[] attributes) {
+ dataStore1.invoke(DistTXDebugDUnitDisabledTest.class, "createPR", attributes);
+ dataStore2.invoke(DistTXDebugDUnitDisabledTest.class, "createPR", attributes);
+ dataStore3.invoke(DistTXDebugDUnitDisabledTest.class, "createPR", attributes);
+ // make Local max memory = o for accessor
+ attributes[2] = new Integer(0);
+ accessor.invoke(DistTXDebugDUnitDisabledTest.class, "createPR", attributes);
+ }
+
+ /**
+ * Looks up the named region in the cache returned by
+ * {@code basicGetCache()}, asserts that both the cache and the region
+ * exist, and calls {@code destroyRegion()} on it.
+ *
+ * @param partitionedRegionName name of the region to destroy
+ */
+ public static void destroyPR(String partitionedRegionName) {
+   assertNotNull(basicGetCache());
+
+   final Region doomed = basicGetCache().getRegion(partitionedRegionName);
+   assertNotNull(doomed);
+
+   LogWriterUtils.getLogWriter().info(
+       "Destroying Partitioned Region " + partitionedRegionName);
+   doomed.destroyRegion();
+ }
+
+ /**
+ * Creates a DISTRIBUTED_ACK region in the cache returned by
+ * {@code basicGetCache()} and logs the result.
+ *
+ * @param replicatedRegionName name of the region to create
+ * @param empty {@code true} for {@code DataPolicy.EMPTY}, {@code false} for
+ *        {@code DataPolicy.REPLICATE}
+ */
+ public static void createRR(String replicatedRegionName, boolean empty) {
+   final AttributesFactory factory = new AttributesFactory();
+   factory.setScope(Scope.DISTRIBUTED_ACK);
+   factory.setDataPolicy(empty ? DataPolicy.EMPTY : DataPolicy.REPLICATE);
+
+   final Region created = basicGetCache().createRegion(replicatedRegionName, factory.create());
+   assertNotNull(created);
+   LogWriterUtils.getLogWriter().info(
+       "Replicated Region " + replicatedRegionName + " created Successfully :"
+           + created.toString());
+ }
+
+ protected void createReplicatedRegion(Object[] attributes) {
+ dataStore1.invoke(DistTXDebugDUnitDisabledTest.class, "createRR", attributes);
+ dataStore2.invoke(DistTXDebugDUnitDisabledTest.class, "createRR", attributes);
+ dataStore3.invoke(DistTXDebugDUnitDisabledTest.class, "createRR", attributes);
+ // DataPolicy.EMPTY for accessor
+ attributes[1] = Boolean.TRUE;
+ accessor.invoke(DistTXDebugDUnitDisabledTest.class, "createRR", attributes);
+ }
+
+ /**
+ * Runs put/commit, put/rollback and destroy/commit against partitioned
+ * region "pregion1" from the accessor VM with distributed transactions
+ * switched on, checking the visible values after each step and finally that
+ * every data store holds no entries.
+ */
+ public void testTXPR() throws Exception {
+ createCacheInAllVms();
+ // createPR arguments: name, redundancy 1, no local max memory, 3 buckets,
+ // no colocation, no partition resolver, concurrency checks off
+ Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+ Boolean.FALSE, Boolean.FALSE };
+ createPartitionedRegion(prAttrs);
+
+ SerializableCallable TxOps = new SerializableCallable("TxOps") {
+ @Override
+ public Object call() throws CacheException {
+ PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+ // put some data (non tx ops)
+ for (int i = 1; i <= 3; i++) {
+ DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+ i);
+ LogWriterUtils.getLogWriter().info(" calling pr.put");
+ pr1.put(dummy, "1_entry__" + i);
+ }
+
+ // put in tx and commit
+ CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+ ctx.setDistributed(true);
+ ctx.begin();
+ for (int i = 1; i <= 3; i++) {
+ DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+ i);
+ LogWriterUtils.getLogWriter().info(" calling pr.put in tx 1");
+ pr1.put(dummy, "2_entry__" + i);
+ }
+ ctx.commit();
+
+ // verify the data
+ for (int i = 1; i <= 3; i++) {
+ DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+ i);
+ LogWriterUtils.getLogWriter().info(" calling pr.get");
+ assertEquals("2_entry__" + i, pr1.get(dummy));
+ }
+
+ // put data in tx and rollback
+ ctx.begin();
+ for (int i = 1; i <= 3; i++) {
+ DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+ i);
+ LogWriterUtils.getLogWriter().info(" calling pr.put in tx 2");
+ pr1.put(dummy, "3_entry__" + i);
+ }
+ ctx.rollback();
+
+ // verify the data (the rolled-back values must not be visible)
+ for (int i = 1; i <= 3; i++) {
+ DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+ i);
+ LogWriterUtils.getLogWriter().info(" calling pr.get");
+ assertEquals("2_entry__" + i, pr1.get(dummy));
+ }
+
+ // destroy data in tx and commit
+ ctx.begin();
+ for (int i = 1; i <= 3; i++) {
+ DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+ i);
+ LogWriterUtils.getLogWriter().info(" calling pr.destroy in tx 3");
+ pr1.destroy(dummy);
+ }
+ ctx.commit();
+
+ // verify the data
+ for (int i = 1; i <= 3; i++) {
+ DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+ i);
+ LogWriterUtils.getLogWriter().info(" calling pr.get");
+ assertEquals(null, pr1.get(dummy));
+ }
+
+ // verify data size on all replicas
+ // NOTE(review): this check is defined and invoked inside the callable that runs
+ // on the accessor, so the dataStore invocations are issued from there rather than
+ // from the test method itself -- confirm that this is intended
+ SerializableCallable verifySize = new SerializableCallable("getOps") {
+ @Override
+ public Object call() throws CacheException {
+ PartitionedRegion pr1 = (PartitionedRegion) basicGetCache()
+ .getRegion("pregion1");
+ LogWriterUtils.getLogWriter().info(
+ " calling pr.getLocalSize " + pr1.getLocalSize());
+ assertEquals(0, pr1.getLocalSize());
+ return null;
+ }
+ };
+ dataStore1.invoke(verifySize);
+ dataStore2.invoke(verifySize);
+ dataStore3.invoke(verifySize);
+
+ return null;
+ }
+ };
+
+ accessor.invoke(TxOps);
+
+ accessor.invoke(() -> DistTXDebugDUnitDisabledTest.destroyPR( "pregion1" ));
+ }
+
+ /**
+ * Puts keys 1-6 into partitioned region "pregion1" and region "rregion1"
+ * from the accessor VM, then in a single distributed transaction destroys
+ * keys 1-3 and invalidates keys 4-6 in both regions. After the commit every
+ * key must read back as null in both regions, and each data store must
+ * report a local size of 2 for "pregion1" and a size of 3 for "rregion1".
+ */
+ public void testTXDestroy_invalidate() throws Exception {
+   createCacheInAllVms();
+   // createPR arguments: name, redundancy 1, no local max memory, 3 buckets,
+   // no colocation, no partition resolver, concurrency checks off
+   Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+       Boolean.FALSE, Boolean.FALSE };
+   createPartitionedRegion(prAttrs);
+
+   Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
+   createReplicatedRegion(rrAttrs);
+
+   SerializableCallable TxOps = new SerializableCallable("TxOps") {
+     @Override
+     public Object call() throws CacheException {
+       PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+       Region rr1 = basicGetCache().getRegion("rregion1");
+
+       // put some data (non tx ops)
+       for (int i = 1; i <= 6; i++) {
+         DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+             i);
+         LogWriterUtils.getLogWriter().info(" calling non-tx put");
+         pr1.put(dummy, "1_entry__" + i);
+         rr1.put(dummy, "1_entry__" + i);
+       }
+
+       CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+       ctx.setDistributed(true);
+       // destroy data in tx and commit
+       ctx.begin();
+       for (int i = 1; i <= 3; i++) {
+         DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+             i);
+         LogWriterUtils.getLogWriter().info(
+             " calling pr1.destroy in tx key=" + dummy);
+         pr1.destroy(dummy);
+         LogWriterUtils.getLogWriter().info(" calling rr1.destroy in tx key=" + i);
+         rr1.destroy(dummy);
+       }
+       for (int i = 4; i <= 6; i++) {
+         DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+             i);
+         LogWriterUtils.getLogWriter().info(
+             " calling pr1.invalidate in tx key=" + dummy);
+         pr1.invalidate(dummy);
+         LogWriterUtils.getLogWriter().info(" calling rr1.invalidate in tx key=" + i);
+         rr1.invalidate(dummy);
+       }
+       ctx.commit();
+
+       // verify the data
+       for (int i = 1; i <= 6; i++) {
+         DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+             i);
+         LogWriterUtils.getLogWriter().info(" calling pr1.get");
+         assertEquals(null, pr1.get(dummy));
+         LogWriterUtils.getLogWriter().info(" calling rr1.get");
+         // Look the entry up under the key it was stored with. An Integer key
+         // was never put into rr1, so checking it would pass no matter what
+         // the transaction did.
+         assertEquals(null, rr1.get(dummy));
+       }
+       return null;
+     }
+   };
+
+   accessor.invoke(TxOps);
+
+   // verify data size on all replicas
+   SerializableCallable verifySize = new SerializableCallable("getOps") {
+     @Override
+     public Object call() throws CacheException {
+       PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+       Region rr1 = basicGetCache().getRegion("rregion1");
+       LogWriterUtils.getLogWriter().info(
+           " calling pr1.getLocalSize " + pr1.getLocalSize());
+       assertEquals(2, pr1.getLocalSize());
+       LogWriterUtils.getLogWriter().info(" calling rr1.size " + rr1.size());
+       assertEquals(3, rr1.size());
+       return null;
+     }
+   };
+   dataStore1.invoke(verifySize);
+   dataStore2.invoke(verifySize);
+   dataStore3.invoke(verifySize);
+
+   accessor.invoke(() -> DistTXDebugDUnitDisabledTest.destroyPR( "pregion1" ));
+ }
+
+ /**
+ * Writes to partitioned region "pregion1" and region "rregion1" from the
+ * accessor VM, first outside and then inside one distributed transaction,
+ * and checks that the committed values are visible and that each data store
+ * reports a local size of 2 for "pregion1" and a size of 3 for "rregion1".
+ */
+ public void testTXPR_RR() throws Exception {
+ createCacheInAllVms();
+ // createPR arguments: name, redundancy 1, no local max memory, 3 buckets,
+ // no colocation, no partition resolver, concurrency checks off
+ Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+ Boolean.FALSE, Boolean.FALSE };
+ createPartitionedRegion(prAttrs);
+
+ Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
+ createReplicatedRegion(rrAttrs);
+
+ SerializableCallable TxOps = new SerializableCallable("TxOps") {
+ @Override
+ public Object call() throws CacheException {
+ PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+ Region rr1 = basicGetCache().getRegion("rregion1");
+ // put some data (non tx ops); pr1 is keyed by DummyKeyBasedRoutingResolver, rr1 by Integer
+ for (int i = 1; i <= 3; i++) {
+ DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+ i);
+ LogWriterUtils.getLogWriter().info(" calling pr.put non-tx PR1_entry__" + i);
+ pr1.put(dummy, "PR1_entry__" + i);
+ LogWriterUtils.getLogWriter().info(" calling rr.put non-tx RR1_entry__" + i);
+ rr1.put(new Integer(i), "RR1_entry__" + i);
+ }
+
+ // put in tx and commit
+ CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+ ctx.setDistributed(true);
+ ctx.begin();
+ for (int i = 1; i <= 3; i++) {
+ DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+ i);
+ LogWriterUtils.getLogWriter().info(" calling pr.put in tx PR2_entry__" + i);
+ pr1.put(dummy, "PR2_entry__" + i);
+ LogWriterUtils.getLogWriter().info(" calling rr.put in tx RR2_entry__" + i);
+ rr1.put(new Integer(i), "RR2_entry__" + i);
+ }
+ ctx.commit();
+
+ // verify the data
+ for (int i = 1; i <= 3; i++) {
+ DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+ i);
+ LogWriterUtils.getLogWriter().info(" calling pr.get PR2_entry__" + i);
+ assertEquals("PR2_entry__" + i, pr1.get(dummy));
+ LogWriterUtils.getLogWriter().info(" calling rr.get RR2_entry__" + i);
+ assertEquals("RR2_entry__" + i, rr1.get(new Integer(i)));
+ }
+ return null;
+ }
+ };
+
+ accessor.invoke(TxOps);
+
+ // verify data size on all replicas
+ SerializableCallable verifySize = new SerializableCallable("getOps") {
+ @Override
+ public Object call() throws CacheException {
+ PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+ LogWriterUtils.getLogWriter().info(
+ " calling pr.getLocalSize " + pr1.getLocalSize());
+ assertEquals(2, pr1.getLocalSize());
+
+ Region rr1 = basicGetCache().getRegion("rregion1");
+ // the log text says getLocalSize, but the value logged and asserted is rr1.size()
+ LogWriterUtils.getLogWriter()
+ .info(" calling rr.getLocalSize " + rr1.size());
+ assertEquals(3, rr1.size());
+ return null;
+ }
+ };
+ dataStore1.invoke(verifySize);
+ dataStore2.invoke(verifySize);
+ dataStore3.invoke(verifySize);
+
+ // NOTE(review): only "pregion1" is destroyed here; "rregion1" is left for the test tear-down
+ accessor.invoke(() -> DistTXDebugDUnitDisabledTest.destroyPR( "pregion1" ));
+ }
+
+ /**
+  * Commits a distributed transaction of three puts into the partitioned
+  * region "pregion1" from the accessor, checks the local size on every data
+  * store, and finally verifies that a rolled-back transaction leaves the
+  * committed values in place.
+  */
+ public void testTXPR2() throws Exception {
+   createCacheInAllVms();
+   Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+       Boolean.FALSE, Boolean.FALSE };
+   createPartitionedRegion(prAttrs);
+
+   SerializableCallable TxOps = new SerializableCallable("TxOps") {
+     @Override
+     public Object call() throws CacheException {
+       PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+
+       // put in tx and commit
+       CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+       ctx.setDistributed(true);
+       ctx.begin();
+       for (int i = 1; i <= 3; i++) {
+         DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(i);
+         LogWriterUtils.getLogWriter().info(" calling pr.put in tx 1");
+         pr1.put(dummy, "2_entry__" + i);
+       }
+       ctx.commit();
+
+       // verify the committed values are readable
+       for (int i = 1; i <= 3; i++) {
+         DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(i);
+         LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
+         assertEquals("2_entry__" + i, pr1.get(dummy));
+       }
+       return null;
+     }
+   };
+
+   accessor.invoke(TxOps);
+
+   // each data store is expected to report a local size of 2
+   SerializableCallable TxGetOps = new SerializableCallable("TxGetOps") {
+     @Override
+     public Object call() throws CacheException {
+       PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+       LogWriterUtils.getLogWriter().info(
+           " calling pr.getLocalSize " + pr1.getLocalSize());
+       assertEquals(2, pr1.getLocalSize());
+       return null;
+     }
+   };
+
+   dataStore1.invoke(TxGetOps);
+   dataStore2.invoke(TxGetOps);
+   dataStore3.invoke(TxGetOps);
+
+   SerializableCallable TxRollbackOps = new SerializableCallable("TxOps") {
+     @Override
+     public Object call() throws CacheException {
+       PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+
+       // put in tx and roll back
+       CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+       ctx.setDistributed(true);
+       ctx.begin();
+       for (int i = 1; i <= 3; i++) {
+         DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(i);
+         LogWriterUtils.getLogWriter().info(
+             " calling pr.put in tx for rollback no_entry__" + i);
+         pr1.put(dummy, "no_entry__" + i);
+       }
+       ctx.rollback();
+
+       // the values committed earlier must still be the visible ones
+       for (int i = 1; i <= 3; i++) {
+         DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(i);
+         LogWriterUtils.getLogWriter().info(
+             " calling pr.get after rollback " + pr1.get(dummy));
+         assertEquals("2_entry__" + i, pr1.get(dummy));
+       }
+       return null;
+     }
+   };
+
+   accessor.invoke(TxRollbackOps);
+
+   accessor.invoke(() -> DistTXDebugDUnitDisabledTest.destroyPR( "pregion1" ));
+ }
+
+ /**
+  * Creates three entries in both the partitioned region "pregion1" and the
+  * replicated region "rregion1" inside a single distributed transaction run
+  * on the accessor, then checks the values and the per-member sizes.
+  */
+ public void testTXPRRR2_create() throws Exception {
+   createCacheInAllVms();
+   createPartitionedRegion(new Object[] { "pregion1", 1, null, 3, null,
+       Boolean.FALSE, Boolean.FALSE });
+   createReplicatedRegion(new Object[] { "rregion1", Boolean.FALSE });
+
+   SerializableCallable createInTx = new SerializableCallable("TxOps") {
+     @Override
+     public Object call() throws CacheException {
+       final PartitionedRegion partitioned =
+           (PartitionedRegion) basicGetCache().getRegion("pregion1");
+       final Region replicated = basicGetCache().getRegion("rregion1");
+       final CacheTransactionManager txMgr = basicGetCache().getCacheTransactionManager();
+       txMgr.setDistributed(true);
+       txMgr.begin();
+       for (int id = 1; id <= 3; id++) {
+         DummyKeyBasedRoutingResolver key = new DummyKeyBasedRoutingResolver(id);
+         LogWriterUtils.getLogWriter().info(" calling pr.create in tx 1");
+         partitioned.create(key, "2_entry__" + id);
+
+         LogWriterUtils.getLogWriter().info(" calling rr.create " + "2_entry__" + id);
+         replicated.create(new Integer(id), "2_entry__" + id);
+       }
+       txMgr.commit();
+
+       // read every entry back after the commit
+       for (int id = 1; id <= 3; id++) {
+         DummyKeyBasedRoutingResolver key = new DummyKeyBasedRoutingResolver(id);
+         LogWriterUtils.getLogWriter().info(" calling pr.get " + partitioned.get(key));
+         assertEquals("2_entry__" + id, partitioned.get(key));
+
+         LogWriterUtils.getLogWriter().info(
+             " calling rr.get " + replicated.get(new Integer(id)));
+         assertEquals("2_entry__" + id, replicated.get(new Integer(id)));
+       }
+       return null;
+     }
+   };
+
+   accessor.invoke(createInTx);
+
+   // size check executed on each of the three data stores
+   SerializableCallable sizeCheck = new SerializableCallable("getOps") {
+     @Override
+     public Object call() throws CacheException {
+       final Region replicated = basicGetCache().getRegion("rregion1");
+       LogWriterUtils.getLogWriter().info(" calling rr.getLocalSize " + replicated.size());
+       assertEquals(3, replicated.size());
+
+       final PartitionedRegion partitioned =
+           (PartitionedRegion) basicGetCache().getRegion("pregion1");
+       LogWriterUtils.getLogWriter().info(
+           " calling pr.getLocalSize " + partitioned.getLocalSize());
+       assertEquals(2, partitioned.getLocalSize());
+       return null;
+     }
+   };
+   dataStore1.invoke(sizeCheck);
+   dataStore2.invoke(sizeCheck);
+   dataStore3.invoke(sizeCheck);
+ }
+
+ /**
+  * Runs one putAll against the partitioned region "pregion1" and one against
+  * the replicated region "rregion1" inside a single distributed transaction
+  * on the accessor, then checks the values and the per-member sizes.
+  */
+ public void testTXPRRR2_putall() throws Exception {
+   createCacheInAllVms();
+   createPartitionedRegion(new Object[] { "pregion1", 1, null, 3, null,
+       Boolean.FALSE, Boolean.FALSE });
+   createReplicatedRegion(new Object[] { "rregion1", Boolean.FALSE });
+
+   SerializableCallable putAllInTx = new SerializableCallable("TxOps") {
+     @Override
+     public Object call() throws CacheException {
+       final PartitionedRegion partitioned =
+           (PartitionedRegion) basicGetCache().getRegion("pregion1");
+       final Region replicated = basicGetCache().getRegion("rregion1");
+
+       final CacheTransactionManager txMgr = basicGetCache().getCacheTransactionManager();
+       txMgr.setDistributed(true);
+       txMgr.begin();
+       HashMap<DummyKeyBasedRoutingResolver, String> partitionedBatch =
+           new HashMap<DummyKeyBasedRoutingResolver, String>();
+       HashMap<Integer, String> replicatedBatch = new HashMap<Integer, String>();
+       for (int id = 1; id <= 3; id++) {
+         partitionedBatch.put(new DummyKeyBasedRoutingResolver(id), "2_entry__" + id);
+         replicatedBatch.put(id, "2_entry__" + id);
+       }
+       partitioned.putAll(partitionedBatch);
+       replicated.putAll(replicatedBatch);
+       txMgr.commit();
+
+       // read every entry back after the commit
+       for (int id = 1; id <= 3; id++) {
+         DummyKeyBasedRoutingResolver key = new DummyKeyBasedRoutingResolver(id);
+         LogWriterUtils.getLogWriter().info(" calling pr.get " + partitioned.get(key));
+         assertEquals("2_entry__" + id, partitioned.get(key));
+
+         LogWriterUtils.getLogWriter().info(
+             " calling rr.get " + replicated.get(new Integer(id)));
+         assertEquals("2_entry__" + id, replicated.get(new Integer(id)));
+       }
+       return null;
+     }
+   };
+
+   accessor.invoke(putAllInTx);
+
+   // size check executed on each of the three data stores
+   SerializableCallable sizeCheck = new SerializableCallable("getOps") {
+     @Override
+     public Object call() throws CacheException {
+       final Region replicated = basicGetCache().getRegion("rregion1");
+       LogWriterUtils.getLogWriter().info(" calling rr.getLocalSize " + replicated.size());
+       assertEquals(3, replicated.size());
+
+       final PartitionedRegion partitioned =
+           (PartitionedRegion) basicGetCache().getRegion("pregion1");
+       LogWriterUtils.getLogWriter().info(
+           " calling pr.getLocalSize " + partitioned.getLocalSize());
+       assertEquals(2, partitioned.getLocalSize());
+       return null;
+     }
+   };
+   dataStore1.invoke(sizeCheck);
+   dataStore2.invoke(sizeCheck);
+   dataStore3.invoke(sizeCheck);
+ }
+
+ /**
+  * Runs a putAll of three entries against the partitioned region "pregion1"
+  * inside a distributed transaction on the accessor, then checks the values
+  * and the local size reported by every data store.
+  */
+ public void testTXPR_putall() throws Exception {
+   createCacheInAllVms();
+   Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+       Boolean.FALSE, Boolean.FALSE };
+   createPartitionedRegion(prAttrs);
+
+   SerializableCallable TxOps = new SerializableCallable("TxOps") {
+     @Override
+     public Object call() throws CacheException {
+       PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+
+       CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+       ctx.setDistributed(true);
+       ctx.begin();
+       HashMap<DummyKeyBasedRoutingResolver, String> phm = new HashMap<DummyKeyBasedRoutingResolver, String>();
+       for (int i = 1; i <= 3; i++) {
+         DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(i);
+         phm.put(dummy, "2_entry__" + i);
+       }
+       pr1.putAll(phm);
+       ctx.commit();
+
+       // verify the data
+       for (int i = 1; i <= 3; i++) {
+         DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(i);
+         LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
+         assertEquals("2_entry__" + i, pr1.get(dummy));
+       }
+       return null;
+     }
+   };
+
+   accessor.invoke(TxOps);
+
+   // verify data size on all replicas
+   SerializableCallable verifySize = new SerializableCallable("getOps") {
+     @Override
+     public Object call() throws CacheException {
+       PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+       LogWriterUtils.getLogWriter().info(
+           " calling pr.getLocalSize " + pr1.getLocalSize());
+       assertEquals(2, pr1.getLocalSize());
+       return null;
+     }
+   };
+   dataStore1.invoke(verifySize);
+   dataStore2.invoke(verifySize);
+   dataStore3.invoke(verifySize);
+ }
+
+
+ /** Runs the replicated-region removeAll scenario with the accessor invoking the transaction. */
+ public void testTXRR_removeAll() throws Exception {
+   final boolean dataNodeAsCoordinator = false;
+   performRR_removeAllTest(dataNodeAsCoordinator);
+ }
+
+ /** Runs the replicated-region removeAll scenario with dataStore1 invoking the transaction. */
+ public void testTXRR_removeAll_dataNodeAsCoordinator() throws Exception {
+   final boolean dataNodeAsCoordinator = true;
+   performRR_removeAllTest(dataNodeAsCoordinator);
+ }
+
+ /**
+  * Populates the replicated region "rregion1" with three entries, removes
+  * them with removeAll inside a distributed transaction, and checks that the
+  * region is empty on every data store afterwards.
+  *
+  * @param dataNodeAsCoordinator when {@code true} the transaction is invoked
+  *        on dataStore1, otherwise on the accessor
+  */
+ private void performRR_removeAllTest(boolean dataNodeAsCoordinator) {
+   createCacheInAllVms();
+   createReplicatedRegion(new Object[] { "rregion1", Boolean.FALSE });
+
+   SerializableCallable removeAllInTx = new SerializableCallable("TxOps") {
+     @Override
+     public Object call() throws CacheException {
+       final Region replicated = basicGetCache().getRegion("rregion1");
+
+       // seed the region outside of any transaction
+       HashMap<Integer, String> seed = new HashMap<Integer, String>();
+       for (int id = 1; id <= 3; id++) {
+         seed.put(id, "2_entry__" + id);
+       }
+       replicated.putAll(seed);
+
+       final CacheTransactionManager txMgr = basicGetCache().getCacheTransactionManager();
+       txMgr.setDistributed(true);
+       txMgr.begin();
+       replicated.removeAll(seed.keySet());
+       txMgr.commit();
+
+       // none of the keys may resolve to a value any more
+       for (int id = 1; id <= 3; id++) {
+         LogWriterUtils.getLogWriter().info(
+             " calling rr.get " + replicated.get(new Integer(id)));
+         assertEquals(null, replicated.get(new Integer(id)));
+       }
+       return null;
+     }
+   };
+
+   if (!dataNodeAsCoordinator) {
+     accessor.invoke(removeAllInTx);
+   } else {
+     dataStore1.invoke(removeAllInTx);
+   }
+
+   // size check executed on each of the three data stores
+   SerializableCallable sizeCheck = new SerializableCallable("getOps") {
+     @Override
+     public Object call() throws CacheException {
+       final Region replicated = basicGetCache().getRegion("rregion1");
+       LogWriterUtils.getLogWriter().info(" calling rr.getLocalSize " + replicated.size());
+       assertEquals(0, replicated.size());
+       return null;
+     }
+   };
+   dataStore1.invoke(sizeCheck);
+   dataStore2.invoke(sizeCheck);
+   dataStore3.invoke(sizeCheck);
+ }
+
+ /**
+  * Populates the partitioned region "pregion1" with three entries, removes
+  * them with removeAll inside a distributed transaction on the accessor, and
+  * checks that every data store reports a local size of zero afterwards.
+  */
+ public void testTXPR_removeAll() throws Exception {
+   createCacheInAllVms();
+   createPartitionedRegion(new Object[] { "pregion1", 1, null, 3, null,
+       Boolean.FALSE, Boolean.FALSE });
+
+   SerializableCallable removeAllInTx = new SerializableCallable("TxOps") {
+     @Override
+     public Object call() throws CacheException {
+       final PartitionedRegion partitioned =
+           (PartitionedRegion) basicGetCache().getRegion("pregion1");
+
+       // seed the region outside of any transaction
+       HashMap<DummyKeyBasedRoutingResolver, String> seed =
+           new HashMap<DummyKeyBasedRoutingResolver, String>();
+       for (int id = 1; id <= 3; id++) {
+         seed.put(new DummyKeyBasedRoutingResolver(id), "2_entry__" + id);
+       }
+       partitioned.putAll(seed);
+
+       final CacheTransactionManager txMgr = basicGetCache().getCacheTransactionManager();
+       txMgr.setDistributed(true);
+       txMgr.begin();
+       partitioned.removeAll(seed.keySet());
+       txMgr.commit();
+
+       // none of the keys may resolve to a value any more
+       for (int id = 1; id <= 3; id++) {
+         DummyKeyBasedRoutingResolver key = new DummyKeyBasedRoutingResolver(id);
+         LogWriterUtils.getLogWriter().info(" calling pr.get " + partitioned.get(key));
+         assertEquals(null, partitioned.get(key));
+       }
+       return null;
+     }
+   };
+
+   accessor.invoke(removeAllInTx);
+
+   // size check executed on each of the three data stores
+   SerializableCallable sizeCheck = new SerializableCallable("getOps") {
+     @Override
+     public Object call() throws CacheException {
+       final PartitionedRegion partitioned =
+           (PartitionedRegion) basicGetCache().getRegion("pregion1");
+       LogWriterUtils.getLogWriter().info(
+           " calling pr.getLocalSize " + partitioned.getLocalSize());
+       assertEquals(0, partitioned.getLocalSize());
+       return null;
+     }
+   };
+   dataStore1.invoke(sizeCheck);
+   dataStore2.invoke(sizeCheck);
+   dataStore3.invoke(sizeCheck);
+ }
+
+
+ /**
+  * Commits three puts into the replicated region "rregion1" inside a
+  * distributed transaction, checks the region size on every data store, and
+  * then verifies that a rolled-back transaction leaves the committed values
+  * in place.
+  *
+  * @param makeDatNodeAsCoordinator when {@code true} both transactions are
+  *        invoked on dataStore1, otherwise on the accessor
+  */
+ public void performTXRRtestOps(boolean makeDatNodeAsCoordinator) {
+   createCacheInAllVms();
+   // NOTE(review): "pregion1" is created but never touched by this scenario - confirm it is needed
+   Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+       Boolean.FALSE, Boolean.FALSE };
+   createPartitionedRegion(prAttrs);
+
+   Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
+   createReplicatedRegion(rrAttrs);
+
+   SerializableCallable TxOps = new SerializableCallable("TxOps") {
+     @Override
+     public Object call() throws CacheException {
+       Region rr1 = basicGetCache().getRegion("rregion1");
+       CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+       ctx.setDistributed(true);
+       ctx.begin();
+       for (int i = 1; i <= 3; i++) {
+         LogWriterUtils.getLogWriter().info(" calling rr.put " + "2_entry__" + i);
+         rr1.put(new Integer(i), "2_entry__" + i);
+       }
+       ctx.commit();
+
+       // verify the data
+       for (int i = 1; i <= 3; i++) {
+         LogWriterUtils.getLogWriter().info(
+             " calling rr.get " + rr1.get(new Integer(i)));
+         assertEquals("2_entry__" + i, rr1.get(new Integer(i)));
+       }
+       return null;
+     }
+   };
+
+   if (makeDatNodeAsCoordinator) {
+     dataStore1.invoke(TxOps);
+   } else {
+     accessor.invoke(TxOps);
+   }
+
+   // verify data size on all replicas
+   SerializableCallable verifySize = new SerializableCallable("getOps") {
+     @Override
+     public Object call() throws CacheException {
+       Region rr1 = basicGetCache().getRegion("rregion1");
+       LogWriterUtils.getLogWriter()
+           .info(" calling rr.getLocalSize " + rr1.size());
+       assertEquals(3, rr1.size());
+       return null;
+     }
+   };
+   dataStore1.invoke(verifySize);
+   dataStore2.invoke(verifySize);
+   dataStore3.invoke(verifySize);
+
+   SerializableCallable TxRollbackOps = new SerializableCallable("TxOps") {
+     @Override
+     public Object call() throws CacheException {
+       Region rr1 = basicGetCache().getRegion("rregion1");
+       CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+       ctx.setDistributed(true);
+       ctx.begin();
+       for (int i = 1; i <= 3; i++) {
+         LogWriterUtils.getLogWriter().info(
+             " calling rr.put for rollback no_entry__" + i);
+         rr1.put(new Integer(i), "no_entry__" + i);
+       }
+       ctx.rollback();
+
+       // the values committed earlier must still be the visible ones
+       for (int i = 1; i <= 3; i++) {
+         LogWriterUtils.getLogWriter().info(
+             " calling rr.get after rollback "
+                 + rr1.get(new Integer(i)));
+         assertEquals("2_entry__" + i, rr1.get(new Integer(i)));
+       }
+       return null;
+     }
+   };
+
+   if (makeDatNodeAsCoordinator) {
+     dataStore1.invoke(TxRollbackOps);
+   } else {
+     accessor.invoke(TxRollbackOps);
+   }
+ }
+
+
+ /** Runs the replicated-region commit/rollback scenario with the accessor invoking the transactions. */
+ public void testTXRR2() throws Exception {
+   final boolean dataNodeAsCoordinator = false;
+   performTXRRtestOps(dataNodeAsCoordinator);
+ }
+
+ /** Runs the replicated-region commit/rollback scenario with dataStore1 invoking the transactions. */
+ public void testTXRR2_dataNodeAsCoordinator() throws Exception {
+   final boolean dataNodeAsCoordinator = true;
+   performTXRRtestOps(dataNodeAsCoordinator);
+ }
+}
+
+/**
+ * Integer-backed key type that also implements {@link PartitionResolver},
+ * returning the operation's own key as the routing object. Equality and hash
+ * code are derived from the wrapped id.
+ */
+class DummyKeyBasedRoutingResolver implements PartitionResolver,
+    DataSerializable {
+  /** Wrapped id; stays {@code null} after the no-arg constructor until {@link #fromData} runs. */
+  Integer dummyID;
+
+  /**
+   * Leaves the id unset.
+   * NOTE(review): presumably needed so instances can be re-created before
+   * {@link #fromData} is called - confirm against DataSerializable's contract.
+   */
+  public DummyKeyBasedRoutingResolver() {
+  }
+
+  /** @param id value wrapped by this key */
+  public DummyKeyBasedRoutingResolver(int id) {
+    this.dummyID = Integer.valueOf(id);
+  }
+
+  /** @return always {@code null}; this resolver is unnamed */
+  @Override
+  public String getName() {
+    return null;
+  }
+
+  /** @return the key of the given operation, cast to {@link Serializable} */
+  @Override
+  public Serializable getRoutingObject(EntryOperation opDetails) {
+    return (Serializable) opDetails.getKey();
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.dummyID = DataSerializer.readInteger(in);
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeInteger(this.dummyID, out);
+  }
+
+  /** @return the wrapped id, or 0 while the id is still unset */
+  @Override
+  public int hashCode() {
+    // null-safe: the no-arg constructor leaves dummyID unset
+    return this.dummyID == null ? 0 : this.dummyID.intValue();
+  }
+
+  /** Two keys are equal when their ids are equal (or both unset). */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof DummyKeyBasedRoutingResolver)) {
+      return false;
+    }
+    DummyKeyBasedRoutingResolver other = (DummyKeyBasedRoutingResolver) o;
+    if (this.dummyID == null) {
+      return other.dummyID == null;
+    }
+    return this.dummyID.equals(other.dummyID);
+  }
+
+  /** Readable form for log messages that concatenate a key. */
+  @Override
+  public String toString() {
+    return "DummyKeyBasedRoutingResolver(" + this.dummyID + ")";
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/893dc86b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitTest.java
deleted file mode 100644
index 71374d0..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitTest.java
+++ /dev/null
@@ -1,1017 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.disttx;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Properties;
-
-import com.gemstone.gemfire.DataSerializable;
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.CacheTransactionManager;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.EntryOperation;
-import com.gemstone.gemfire.cache.PartitionAttributes;
-import com.gemstone.gemfire.cache.PartitionAttributesFactory;
-import com.gemstone.gemfire.cache.PartitionResolver;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache30.CacheTestCase;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
-import com.gemstone.gemfire.internal.cache.execute.CustomerIDPartitionResolver;
-import com.gemstone.gemfire.test.dunit.Host;
-import com.gemstone.gemfire.test.dunit.Invoke;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.SerializableCallable;
-import com.gemstone.gemfire.test.dunit.SerializableRunnable;
-import com.gemstone.gemfire.test.dunit.VM;
-
-public class DistTXDebugDUnitTest extends CacheTestCase {
- VM accessor = null;
- VM dataStore1 = null;
- VM dataStore2 = null;
- VM dataStore3 = null;
-
- /** Creates the test case, passing {@code name} on to the superclass constructor. */
- public DistTXDebugDUnitTest(String name) {
- super(name);
- }
-
- /**
-  * Binds VMs 0-2 of host 0 to the three data-store fields and VM 3 to the
-  * accessor field, then calls {@link #postSetUpDistTXDebugDUnitTest()}.
-  */
- @Override
- public final void postSetUp() throws Exception {
-   final Host firstHost = Host.getHost(0);
-   this.dataStore1 = firstHost.getVM(0);
-   this.dataStore2 = firstHost.getVM(1);
-   this.dataStore3 = firstHost.getVM(2);
-   this.accessor = firstHost.getVM(3);
-   postSetUpDistTXDebugDUnitTest();
- }
-
- /**
-  * Hook called at the end of {@link #postSetUp()}; does nothing here.
-  * NOTE(review): presumably intended to be overridden by subclasses - confirm.
-  */
- protected void postSetUpDistTXDebugDUnitTest() throws Exception {
- }
-
- /**
-  * Resets the {@link InternalResourceManager} resource observer to
-  * {@code null}, first through {@code Invoke.invokeInEveryVM} and then in
-  * this VM.
-  */
- @Override
- public final void postTearDownCacheTestCase() throws Exception {
-   final SerializableRunnable clearObserver = new SerializableRunnable() {
-     public void run() {
-       InternalResourceManager.setResourceObserver(null);
-     }
-   };
-   Invoke.invokeInEveryVM(clearObserver);
-   InternalResourceManager.setResourceObserver(null);
- }
-
- /** Obtains the cache in the calling VM through a throw-away test instance named "temp". */
- public static void createCacheInVm() {
-   final DistTXDebugDUnitTest throwAway = new DistTXDebugDUnitTest("temp");
-   throwAway.getCache();
- }
-
- protected void createCacheInAllVms() {
- dataStore1.invoke(() -> DistTXDebugDUnitTest.createCacheInVm());
- dataStore2.invoke(() -> DistTXDebugDUnitTest.createCacheInVm());
- dataStore3.invoke(() -> DistTXDebugDUnitTest.createCacheInVm());
- accessor.invoke(() -> DistTXDebugDUnitTest.createCacheInVm());
- }
-
- /**
-  * Convenience overload that delegates to the seven-argument
-  * {@code createPR} with concurrency checks switched on.
-  */
- public static void createPR(String partitionedRegionName, Integer redundancy,
-     Integer localMaxMemory, Integer totalNumBuckets, Object colocatedWith,
-     Boolean isPartitionResolver) {
-   // Concurrency checks; By default is false
-   final Boolean concurrencyChecks = Boolean.TRUE;
-   createPR(partitionedRegionName, redundancy, localMaxMemory,
-       totalNumBuckets, colocatedWith, isPartitionResolver, concurrencyChecks);
- }
-
- /**
-  * Creates a partitioned region in the calling VM's cache and logs the result.
-  *
-  * @param partitionedRegionName name of the region to create
-  * @param redundancy number of redundant copies (must not be null)
-  * @param localMaxMemory local max memory, or null to leave it unset
-  * @param totalNumBuckets total bucket count, or null to leave it unset
-  * @param colocatedWith name of the region to colocate with (cast to
-  *        String), or null for none
-  * @param isPartitionResolver when true a CustomerIDPartitionResolver is installed
-  * @param concurrencyChecks value passed to setConcurrencyChecksEnabled
-  */
- public static void createPR(String partitionedRegionName, Integer redundancy,
-     Integer localMaxMemory, Integer totalNumBuckets, Object colocatedWith,
-     Boolean isPartitionResolver, Boolean concurrencyChecks) {
-   final PartitionAttributesFactory partitionFactory = new PartitionAttributesFactory();
-   partitionFactory.setRedundantCopies(redundancy.intValue());
-
-   // optional settings are applied only when a value was supplied
-   if (localMaxMemory != null) {
-     partitionFactory.setLocalMaxMemory(localMaxMemory.intValue());
-   }
-   if (totalNumBuckets != null) {
-     partitionFactory.setTotalNumBuckets(totalNumBuckets.intValue());
-   }
-   if (colocatedWith != null) {
-     partitionFactory.setColocatedWith((String) colocatedWith);
-   }
-   if (isPartitionResolver.booleanValue()) {
-     partitionFactory.setPartitionResolver(
-         new CustomerIDPartitionResolver("CustomerIDPartitionResolver"));
-   }
-
-   final PartitionAttributes partitionAttributes = partitionFactory.create();
-   final AttributesFactory regionFactory = new AttributesFactory();
-   regionFactory.setPartitionAttributes(partitionAttributes);
-   regionFactory.setConcurrencyChecksEnabled(concurrencyChecks);
-
-   assertNotNull(basicGetCache());
-   final Region created =
-       basicGetCache().createRegion(partitionedRegionName, regionFactory.create());
-   assertNotNull(created);
-   LogWriterUtils.getLogWriter().info(
-       "Partitioned Region " + partitionedRegionName
-           + " created Successfully :" + created.toString());
- }
-
- /**
-  * Invokes {@code createPR} with the given arguments on the three data
-  * stores, then on the accessor with the third argument (local max memory)
-  * replaced by 0. The caller's array is left unmodified.
-  *
-  * @param attributes argument list for {@code createPR}, in declaration order
-  */
- protected void createPartitionedRegion(Object[] attributes) {
-   dataStore1.invoke(DistTXDebugDUnitTest.class, "createPR", attributes);
-   dataStore2.invoke(DistTXDebugDUnitTest.class, "createPR", attributes);
-   dataStore3.invoke(DistTXDebugDUnitTest.class, "createPR", attributes);
-   // local max memory = 0 for the accessor; work on a copy so the caller's array is not changed
-   Object[] accessorAttributes = attributes.clone();
-   accessorAttributes[2] = Integer.valueOf(0);
-   accessor.invoke(DistTXDebugDUnitTest.class, "createPR", accessorAttributes);
- }
-
- /**
-  * Looks up the named region in the calling VM's cache and destroys it,
-  * asserting that both the cache and the region exist.
-  *
-  * @param partitionedRegionName name of the region to destroy
-  */
- public static void destroyPR(String partitionedRegionName) {
-   assertNotNull(basicGetCache());
-   final Region doomed = basicGetCache().getRegion(partitionedRegionName);
-   assertNotNull(doomed);
-
-   final String message = "Destroying Partitioned Region " + partitionedRegionName;
-   LogWriterUtils.getLogWriter().info(message);
-   doomed.destroyRegion();
- }
-
- /**
-  * Creates a DISTRIBUTED_ACK region in the calling VM's cache and logs the
-  * result.
-  *
-  * @param replicatedRegionName name of the region to create
-  * @param empty when true the region uses DataPolicy.EMPTY, otherwise
-  *        DataPolicy.REPLICATE
-  */
- public static void createRR(String replicatedRegionName, boolean empty) {
-   final AttributesFactory regionFactory = new AttributesFactory();
-   regionFactory.setScope(Scope.DISTRIBUTED_ACK);
-   regionFactory.setDataPolicy(empty ? DataPolicy.EMPTY : DataPolicy.REPLICATE);
-
-   final Region created =
-       basicGetCache().createRegion(replicatedRegionName, regionFactory.create());
-   assertNotNull(created);
-   LogWriterUtils.getLogWriter().info(
-       "Replicated Region " + replicatedRegionName + " created Successfully :"
-           + created.toString());
- }
-
- /**
-  * Invokes {@code createRR} with the given arguments on the three data
-  * stores, then on the accessor with the second argument ({@code empty})
-  * forced to true. The caller's array is left unmodified.
-  *
-  * @param attributes argument list for {@code createRR}, in declaration order
-  */
- protected void createReplicatedRegion(Object[] attributes) {
-   dataStore1.invoke(DistTXDebugDUnitTest.class, "createRR", attributes);
-   dataStore2.invoke(DistTXDebugDUnitTest.class, "createRR", attributes);
-   dataStore3.invoke(DistTXDebugDUnitTest.class, "createRR", attributes);
-   // DataPolicy.EMPTY for the accessor; work on a copy so the caller's array is not changed
-   Object[] accessorAttributes = attributes.clone();
-   accessorAttributes[1] = Boolean.TRUE;
-   accessor.invoke(DistTXDebugDUnitTest.class, "createRR", accessorAttributes);
- }
-
- public void testTXPR() throws Exception {
- createCacheInAllVms();
- Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
- Boolean.FALSE, Boolean.FALSE };
- createPartitionedRegion(prAttrs);
-
- SerializableCallable TxOps = new SerializableCallable("TxOps") {
- @Override
- public Object call() throws CacheException {
- // PartitionedRegion pr1 = (PartitionedRegion)
- // basicGetCache().getRegion(
- // "pregion1");
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
- // put some data (non tx ops)
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.put");
- pr1.put(dummy, "1_entry__" + i);
- }
-
- // put in tx and commit
- // CacheTransactionManager ctx = basicGetCache()
- // .getCacheTransactionManager();
- CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
- ctx.setDistributed(true);
- ctx.begin();
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.put in tx 1");
- pr1.put(dummy, "2_entry__" + i);
- }
- ctx.commit();
-
- // verify the data
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.get");
- assertEquals("2_entry__" + i, pr1.get(dummy));
- }
-
- // put data in tx and rollback
- ctx.begin();
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.put in tx 2");
- pr1.put(dummy, "3_entry__" + i);
- }
- ctx.rollback();
-
- // verify the data
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.get");
- assertEquals("2_entry__" + i, pr1.get(dummy));
- }
-
- // destroy data in tx and commit
- ctx.begin();
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.destroy in tx 3");
- pr1.destroy(dummy);
- }
- ctx.commit();
-
- // verify the data
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.get");
- assertEquals(null, pr1.get(dummy));
- }
-
- // verify data size on all replicas
- SerializableCallable verifySize = new SerializableCallable("getOps") {
- @Override
- public Object call() throws CacheException {
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache()
- .getRegion("pregion1");
- LogWriterUtils.getLogWriter().info(
- " calling pr.getLocalSize " + pr1.getLocalSize());
- assertEquals(0, pr1.getLocalSize());
- return null;
- }
- };
- dataStore1.invoke(verifySize);
- dataStore2.invoke(verifySize);
- dataStore3.invoke(verifySize);
-
- return null;
- }
- };
-
- accessor.invoke(TxOps);
-
- accessor.invoke(() -> DistTXDebugDUnitTest.destroyPR( "pregion1" ));
- }
-
- /**
-  * Puts six entries into the partitioned region "pregion1" and the
-  * replicated region "rregion1", then in one distributed transaction destroys
-  * keys 1-3 and invalidates keys 4-6 in both regions. Afterwards every key
-  * must read back as null, and the data stores are checked for the expected
-  * region sizes.
-  */
- public void testTXDestroy_invalidate() throws Exception {
-   createCacheInAllVms();
-   Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
-       Boolean.FALSE, Boolean.FALSE };
-   createPartitionedRegion(prAttrs);
-
-   Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
-   createReplicatedRegion(rrAttrs);
-
-   SerializableCallable TxOps = new SerializableCallable("TxOps") {
-     @Override
-     public Object call() throws CacheException {
-       PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-       Region rr1 = basicGetCache().getRegion("rregion1");
-
-       // put some data (non tx ops)
-       for (int i = 1; i <= 6; i++) {
-         DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(i);
-         LogWriterUtils.getLogWriter().info(" calling non-tx put");
-         pr1.put(dummy, "1_entry__" + i);
-         rr1.put(dummy, "1_entry__" + i);
-       }
-
-       CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
-       ctx.setDistributed(true);
-       // destroy and invalidate data in tx and commit
-       ctx.begin();
-       for (int i = 1; i <= 3; i++) {
-         DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(i);
-         LogWriterUtils.getLogWriter().info(
-             " calling pr1.destroy in tx key=" + dummy);
-         pr1.destroy(dummy);
-         LogWriterUtils.getLogWriter().info(" calling rr1.destroy in tx key=" + i);
-         rr1.destroy(dummy);
-       }
-       for (int i = 4; i <= 6; i++) {
-         DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(i);
-         LogWriterUtils.getLogWriter().info(
-             " calling pr1.invalidate in tx key=" + dummy);
-         pr1.invalidate(dummy);
-         LogWriterUtils.getLogWriter().info(" calling rr1.invalidate in tx key=" + i);
-         rr1.invalidate(dummy);
-       }
-       ctx.commit();
-
-       // verify the data
-       for (int i = 1; i <= 6; i++) {
-         DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(i);
-         LogWriterUtils.getLogWriter().info(" calling pr1.get");
-         assertEquals(null, pr1.get(dummy));
-         LogWriterUtils.getLogWriter().info(" calling rr1.get");
-         // look up with the key that was actually stored; an Integer key was
-         // never put into rr1, so get(i) would be null regardless of the tx
-         assertEquals(null, rr1.get(dummy));
-       }
-       return null;
-     }
-   };
-
-   accessor.invoke(TxOps);
-
-   // verify data size on all replicas
-   SerializableCallable verifySize = new SerializableCallable("getOps") {
-     @Override
-     public Object call() throws CacheException {
-       PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-       Region rr1 = basicGetCache().getRegion("rregion1");
-       LogWriterUtils.getLogWriter().info(
-           " calling pr1.getLocalSize " + pr1.getLocalSize());
-       assertEquals(2, pr1.getLocalSize());
-       LogWriterUtils.getLogWriter().info(" calling rr1.size " + rr1.size());
-       assertEquals(3, rr1.size());
-       return null;
-     }
-   };
-   dataStore1.invoke(verifySize);
-   dataStore2.invoke(verifySize);
-   dataStore3.invoke(verifySize);
-
-   accessor.invoke(() -> DistTXDebugDUnitTest.destroyPR( "pregion1" ));
- }
-
- public void testTXPR_RR() throws Exception {
- createCacheInAllVms();
- Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
- Boolean.FALSE, Boolean.FALSE };
- createPartitionedRegion(prAttrs);
-
- Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
- createReplicatedRegion(rrAttrs);
-
- SerializableCallable TxOps = new SerializableCallable("TxOps") {
- @Override
- public Object call() throws CacheException {
- // PartitionedRegion pr1 = (PartitionedRegion)
- // basicGetCache().getRegion(
- // "pregion1");
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
- // Region rr1 = basicGetCache().getRegion("rregion1");
- Region rr1 = basicGetCache().getRegion("rregion1");
- // put some data (non tx ops)
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.put non-tx PR1_entry__" + i);
- pr1.put(dummy, "PR1_entry__" + i);
- LogWriterUtils.getLogWriter().info(" calling rr.put non-tx RR1_entry__" + i);
- rr1.put(new Integer(i), "RR1_entry__" + i);
- }
-
- // put in tx and commit
- // CacheTransactionManager ctx = basicGetCache()
- // .getCacheTransactionManager();
- CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
- ctx.setDistributed(true);
- ctx.begin();
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.put in tx PR2_entry__" + i);
- pr1.put(dummy, "PR2_entry__" + i);
- LogWriterUtils.getLogWriter().info(" calling rr.put in tx RR2_entry__" + i);
- rr1.put(new Integer(i), "RR2_entry__" + i);
- }
- ctx.commit();
-
- // verify the data
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.get PR2_entry__" + i);
- assertEquals("PR2_entry__" + i, pr1.get(dummy));
- LogWriterUtils.getLogWriter().info(" calling rr.get RR2_entry__" + i);
- assertEquals("RR2_entry__" + i, rr1.get(new Integer(i)));
- }
- return null;
- }
- };
-
- accessor.invoke(TxOps);
-
- // verify data size on all replicas
- SerializableCallable verifySize = new SerializableCallable("getOps") {
- @Override
- public Object call() throws CacheException {
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
- LogWriterUtils.getLogWriter().info(
- " calling pr.getLocalSize " + pr1.getLocalSize());
- assertEquals(2, pr1.getLocalSize());
-
- Region rr1 = basicGetCache().getRegion("rregion1");
- LogWriterUtils.getLogWriter()
- .info(" calling rr.getLocalSize " + rr1.size());
- assertEquals(3, rr1.size());
- return null;
- }
- };
- dataStore1.invoke(verifySize);
- dataStore2.invoke(verifySize);
- dataStore3.invoke(verifySize);
-
- accessor.invoke(() -> DistTXDebugDUnitTest.destroyPR( "pregion1" ));
- }
-
- public void testTXPR2() throws Exception {
- createCacheInAllVms();
- Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
- Boolean.FALSE, Boolean.FALSE };
- createPartitionedRegion(prAttrs);
-
- SerializableCallable TxOps = new SerializableCallable("TxOps") {
- @Override
- public Object call() throws CacheException {
- // PartitionedRegion pr1 = (PartitionedRegion)
- // basicGetCache().getRegion(
- // "pregion1");
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-
- // put in tx and commit
- // CacheTransactionManager ctx = basicGetCache()
- // .getCacheTransactionManager();
- CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
- ctx.setDistributed(true);
- ctx.begin();
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.put in tx 1");
- pr1.put(dummy, "2_entry__" + i);
- }
- ctx.commit();
-
- // verify the data
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
- assertEquals("2_entry__" + i, pr1.get(dummy));
- }
- return null;
- }
- };
-
- accessor.invoke(TxOps);
-
- SerializableCallable TxGetOps = new SerializableCallable("TxGetOps") {
- @Override
- public Object call() throws CacheException {
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
- CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
- LogWriterUtils.getLogWriter().info(
- " calling pr.getLocalSize " + pr1.getLocalSize());
- assertEquals(2, pr1.getLocalSize());
- return null;
- }
- };
-
- dataStore1.invoke(TxGetOps);
- dataStore2.invoke(TxGetOps);
- dataStore3.invoke(TxGetOps);
-
- SerializableCallable TxRollbackOps = new SerializableCallable("TxOps") {
- @Override
- public Object call() throws CacheException {
- // PartitionedRegion pr1 = (PartitionedRegion)
- // basicGetCache().getRegion(
- // "pregion1");
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-
- // put in tx and commit
- // CacheTransactionManager ctx = basicGetCache()
- // .getCacheTransactionManager();
- CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
- ctx.setDistributed(true);
- ctx.begin();
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(
- " calling pr.put in tx for rollback no_entry__" + i);
- pr1.put(dummy, "no_entry__" + i);
- }
- ctx.rollback();
-
- // verify the data
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(
- " calling pr.get after rollback " + pr1.get(dummy));
- assertEquals("2_entry__" + i, pr1.get(dummy));
- }
- return null;
- }
- };
-
- accessor.invoke(TxRollbackOps);
-
- accessor.invoke(() -> DistTXDebugDUnitTest.destroyPR( "pregion1" ));
- }
-
- public void testTXPRRR2_create() throws Exception {
- createCacheInAllVms();
- Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
- Boolean.FALSE, Boolean.FALSE };
- createPartitionedRegion(prAttrs);
-
- Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
- createReplicatedRegion(rrAttrs);
-
- SerializableCallable TxOps = new SerializableCallable("TxOps") {
- @Override
- public Object call() throws CacheException {
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
- Region rr1 = basicGetCache().getRegion("rregion1");
- CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
- ctx.setDistributed(true);
- ctx.begin();
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.create in tx 1");
- pr1.create(dummy, "2_entry__" + i);
-
- LogWriterUtils.getLogWriter().info(" calling rr.create " + "2_entry__" + i);
- rr1.create(new Integer(i), "2_entry__" + i);
- }
- ctx.commit();
-
- // verify the data
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
- assertEquals("2_entry__" + i, pr1.get(dummy));
-
- LogWriterUtils.getLogWriter().info(
- " calling rr.get " + rr1.get(new Integer(i)));
- assertEquals("2_entry__" + i, rr1.get(new Integer(i)));
- }
- return null;
- }
- };
-
- accessor.invoke(TxOps);
-
- // verify data size on all replicas
- SerializableCallable verifySize = new SerializableCallable("getOps") {
- @Override
- public Object call() throws CacheException {
- Region rr1 = basicGetCache().getRegion("rregion1");
- LogWriterUtils.getLogWriter()
- .info(" calling rr.getLocalSize " + rr1.size());
- assertEquals(3, rr1.size());
-
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
- LogWriterUtils.getLogWriter().info(
- " calling pr.getLocalSize " + pr1.getLocalSize());
- assertEquals(2, pr1.getLocalSize());
- return null;
- }
- };
- dataStore1.invoke(verifySize);
- dataStore2.invoke(verifySize);
- dataStore3.invoke(verifySize);
- }
-
- public void testTXPRRR2_putall() throws Exception {
- createCacheInAllVms();
- Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
- Boolean.FALSE, Boolean.FALSE };
- createPartitionedRegion(prAttrs);
-
- Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
- createReplicatedRegion(rrAttrs);
-
- SerializableCallable TxOps = new SerializableCallable("TxOps") {
- @Override
- public Object call() throws CacheException {
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
- Region rr1 = basicGetCache().getRegion("rregion1");
-
- CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
- ctx.setDistributed(true);
- ctx.begin();
- HashMap<DummyKeyBasedRoutingResolver, String> phm = new HashMap<DummyKeyBasedRoutingResolver, String>();
- HashMap<Integer, String> rhm = new HashMap<Integer, String>();
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- phm.put(dummy, "2_entry__" + i);
- rhm.put(i, "2_entry__" + i);
- }
- pr1.putAll(phm);
- rr1.putAll(rhm);
- ctx.commit();
-
- // verify the data
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
- assertEquals("2_entry__" + i, pr1.get(dummy));
-
- LogWriterUtils.getLogWriter().info(
- " calling rr.get " + rr1.get(new Integer(i)));
- assertEquals("2_entry__" + i, rr1.get(new Integer(i)));
- }
- return null;
- }
- };
-
- accessor.invoke(TxOps);
-
- // verify data size on all replicas
- SerializableCallable verifySize = new SerializableCallable("getOps") {
- @Override
- public Object call() throws CacheException {
- Region rr1 = basicGetCache().getRegion("rregion1");
- LogWriterUtils.getLogWriter()
- .info(" calling rr.getLocalSize " + rr1.size());
- assertEquals(3, rr1.size());
-
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
- LogWriterUtils.getLogWriter().info(
- " calling pr.getLocalSize " + pr1.getLocalSize());
- assertEquals(2, pr1.getLocalSize());
- return null;
- }
- };
- dataStore1.invoke(verifySize);
- dataStore2.invoke(verifySize);
- dataStore3.invoke(verifySize);
-
-// accessor.invoke(TxOps);
- }
-
- public void testTXPR_putall() throws Exception {
- createCacheInAllVms();
- Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
- Boolean.FALSE, Boolean.FALSE };
- createPartitionedRegion(prAttrs);
-
- SerializableCallable TxOps = new SerializableCallable("TxOps") {
- @Override
- public Object call() throws CacheException {
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-
- CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
- ctx.setDistributed(true);
- ctx.begin();
- HashMap<DummyKeyBasedRoutingResolver, String> phm = new HashMap<DummyKeyBasedRoutingResolver, String>();
- HashMap<Integer, String> rhm = new HashMap<Integer, String>();
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- phm.put(dummy, "2_entry__" + i);
- }
- pr1.putAll(phm);
- ctx.commit();
-
- // verify the data
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
- assertEquals("2_entry__" + i, pr1.get(dummy));
-
- }
- return null;
- }
- };
-
-// dataStore1.invoke(TxOps);
- accessor.invoke(TxOps);
-
- // verify data size on all replicas
- SerializableCallable verifySize = new SerializableCallable("getOps") {
- @Override
- public Object call() throws CacheException {
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
- LogWriterUtils.getLogWriter().info(
- " calling pr.getLocalSize " + pr1.getLocalSize());
- assertEquals(2, pr1.getLocalSize());
- return null;
- }
- };
- dataStore1.invoke(verifySize);
- dataStore2.invoke(verifySize);
- dataStore3.invoke(verifySize);
-
-// accessor.invoke(TxOps);
- }
-
-
- public void testTXRR_removeAll() throws Exception {
- performRR_removeAllTest(false);
- }
-
- public void testTXRR_removeAll_dataNodeAsCoordinator() throws Exception {
- performRR_removeAllTest(true);
- }
-
- /**
- * @param dataNodeAsCoordinator TODO
- *
- */
- private void performRR_removeAllTest(boolean dataNodeAsCoordinator) {
- createCacheInAllVms();
- Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
- createReplicatedRegion(rrAttrs);
-
- SerializableCallable TxOps = new SerializableCallable("TxOps") {
- @Override
- public Object call() throws CacheException {
- Region rr1 = basicGetCache().getRegion("rregion1");
- //put some data
- HashMap<Integer, String> rhm = new HashMap<Integer, String>();
- for (int i = 1; i <= 3; i++) {
- rhm.put(i, "2_entry__" + i);
- }
- rr1.putAll(rhm);
-
- CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
- ctx.setDistributed(true);
- ctx.begin();
- rr1.removeAll(rhm.keySet());
-
- ctx.commit();
-
- // verify the data
- for (int i = 1; i <= 3; i++) {
- LogWriterUtils.getLogWriter().info(
- " calling rr.get " + rr1.get(new Integer(i)));
- assertEquals(null, rr1.get(new Integer(i)));
- }
- return null;
- }
- };
-
- if (dataNodeAsCoordinator) {
- dataStore1.invoke(TxOps);
- } else {
- accessor.invoke(TxOps);
- }
-
- // verify data size on all replicas
- SerializableCallable verifySize = new SerializableCallable("getOps") {
- @Override
- public Object call() throws CacheException {
- Region rr1 = basicGetCache().getRegion("rregion1");
- LogWriterUtils.getLogWriter()
- .info(" calling rr.getLocalSize " + rr1.size());
- assertEquals(0, rr1.size());
- return null;
- }
- };
- dataStore1.invoke(verifySize);
- dataStore2.invoke(verifySize);
- dataStore3.invoke(verifySize);
-
-// accessor.invoke(TxOps);
- }
-
- public void testTXPR_removeAll() throws Exception {
- createCacheInAllVms();
- Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
- Boolean.FALSE, Boolean.FALSE };
- createPartitionedRegion(prAttrs);
-
- SerializableCallable TxOps = new SerializableCallable("TxOps") {
- @Override
- public Object call() throws CacheException {
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
- HashMap<DummyKeyBasedRoutingResolver, String> phm = new HashMap<DummyKeyBasedRoutingResolver, String>();
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- phm.put(dummy, "2_entry__" + i);
- }
- pr1.putAll(phm);
-
- CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
- ctx.setDistributed(true);
- ctx.begin();
- pr1.removeAll(phm.keySet());
- ctx.commit();
-
- // verify the data
- for (int i = 1; i <= 3; i++) {
- DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
- i);
- LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
- assertEquals(null, pr1.get(dummy));
- }
- return null;
- }
- };
-
- accessor.invoke(TxOps);
-
- // verify data size on all replicas
- SerializableCallable verifySize = new SerializableCallable("getOps") {
- @Override
- public Object call() throws CacheException {
- PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
- LogWriterUtils.getLogWriter().info(
- " calling pr.getLocalSize " + pr1.getLocalSize());
- assertEquals(0, pr1.getLocalSize());
- return null;
- }
- };
- dataStore1.invoke(verifySize);
- dataStore2.invoke(verifySize);
- dataStore3.invoke(verifySize);
-
-// accessor.invoke(TxOps);
- }
-
-
- public void performTXRRtestOps(boolean makeDatNodeAsCoordinator) {
- createCacheInAllVms();
- Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
- Boolean.FALSE, Boolean.FALSE };
- createPartitionedRegion(prAttrs);
-
- Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
- createReplicatedRegion(rrAttrs);
-
- SerializableCallable TxOps = new SerializableCallable("TxOps") {
- @Override
- public Object call() throws CacheException {
- Region rr1 = basicGetCache().getRegion("rregion1");
- CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
- ctx.setDistributed(true);
- ctx.begin();
- for (int i = 1; i <= 3; i++) {
- LogWriterUtils.getLogWriter().info(" calling rr.put " + "2_entry__" + i);
- rr1.put(new Integer(i), "2_entry__" + i);
- }
- ctx.commit();
-
- // verify the data
- for (int i = 1; i <= 3; i++) {
- LogWriterUtils.getLogWriter().info(
- " calling rr.get " + rr1.get(new Integer(i)));
- assertEquals("2_entry__" + i, rr1.get(new Integer(i)));
- }
- return null;
- }
- };
-
- if (makeDatNodeAsCoordinator) {
- dataStore1.invoke(TxOps);
- } else {
- accessor.invoke(TxOps);
- }
-
- // verify data size on all replicas
- SerializableCallable verifySize = new SerializableCallable("getOps") {
- @Override
- public Object call() throws CacheException {
- Region rr1 = basicGetCache().getRegion("rregion1");
- LogWriterUtils.getLogWriter()
- .info(" calling rr.getLocalSize " + rr1.size());
- assertEquals(3, rr1.size());
- return null;
- }
- };
- dataStore1.invoke(verifySize);
- dataStore2.invoke(verifySize);
- dataStore3.invoke(verifySize);
-
- SerializableCallable TxRollbackOps = new SerializableCallable("TxOps") {
- @Override
- public Object call() throws CacheException {
- Region rr1 = basicGetCache().getRegion("rregion1");
- CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
- ctx.setDistributed(true);
- ctx.begin();
- for (int i = 1; i <= 3; i++) {
- LogWriterUtils.getLogWriter().info(
- " calling rr.put for rollback no_entry__" + i);
- rr1.put(new Integer(i), "no_entry__" + i);
- }
- ctx.rollback();
- ;
-
- // verify the data
- for (int i = 1; i <= 3; i++) {
- LogWriterUtils.getLogWriter().info(
- " calling rr.get after rollback "
- + rr1.get(new Integer(i)));
- assertEquals("2_entry__" + i, rr1.get(new Integer(i)));
- }
- return null;
- }
- };
-
- if (makeDatNodeAsCoordinator) {
- dataStore1.invoke(TxRollbackOps);
- } else {
- accessor.invoke(TxRollbackOps);
- }
- }
-
-
- public void testTXRR2() throws Exception {
- performTXRRtestOps(false); // actual test
- }
-
- public void testTXRR2_dataNodeAsCoordinator() throws Exception {
- performTXRRtestOps(true);
- }
-}
-
-class DummyKeyBasedRoutingResolver implements PartitionResolver,
- DataSerializable {
- Integer dummyID;
-
- public DummyKeyBasedRoutingResolver() {
- }
-
- public DummyKeyBasedRoutingResolver(int id) {
- this.dummyID = new Integer(id);
- }
-
- public String getName() {
- // TODO Auto-generated method stub
- return null;
- }
-
- public Serializable getRoutingObject(EntryOperation opDetails) {
- return (Serializable) opDetails.getKey();
- }
-
- public void close() {
- // TODO Auto-generated method stub
- }
-
- public void fromData(DataInput in) throws IOException, ClassNotFoundException {
- this.dummyID = DataSerializer.readInteger(in);
- }
-
- public void toData(DataOutput out) throws IOException {
- DataSerializer.writeInteger(this.dummyID, out);
- }
-
- @Override
- public int hashCode() {
- int i = this.dummyID.intValue();
- return i;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (!(o instanceof DummyKeyBasedRoutingResolver))
- return false;
-
- DummyKeyBasedRoutingResolver otherDummyID = (DummyKeyBasedRoutingResolver) o;
- return (otherDummyID.dummyID.equals(dummyID));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/893dc86b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDistributedTestSuite.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDistributedTestSuite.java b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDistributedTestSuite.java
deleted file mode 100644
index 3b829c1..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDistributedTestSuite.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.disttx;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
- CacheMapDistTXDUnitTest.class,
- DistributedTransactionDUnitTest.class,
- DistTXDebugDUnitTest.class,
- DistTXOrderDUnitTest.class,
- DistTXPersistentDebugDUnitTest.class,
- DistTXRestrictionsDUnitTest.class,
- DistTXWithDeltaDUnitTest.class,
- PersistentPartitionedRegionWithDistTXDUnitTest.class,
- PRDistTXDUnitTest.class,
- PRDistTXWithVersionsDUnitTest.class
-})
-
-/**
- * Suite of tests for distributed transactions dunit tests
- * @author shirishd
- */
-public class DistTXDistributedTestSuite {
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/893dc86b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXPersistentDebugDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXPersistentDebugDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXPersistentDebugDUnitTest.java
index 69d6149..097c37c 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXPersistentDebugDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXPersistentDebugDUnitTest.java
@@ -31,7 +31,7 @@ import com.gemstone.gemfire.test.dunit.Invoke;
import com.gemstone.gemfire.test.dunit.LogWriterUtils;
import com.gemstone.gemfire.test.dunit.SerializableCallable;
-public class DistTXPersistentDebugDUnitTest extends DistTXDebugDUnitTest {
+public class DistTXPersistentDebugDUnitTest extends DistTXDebugDUnitDisabledTest {
public DistTXPersistentDebugDUnitTest(String name) {
super(name);
[3/5] incubator-geode git commit: Merge remote-tracking branch
'origin/develop' into feature/GEODE-1050
Posted by kl...@apache.org.
Merge remote-tracking branch 'origin/develop' into feature/GEODE-1050
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7941b839
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7941b839
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7941b839
Branch: refs/heads/feature/GEODE-1050
Commit: 7941b83939b921b7fc5bb31dfbe0f457b9fba781
Parents: f45f0f5 4ed2fd3
Author: Kirk Lund <kl...@apache.org>
Authored: Thu Mar 17 15:58:33 2016 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Thu Mar 17 15:58:33 2016 -0700
----------------------------------------------------------------------
.../cache/ConnectionPoolAutoDUnitTest.java | 45 +
.../gemfire/cache/ConnectionPoolDUnitTest.java | 5871 ++++++++++++++++++
2 files changed, 5916 insertions(+)
----------------------------------------------------------------------
[4/5] incubator-geode git commit: Fix headers and change setUp to
postSetUp chain
Posted by kl...@apache.org.
Fix headers and change setUp to postSetUp chain
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/ad390c99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/ad390c99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/ad390c99
Branch: refs/heads/feature/GEODE-1050
Commit: ad390c9981141a5f22aa2ba5e3a86f992dfacc48
Parents: 7941b83
Author: Kirk Lund <kl...@apache.org>
Authored: Thu Mar 17 16:01:04 2016 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Thu Mar 17 16:01:04 2016 -0700
----------------------------------------------------------------------
.../cache/ConnectionPoolAutoDUnitTest.java | 28 +++++++++++-------
.../gemfire/cache/ConnectionPoolDUnitTest.java | 31 ++++++++++++++------
2 files changed, 40 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ad390c99/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
index ad110d7..3b43ab8 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
@@ -1,9 +1,18 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package com.gemstone.gemfire.cache;
@@ -20,10 +29,9 @@ public class ConnectionPoolAutoDUnitTest extends ConnectionPoolDUnitTest {
public ConnectionPoolAutoDUnitTest(String name) {
super(name);
}
-
- public void setUp() throws Exception {
- super.setUp();
- // TODO Auto-generated method stub
+
+ @Override
+ protected final void postSetUpConnectionPoolDUnitTest() throws Exception {
ClientServerTestCase.AUTO_LOAD_BALANCE = true;
Invoke.invokeInEveryVM(new SerializableRunnable("setupAutoMode") {
public void run() {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ad390c99/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
index 41d48aa..2acab3a 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
@@ -1,9 +1,18 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package com.gemstone.gemfire.cache;
@@ -97,8 +106,8 @@ public class ConnectionPoolDUnitTest extends CacheTestCase {
super(name);
}
- public void setUp() throws Exception {
- super.setUp();
+ @Override
+ public final void postSetUp() throws Exception {
// avoid IllegalStateException from HandShake by connecting all vms to
// system before creating pool
getSystem();
@@ -107,10 +116,14 @@ public class ConnectionPoolDUnitTest extends CacheTestCase {
getSystem();
}
});
+ postSetUpConnectionPoolDUnitTest();
+ }
+
+ protected void postSetUpConnectionPoolDUnitTest() throws Exception {
}
@Override
- protected final void postTearDownCacheTestCase() throws Exception {
+ public final void postTearDownCacheTestCase() throws Exception {
Invoke.invokeInEveryVM(new SerializableRunnable() {
public void run() {
Map pools = PoolManager.getAll();