You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2016/03/17 23:10:37 UTC
[1/2] incubator-geode git commit: GEODE-1111 Connection Pooling needs
more tests
Repository: incubator-geode
Updated Branches:
refs/heads/develop ac3d3b4c5 -> 4ed2fd374
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4ed2fd37/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
new file mode 100755
index 0000000..41d48aa
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
@@ -0,0 +1,5871 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache;
+
+import static org.junit.runners.MethodSorters.NAME_ASCENDING;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import junit.framework.AssertionFailedError;
+
+import org.junit.FixMethodOrder;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.cache.client.NoAvailableServersException;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.Endpoint;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.cache30.CertifiableTestCacheListener;
+import com.gemstone.gemfire.cache30.TestCacheLoader;
+import com.gemstone.gemfire.cache30.TestCacheWriter;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.EntryExpiryTask;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PoolStats;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifierStats;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.internal.logging.LocalLogWriter;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.ThreadUtils;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+
+/**
+ * This class tests the client connection pool in GemFire.
+ * It does so by creating a cache server with a cache, a pre-defined region, and
+ * a data loader. The client creates the same region with a pool
+ * (this happens in the controller VM). The client then spins up
+ * 10 different threads and issues gets on keys. The server data loader returns the
+ * data to the client.
+ * The test uses Groboutils TestRunnable objects to achieve multi-threaded behavior
+ * in the test.
+ *
+ */
+@FixMethodOrder(NAME_ASCENDING)
+public class ConnectionPoolDUnitTest extends CacheTestCase {
+
+ // Serialization version identifier for this test class.
+ private static final long serialVersionUID = 1L;
+
+ /** The port on which the bridge server was started in this VM */
+ private static int bridgeServerPort;
+
+ // NOTE(review): port and port2 are not referenced in the visible portion of this
+ // file (the tests shown use locals of the same name) -- confirm their users.
+ protected static int port = 0;
+ protected static int port2 = 0;
+
+ // Static listener counters; test003Bug36684 resets them to 0 on the client VMs.
+ // NOTE(review): the code that increments them is outside the visible portion -- confirm.
+ protected static int numberOfAfterInvalidates;
+ protected static int numberOfAfterCreates;
+ protected static int numberOfAfterUpdates;
+
+ // Event type tags recorded in EventWrapper.type by ControlListener's callbacks.
+ protected final static int TYPE_CREATE = 0;
+ protected final static int TYPE_UPDATE = 1;
+ protected final static int TYPE_INVALIDATE = 2;
+ protected final static int TYPE_DESTROY = 3;
+
+ /**
+  * Creates the test case with the given name.
+  *
+  * @param name the test name, passed straight to the superclass constructor
+  */
+ public ConnectionPoolDUnitTest(String name) {
+   super(name);
+ }
+
+ /**
+  * Runs the superclass set-up, then connects this VM and every dunit VM to
+  * the distributed system.
+  */
+ public void setUp() throws Exception {
+   super.setUp();
+   // avoid IllegalStateException from HandShake by connecting all vms to
+   // system before creating pool
+   final SerializableRunnable connectToSystem = new SerializableRunnable("getSystem") {
+     public void run() {
+       getSystem();
+     }
+   };
+   getSystem();
+   Invoke.invokeInEveryVM(connectToSystem);
+ }
+
+ /**
+  * Verifies in every VM that no pools are left registered with the
+  * {@link PoolManager}, then runs the subclass hook
+  * {@link #postTearDownConnectionPoolDUnitTest()}.
+  */
+ @Override
+ protected final void postTearDownCacheTestCase() throws Exception {
+   final SerializableRunnable verifyNoPoolsRemain = new SerializableRunnable() {
+     public void run() {
+       final Map leftovers = PoolManager.getAll();
+       if (leftovers.isEmpty()) {
+         return;
+       }
+       // log the leftovers before asserting so they show up in the VM log
+       com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().warning("found pools remaining after teardown: " + leftovers);
+       assertEquals(0, leftovers.size());
+     }
+   };
+   Invoke.invokeInEveryVM(verifyNoPoolsRemain);
+   postTearDownConnectionPoolDUnitTest();
+ }
+
+ /**
+  * Tear-down hook invoked at the end of {@link #postTearDownCacheTestCase()}.
+  * Does nothing in this class.
+  */
+ protected void postTearDownConnectionPoolDUnitTest() throws Exception {
+   // intentionally empty
+ }
+
+ protected/*GemStoneAddition*/ static PoolImpl getPool(Region r) {
+ PoolImpl result = null;
+ String poolName = r.getAttributes().getPoolName();
+ if (poolName != null) {
+ result = (PoolImpl)PoolManager.find(poolName);
+ }
+ return result;
+ }
+ protected static TestCacheWriter getTestWriter(Region r) {
+ return (TestCacheWriter)r.getAttributes().getCacheWriter();
+ }
+ /**
+ * Create a bridge server on the given port without starting it.
+ *
+ * @since 5.0.2
+ */
+ protected void createBridgeServer(int port) throws IOException {
+ CacheServer bridge = getCache().addCacheServer();
+ bridge.setPort(port);
+ bridge.setMaxThreads(getMaxThreads());
+ bridgeServerPort = bridge.getPort();
+ }
+
+ /**
+ * Starts a bridge server on the given port, using the given
+ * deserializeValues and notifyBySubscription to serve up the
+ * given region.
+ *
+ * @since 4.0
+ */
+ protected void startBridgeServer(int port)
+ throws IOException {
+ startBridgeServer(port, -1);
+ }
+
+ protected void startBridgeServer(int port, int socketBufferSize) throws IOException {
+ startBridgeServer(port, socketBufferSize, CacheServer.DEFAULT_LOAD_POLL_INTERVAL);
+ }
+
+ protected void startBridgeServer(int port, int socketBufferSize, long loadPollInterval)
+ throws IOException {
+
+ Cache cache = getCache();
+ CacheServer bridge = cache.addCacheServer();
+ bridge.setPort(port);
+ if (socketBufferSize != -1) {
+ bridge.setSocketBufferSize(socketBufferSize);
+ }
+ bridge.setMaxThreads(getMaxThreads());
+ bridge.setLoadPollInterval(loadPollInterval);
+ bridge.start();
+ bridgeServerPort = bridge.getPort();
+ }
+
+ /**
+ * By default return 0 which turns off selector and gives thread per cnx.
+ * Test subclasses can override to run with selector.
+ * @since 5.1
+ */
+ protected int getMaxThreads() {
+ return 0;
+ }
+
+ /**
+ * Stops the bridge server that serves up the given cache.
+ *
+ * @since 4.0
+ */
+ void stopBridgeServer(Cache cache) {
+ CacheServer bridge =
+ cache.getCacheServers().iterator().next();
+ bridge.stop();
+ assertFalse(bridge.isRunning());
+ }
+
+ /**
+  * Stops every bridge server registered with the given cache, asserting
+  * after each stop that the server is no longer running.
+  *
+  * @param cache the cache whose servers are stopped
+  */
+ void stopBridgeServers(Cache cache) {
+   for (CacheServer server : cache.getCacheServers()) {
+     server.stop();
+     assertFalse(server.isRunning());
+   }
+ }
+
+ /**
+  * Starts every bridge server registered with the given cache, asserting
+  * after each start that the server is running.
+  *
+  * @param cache the cache whose servers are started
+  */
+ private void restartBridgeServers(Cache cache) throws IOException
+ {
+   for (CacheServer server : cache.getCacheServers()) {
+     server.start();
+     assertTrue(server.isRunning());
+   }
+ }
+
+ protected InternalDistributedSystem createLonerDS() {
+ disconnectFromDS();
+ InternalDistributedSystem ds = getLonerSystem();
+ assertEquals(0, ds.getDistributionManager().getOtherDistributionManagerIds().size());
+ return ds;
+ }
+
+
+
+ /**
+ * Returns region attributes for a <code>LOCAL</code> region
+ */
+ protected RegionAttributes getRegionAttributes() {
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false); // test validation expects this behavior
+ return factory.create();
+ }
+
+ private static String createBridgeClientConnection(String host, int[] ports) {
+ StringBuffer sb = new StringBuffer();
+ for (int i = 0; i < ports.length; i++) {
+ if (i > 0) sb.append(",");
+ sb.append("name" + i + "=");
+ sb.append(host + ":" + ports[i]);
+ }
+ return sb.toString();
+ }
+
+ /**
+  * Snapshot of an {@link EntryEvent} taken when a listener callback fires:
+  * the event itself, its key, new value and callback argument, plus one of
+  * the <code>TYPE_*</code> tags.
+  */
+ private class EventWrapper {
+   public final EntryEvent event;
+   public final Object key;
+   public final Object val;
+   public final Object arg;
+   public final int type;
+
+   public EventWrapper(EntryEvent source, int eventType) {
+     this.type = eventType;
+     this.event = source;
+     this.key = source.getKey();
+     this.val = source.getNewValue();
+     this.arg = source.getCallbackArgument();
+   }
+
+   public String toString() {
+     final StringBuilder text = new StringBuilder("EventWrapper: event=");
+     text.append(event).append(", type=").append(type);
+     return text.toString();
+   }
+ }
+
+ /**
+  * Cache listener that records every create, update, invalidate and destroy
+  * event as an {@link EventWrapper} and lets a test thread wait for events
+  * to arrive.
+  */
+ protected class ControlListener extends CacheListenerAdapter {
+   /** Recorded events in arrival order; this class only touches it while holding CONTROL_LOCK. */
+   public final LinkedList events = new LinkedList();
+   /** Monitor used both to guard <code>events</code> and to signal new arrivals. */
+   public final Object CONTROL_LOCK = new Object();
+
+   /**
+    * Waits until at least <code>eventCount</code> events have been recorded
+    * or roughly <code>sleepMs</code> milliseconds have elapsed. The wait
+    * stops early once fewer than 10 ms of the budget remain.
+    *
+    * @param sleepMs maximum time to wait, in milliseconds
+    * @param eventCount number of recorded events to wait for
+    * @return <code>true</code> if at least one event has been recorded when
+    *         the wait ends; note this is NOT a guarantee that
+    *         <code>eventCount</code> events arrived
+    */
+   public boolean waitWhileNotEnoughEvents(long sleepMs, int eventCount) {
+     long maxMillis = System.currentTimeMillis() + sleepMs;
+     synchronized (this.CONTROL_LOCK) {
+       try {
+         while (this.events.size() < eventCount) {
+           long waitMillis = maxMillis - System.currentTimeMillis();
+           if (waitMillis < 10) {
+             break;
+           }
+           this.CONTROL_LOCK.wait(waitMillis);
+         }
+       } catch (InterruptedException abort) {
+         // restore the interrupt status so callers further up can still see it
+         Thread.currentThread().interrupt();
+         fail("interrupted");
+       }
+       return !this.events.isEmpty();
+     }
+   }
+
+   public void afterCreate(EntryEvent e) {
+     record(e, TYPE_CREATE);
+   }
+
+   public void afterUpdate(EntryEvent e) {
+     record(e, TYPE_UPDATE);
+   }
+
+   public void afterInvalidate(EntryEvent e) {
+     record(e, TYPE_INVALIDATE);
+   }
+
+   public void afterDestroy(EntryEvent e) {
+     record(e, TYPE_DESTROY);
+   }
+
+   /** Appends the event with the given type tag and wakes up all waiters. */
+   private void record(EntryEvent e, int type) {
+     synchronized (this.CONTROL_LOCK) {
+       this.events.add(new EventWrapper(e, type));
+       this.CONTROL_LOCK.notifyAll();
+     }
+   }
+ }
+
+
+
+
+ /**
+ * Create a fake EntryEvent that returns the provided region for {@link CacheEvent#getRegion()}
+ * and returns {@link com.gemstone.gemfire.cache.Operation#LOCAL_LOAD_CREATE} for {@link CacheEvent#getOperation()}
+ * @param r
+ * @return fake entry event
+ */
+ protected static EntryEvent createFakeyEntryEvent(final Region r) {
+ return new EntryEvent() {
+ public Operation getOperation()
+ {
+ return Operation.LOCAL_LOAD_CREATE; // fake out pool to exit early
+ }
+ public Region getRegion()
+ {
+ return r;
+ }
+ public Object getKey() { return null; }
+ public Object getOldValue() { return null;}
+ public boolean isOldValueAvailable() {return true;}
+ public Object getNewValue() { return null;}
+ public boolean isLocalLoad() { return false;}
+ public boolean isNetLoad() {return false;}
+ public boolean isLoad() {return true; }
+ public boolean isNetSearch() {return false;}
+ public TransactionId getTransactionId() {return null;}
+ public Object getCallbackArgument() {return null;}
+ public boolean isCallbackArgumentAvailable() {return true;}
+ public boolean isOriginRemote() {return false;}
+ public DistributedMember getDistributedMember() {return null;}
+ public boolean isExpiration() { return false;}
+ public boolean isDistributed() { return false;}
+ public boolean isBridgeEvent() {
+ return hasClientOrigin();
+ }
+ public boolean hasClientOrigin() {
+ return false;
+ }
+ public ClientProxyMembershipID getContext() {
+ return null;
+ }
+ public SerializedCacheValue getSerializedOldValue() {return null;}
+ public SerializedCacheValue getSerializedNewValue() {return null;}
+ };
+ }
+
+ /**
+  * Waits for the pool to reach the expected server count, then waits up to
+  * two minutes for every endpoint to report exactly
+  * <code>expectedConsPerServer</code> connections, and finally asserts it.
+  *
+  * @param pool the pool to inspect
+  * @param expectedServer the number of connected servers expected
+  * @param expectedConsPerServer the connection count expected on each endpoint
+  */
+ public void verifyBalanced(final PoolImpl pool, int expectedServer,
+     final int expectedConsPerServer) {
+   verifyServerCount(pool, expectedServer);
+   final WaitCriterion balancedCriterion = new WaitCriterion() {
+     public boolean done() {
+       return balanced(pool, expectedConsPerServer);
+     }
+     public String description() {
+       return "expected " + expectedConsPerServer
+           + " but endpoints=" + outOfBalanceReport(pool);
+     }
+   };
+   Wait.waitForCriterion(balancedCriterion, 2 * 60 * 1000, 200, true);
+   final String failureReport = "expected " + expectedConsPerServer
+       + " but endpoints=" + outOfBalanceReport(pool);
+   assertEquals(failureReport, true, balanced(pool, expectedConsPerServer));
+ }
+ protected boolean balanced(PoolImpl pool, int expectedConsPerServer) {
+ Iterator it = pool.getEndpointMap().values().iterator();
+ while (it.hasNext()) {
+ Endpoint ep = (Endpoint)it.next();
+ if (ep.getStats().getConnections() != expectedConsPerServer) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ protected String outOfBalanceReport(PoolImpl pool) {
+ StringBuffer result = new StringBuffer();
+ Iterator it = pool.getEndpointMap().values().iterator();
+ result.append("<");
+ while (it.hasNext()) {
+ Endpoint ep = (Endpoint)it.next();
+ result.append("ep=" + ep);
+ result.append(" conCount=" + ep.getStats().getConnections());
+ if (it.hasNext()) {
+ result.append(", ");
+ }
+ }
+ result.append(">");
+ return result.toString();
+ }
+
+ /**
+  * Waits up to 30 seconds for the pool's set of blacklisted servers to
+  * become empty, then asserts that it is empty.
+  *
+  * @param pool the pool whose blacklist is polled
+  */
+ public void waitForBlacklistToClear(final PoolImpl pool) {
+   WaitCriterion ev = new WaitCriterion() {
+     public boolean done() {
+       return pool.getBlacklistedServers().size() == 0;
+     }
+     public String description() {
+       // give a timeout failure something useful to report instead of null
+       return "waiting for blacklist to clear; blacklistedServers="
+           + pool.getBlacklistedServers();
+     }
+   };
+   Wait.waitForCriterion(ev, 30 * 1000, 200, true);
+   assertEquals("unexpected blacklistedServers=" + pool.getBlacklistedServers(),
+       0, pool.getBlacklistedServers().size());
+ }
+
+ /**
+  * Waits up to five minutes for the pool's connected-server count to equal
+  * the expected value.
+  *
+  * @param pool the pool to poll
+  * @param expectedCount the number of connected servers expected
+  */
+ public void verifyServerCount(final PoolImpl pool, final int expectedCount) {
+   getCache().getLogger().info("verifyServerCount expects=" + expectedCount);
+   final WaitCriterion serverCountCriterion = new WaitCriterion() {
+     // explanation for the most recent unsuccessful poll
+     String excuse;
+     public boolean done() {
+       final int actual = pool.getConnectedServerCount();
+       final boolean reached = (actual == expectedCount);
+       if (!reached) {
+         excuse = "Found only " + actual + " servers, expected " + expectedCount;
+       }
+       return reached;
+     }
+     public String description() {
+       return excuse;
+     }
+   };
+   Wait.waitForCriterion(serverCountCriterion, 5 * 60 * 1000, 200, true);
+ }
+
+ /**
+  * Tests that the callback argument is sent to the server.
+  * <p>
+  * VM 0 hosts a cache server whose cache writer asserts on the callback
+  * argument of each create and update. VM 1 is a loner client that creates
+  * ten entries with one callback argument and then updates them with
+  * another. Finally the server checks that its writer was invoked.
+  */
+ public void test001CallbackArg() throws CacheException {
+   final String name = this.getName();
+   final Host host = Host.getHost(0);
+   VM vm0 = host.getVM(0);
+   VM vm1 = host.getVM(1);
+
+   final Object createCallbackArg = "CREATE CALLBACK ARG";
+   final Object updateCallbackArg = "PUT CALLBACK ARG";
+
+   vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+     public void run2() throws CacheException {
+
+       CacheWriter cw = new TestCacheWriter() {
+         public final void beforeUpdate2(EntryEvent event)
+             throws CacheWriterException {
+           Object beca = event.getCallbackArgument();
+           assertEquals(updateCallbackArg, beca);
+         }
+
+         public void beforeCreate2(EntryEvent event)
+             throws CacheWriterException {
+           Object beca = event.getCallbackArgument();
+           assertEquals(createCallbackArg, beca);
+         }
+       };
+       AttributesFactory factory = getBridgeServerRegionAttributes(null, cw);
+       createRegion(name, factory.create());
+       try {
+         startBridgeServer(0);
+
+       } catch (Exception ex) {
+         com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+       }
+
+     }
+   });
+   final int port =
+       vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+   final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+
+   SerializableRunnable create =
+       new CacheSerializableRunnable("Create region") {
+     public void run2() throws CacheException {
+       getLonerSystem();
+       getCache();
+       AttributesFactory factory = new AttributesFactory();
+       factory.setScope(Scope.LOCAL);
+       factory.setConcurrencyChecksEnabled(false);
+
+       // use the host name resolved in the controller, as the other tests in this class do
+       ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+       createRegion(name, factory.create());
+     }
+   };
+
+   vm1.invoke(create);
+   vm1.invoke(new CacheSerializableRunnable("Add entries") {
+     public void run2() throws CacheException {
+       Region region = getRootRegion().getSubregion(name);
+       for (int i = 0; i < 10; i++) {
+         region.create(Integer.valueOf(i), "old" + i, createCallbackArg);
+       }
+       for (int i = 0; i < 10; i++) {
+         region.put(Integer.valueOf(i), "new" + i, updateCallbackArg);
+       }
+     }
+   });
+
+   vm0.invoke(new CacheSerializableRunnable("Check cache writer") {
+     public void run2() throws CacheException {
+       Region region = getRootRegion().getSubregion(name);
+       TestCacheWriter writer = getTestWriter(region);
+       assertTrue(writer.wasInvoked());
+     }
+   });
+
+   SerializableRunnable close =
+       new CacheSerializableRunnable("Close Pool") {
+     public void run2() throws CacheException {
+       Region region = getRootRegion().getSubregion(name);
+       region.localDestroyRegion();
+     }
+   };
+
+   vm1.invoke(close);
+
+   vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+     public void run() {
+       stopBridgeServer(getCache());
+     }
+   });
+
+ }
+
+ /**
+  * Tests that consecutive puts have the callback assigned
+  * appropriately: the client creates even keys with a callback argument and
+  * odd keys without one, and the server-side writer asserts on each.
+  */
+ public void test002CallbackArg2() throws CacheException {
+   final String name = this.getName();
+   final Host host = Host.getHost(0);
+   final VM serverVM = host.getVM(0);
+   final VM clientVM = host.getVM(1);
+
+   final Object createCallbackArg = "CREATE CALLBACK ARG";
+
+   final SerializableRunnable createServer =
+       new CacheSerializableRunnable("Create Cache Server") {
+     public void run2() throws CacheException {
+       final CacheWriter parityCheckingWriter = new TestCacheWriter() {
+         public void beforeCreate2(EntryEvent event)
+             throws CacheWriterException {
+           final Integer key = (Integer) event.getKey();
+           final boolean evenKey = (key.intValue() % 2 == 0);
+           final Object beca = event.getCallbackArgument();
+           if (evenKey) {
+             assertEquals(createCallbackArg, beca);
+           } else {
+             assertNull(beca);
+           }
+         }
+       };
+       final AttributesFactory serverAttrs =
+           getBridgeServerRegionAttributes(null, parityCheckingWriter);
+       createRegion(name, serverAttrs.create());
+       try {
+         startBridgeServer(0);
+       } catch (Exception ex) {
+         com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+       }
+     }
+   };
+   serverVM.invoke(createServer);
+
+   final int port =
+       serverVM.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+   final String host0 = NetworkUtils.getServerHostName(serverVM.getHost());
+
+   final SerializableRunnable createClientRegion =
+       new CacheSerializableRunnable("Create region") {
+     public void run2() throws CacheException {
+       getLonerSystem();
+       getCache();
+       final AttributesFactory clientAttrs = new AttributesFactory();
+       clientAttrs.setScope(Scope.LOCAL);
+       clientAttrs.setConcurrencyChecksEnabled(false);
+       ClientServerTestCase.configureConnectionPool(clientAttrs,host0,port,-1,true,-1,-1, null);
+       createRegion(name, clientAttrs.create());
+     }
+   };
+   clientVM.invoke(createClientRegion);
+
+   final SerializableRunnable addEntries =
+       new CacheSerializableRunnable("Add entries") {
+     public void run2() throws CacheException {
+       final Region region = getRootRegion().getSubregion(name);
+       for (int i = 0; i < 10; i++) {
+         final Integer key = Integer.valueOf(i);
+         final String value = "old" + i;
+         if (i % 2 != 0) {
+           region.create(key, value);
+         } else {
+           region.create(key, value, createCallbackArg);
+         }
+       }
+     }
+   };
+   clientVM.invoke(addEntries);
+
+   final SerializableRunnable closeClientRegion =
+       new CacheSerializableRunnable("Close Pool") {
+     public void run2() throws CacheException {
+       getRootRegion().getSubregion(name).localDestroyRegion();
+     }
+   };
+   clientVM.invoke(closeClientRegion);
+
+   final SerializableRunnable checkWriter =
+       new CacheSerializableRunnable("Check cache writer") {
+     public void run2() throws CacheException {
+       final Region region = getRootRegion().getSubregion(name);
+       assertTrue(getTestWriter(region).wasInvoked());
+     }
+   };
+   serverVM.invoke(checkWriter);
+
+   final SerializableRunnable stopServer =
+       new SerializableRunnable("Stop CacheServer") {
+     public void run() {
+       stopBridgeServer(getCache());
+     }
+   };
+   serverVM.invoke(stopServer);
+ }
+
+
+ /**
+ * Tests for bug 36684 by having two bridge servers with cacheloaders that should always return
+ * a value and one client connected to each server reading values. If the bug exists, the
+ * clients will get null sometimes.
+ * @throws InterruptedException
+ */
+ public void test003Bug36684() throws CacheException, InterruptedException {
+ final String name = this.getName();
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+
+ // Create the cache servers with distributed, mirrored region
+ SerializableRunnable createServer =
+ new CacheSerializableRunnable("Create Cache Server") {
+ public void run2() throws CacheException {
+ CacheLoader cl = new CacheLoader() {
+ public Object load(LoaderHelper helper) {
+ return helper.getKey();
+ }
+ public void close() {
+
+ }
+ };
+ AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null);
+ createRegion(name, factory.create());
+ //pause(1000);
+ try {
+ startBridgeServer(0);
+ } catch (Exception ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+ }
+
+ }
+ };
+ getSystem().getLogWriter().info("before create server");
+ vm0.invoke(createServer);
+ vm1.invoke(createServer);
+
+ // Create cache server clients
+ final int numberOfKeys = 1000;
+ final String host0 = NetworkUtils.getServerHostName(host);
+ final int vm0Port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+ final int vm1Port = vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+ SerializableRunnable createClient =
+ new CacheSerializableRunnable("Create Cache Server Client") {
+ public void run2() throws CacheException {
+ // reset all static listener variables in case this is being rerun in a subclass
+ numberOfAfterInvalidates = 0;
+ numberOfAfterCreates = 0;
+ numberOfAfterUpdates = 0;
+ // create the region
+ getLonerSystem();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false); // test validation expects this behavior
+ // create bridge writer
+ ClientServerTestCase.configureConnectionPool(factory,host0,vm0Port,vm1Port,true,-1,-1, null);
+ createRegion(name, factory.create());
+ }
+ };
+ getSystem().getLogWriter().info("before create client");
+ vm2.invoke(createClient);
+ vm3.invoke(createClient);
+
+ // Initialize each client with entries (so that afterInvalidate is called)
+ SerializableRunnable initializeClient =
+ new CacheSerializableRunnable("Initialize Client") {
+ public void run2() throws CacheException {
+// StringBuffer errors = new StringBuffer();
+ numberOfAfterInvalidates = 0;
+ numberOfAfterCreates = 0;
+ numberOfAfterUpdates = 0;
+ LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
+ for (int i=0; i<numberOfKeys; i++) {
+ String expected = "key-"+i;
+ String actual = (String) region.get("key-"+i);
+ assertEquals(expected, actual);
+ }
+ }
+ };
+
+ getSystem().getLogWriter().info("before initialize client");
+ AsyncInvocation inv2 = vm2.invokeAsync(initializeClient);
+ AsyncInvocation inv3 = vm3.invokeAsync(initializeClient);
+
+ ThreadUtils.join(inv2, 30 * 1000);
+ ThreadUtils.join(inv3, 30 * 1000);
+
+ if (inv2.exceptionOccurred()) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("Error occured in vm2", inv2.getException());
+ }
+ if(inv3.exceptionOccurred()) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("Error occured in vm3", inv3.getException());
+ }
+ }
+
+
+ /**
+ * Test for client connection loss with CacheLoader Exception on the server.
+ */
+ public void test004ForCacheLoaderException() throws CacheException, InterruptedException {
+ final String name = this.getName();
+ final Host host = Host.getHost(0);
+ VM server = host.getVM(0);
+ VM client = host.getVM(1);
+
+ // Create the cache servers with distributed, mirrored region
+ SerializableRunnable createServer =
+ new CacheSerializableRunnable("Create Cache Server") {
+ public void run2() throws CacheException {
+ CacheLoader cl = new CacheLoader() {
+ public Object load(LoaderHelper helper) {
+ System.out.println("### CALLING CACHE LOADER....");
+ throw new CacheLoaderException("Test for CahceLoaderException causing Client connection to disconnect.");
+ }
+ public void close() {
+ }
+ };
+ AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null);
+ createRegion(name, factory.create());
+ //pause(1000);
+ try {
+ startBridgeServer(0);
+ } catch (Exception ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+ }
+
+ }
+ };
+ getSystem().getLogWriter().info("before create server");
+
+ server.invoke(createServer);
+
+ // Create cache server clients
+ final int numberOfKeys = 10;
+ final String host0 = NetworkUtils.getServerHostName(host);
+ final int[] port = new int[] {server.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort())};
+ final String poolName = "myPool";
+
+ SerializableRunnable createClient =
+ new CacheSerializableRunnable("Create Cache Server Client") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ // create bridge writer
+ ClientServerTestCase.configureConnectionPoolWithName(factory,host0,port,true,-1, -1, null, poolName);
+ createRegion(name, factory.create());
+ }
+ };
+ getSystem().getLogWriter().info("before create client");
+ client.invoke(createClient);
+
+ // Initialize each client with entries (so that afterInvalidate is called)
+ SerializableRunnable invokeServerCacheLaoder =
+ new CacheSerializableRunnable("Initialize Client") {
+ public void run2() throws CacheException {
+ LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
+ PoolStats stats = ((PoolImpl)PoolManager.find(poolName)).getStats();
+ int oldConnects = stats.getConnects();
+ int oldDisConnects = stats.getDisConnects();
+ try {
+ for (int i=0; i<numberOfKeys; i++) {
+ String actual = (String) region.get("key-"+i);
+ }
+ } catch (Exception ex){
+ if (!(ex.getCause() instanceof CacheLoaderException)) {
+ fail ("UnExpected Exception, expected to receive CacheLoaderException from server, instead found: " + ex.getCause().getClass());
+ }
+ }
+ int newConnects = stats.getConnects();
+ int newDisConnects = stats.getDisConnects();
+ //System.out.println("#### new connects/disconnects :" + newConnects + ":" + newDisConnects);
+ if (newConnects != oldConnects && newDisConnects != oldDisConnects) {
+ fail ("New connection has created for Server side CacheLoaderException.");
+ }
+ }
+ };
+
+ getSystem().getLogWriter().info("before initialize client");
+ AsyncInvocation inv2 = client.invokeAsync(invokeServerCacheLaoder);
+
+ ThreadUtils.join(inv2, 30 * 1000);
+ SerializableRunnable stopServer = new SerializableRunnable("stop CacheServer") {
+ public void run() {
+ stopBridgeServer(getCache());
+ }
+ };
+ server.invoke(stopServer);
+
+ }
+
+
+ protected void validateDS() {
+ List l = InternalDistributedSystem.getExistingSystems();
+ if (l.size() > 1) {
+ getSystem().getLogWriter().info("validateDS: size="
+ + l.size()
+ + " isDedicatedAdminVM="
+ + DistributionManager.isDedicatedAdminVM
+ + " l=" + l);
+ }
+ assertFalse(DistributionManager.isDedicatedAdminVM);
+ assertEquals(1, l.size());
+ }
+
+
+ /**
+  * Tests the basic operations of the {@link Pool}: a first client loads
+  * values through the server's loader and overwrites them, a second client
+  * reads the overwritten values, and the first client then checks the pool's
+  * destroyed state and attach count around a local region destroy.
+  *
+  * @since 3.5
+  */
+ public void test006Pool() throws CacheException {
+   final String name = this.getName();
+   final Host host = Host.getHost(0);
+   final VM serverVM = host.getVM(0);
+   final VM firstClient = host.getVM(1);
+   final VM secondClient = Host.getHost(0).getVM(2);
+
+   final SerializableRunnable createServer =
+       new CacheSerializableRunnable("Create Cache Server") {
+     public void run2() throws CacheException {
+       // loader that returns the string form of the requested key
+       final CacheLoader keyToStringLoader = new CacheLoader() {
+         public Object load(LoaderHelper helper) {
+           return helper.getKey().toString();
+         }
+         public void close() {
+         }
+       };
+       final AttributesFactory serverAttrs = new AttributesFactory();
+       serverAttrs.setScope(Scope.DISTRIBUTED_ACK);
+       serverAttrs.setConcurrencyChecksEnabled(false);
+       serverAttrs.setCacheLoader(keyToStringLoader);
+       createRegion(name, serverAttrs.create());
+       try {
+         startBridgeServer(0);
+       } catch (Exception ex) {
+         com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+       }
+     }
+   };
+   serverVM.invoke(createServer);
+
+   final int port =
+       serverVM.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+   final String host0 = NetworkUtils.getServerHostName(serverVM.getHost());
+   final SerializableRunnable createClientRegion =
+       new CacheSerializableRunnable("Create region") {
+     public void run2() throws CacheException {
+       getLonerSystem();
+       getCache();
+       validateDS();
+       final AttributesFactory clientAttrs = new AttributesFactory();
+       clientAttrs.setScope(Scope.LOCAL);
+       clientAttrs.setConcurrencyChecksEnabled(false);
+       ClientServerTestCase.configureConnectionPool(clientAttrs,host0,port,-1,true,-1,-1, null);
+       createRegion(name, clientAttrs.create());
+     }
+   };
+   firstClient.invoke(createClientRegion);
+
+   final SerializableRunnable getValues =
+       new CacheSerializableRunnable("Get values") {
+     public void run2() throws CacheException {
+       final Region region = getRootRegion().getSubregion(name);
+       for (int i = 0; i < 10; i++) {
+         final Object loaded = region.get(Integer.valueOf(i));
+         assertEquals(String.valueOf(i), loaded);
+       }
+     }
+   };
+   firstClient.invoke(getValues);
+
+   final SerializableRunnable updateValues =
+       new CacheSerializableRunnable("Update values") {
+     public void run2() throws CacheException {
+       final Region region = getRootRegion().getSubregion(name);
+       for (int i = 0; i < 10; i++) {
+         region.put(Integer.valueOf(i), Integer.valueOf(i));
+       }
+     }
+   };
+   firstClient.invoke(updateValues);
+
+   secondClient.invoke(createClientRegion);
+   final SerializableRunnable validateValues =
+       new CacheSerializableRunnable("Validate values") {
+     public void run2() throws CacheException {
+       final Region region = getRootRegion().getSubregion(name);
+       for (int i = 0; i < 10; i++) {
+         final Object fetched = region.get(Integer.valueOf(i));
+         assertNotNull(fetched);
+         assertTrue(fetched instanceof Integer);
+         assertEquals(i, ((Integer) fetched).intValue());
+       }
+     }
+   };
+   secondClient.invoke(validateValues);
+
+   final SerializableRunnable closePool =
+       new CacheSerializableRunnable("Close Pool") {
+     // do some special close validation here
+     public void run2() throws CacheException {
+       final Region region = getRootRegion().getSubregion(name);
+       final String attachedPoolName = region.getAttributes().getPoolName();
+       final PoolImpl attachedPool = (PoolImpl)PoolManager.find(attachedPoolName);
+       assertEquals(false, attachedPool.isDestroyed());
+       assertEquals(1, attachedPool.getAttachCount());
+       // destroying the pool while the region is still attached must be rejected
+       try {
+         attachedPool.destroy();
+         fail("expected IllegalStateException");
+       } catch (IllegalStateException expected) {
+       }
+       region.localDestroyRegion();
+       assertEquals(false, attachedPool.isDestroyed());
+       assertEquals(0, attachedPool.getAttachCount());
+     }
+   };
+   firstClient.invoke(closePool);
+
+   final SerializableRunnable stopServer =
+       new SerializableRunnable("Stop CacheServer") {
+     public void run() {
+       stopBridgeServer(getCache());
+     }
+   };
+   serverVM.invoke(stopServer);
+ }
+
+
+
+
+ /**
+  * Tests the BridgeServer failover (bug 31832) with a connection count
+  * of 1, after first disconnecting all VMs from the distributed system.
+  */
+ public void test007BridgeServerFailoverCnx1() throws CacheException {
+   disconnectAllFromDS();
+   final int connectionCount = 1;
+   basicTestBridgeServerFailover(connectionCount);
+ }
+ /**
+  * Test BridgeServer failover with connectionsPerServer set to 0.
+  */
+ public void test008BridgeServerFailoverCnx0() throws CacheException {
+   final int connectionCount = 0;
+   basicTestBridgeServerFailover(connectionCount);
+ }
+ /**
+ * Shared body of the BridgeServer failover tests. Starts cache servers in VM0
+ * and VM1, creates a client region in VM2 whose pool is configured with both
+ * server ports, and starts an asynchronous putter in VM2. It then stops and
+ * restarts the server in VM1, stops the server in VM0, and after each step
+ * checks the server count of the client's pool through verifyServerCount.
+ *
+ * @param cnxCount value forwarded to ClientServerTestCase.configureConnectionPool
+ *        (the calling tests describe it as the connections-per-server setting)
+ */
+ private void basicTestBridgeServerFailover(final int cnxCount) throws CacheException {
+ final String name = this.getName();
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+
+ // Create two bridge servers
+ SerializableRunnable createCacheServer =
+ new CacheSerializableRunnable("Create Cache Server") {
+ public void run2() throws CacheException {
+ AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+ createRegion(name, factory.create());
+ //pause(1000);
+ try {
+ startBridgeServer(0);
+
+ } catch (Exception ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+ }
+
+ }
+ };
+
+ vm0.invoke(createCacheServer);
+ vm1.invoke(createCacheServer);
+
+ final int port0 =
+ vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+ final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+ final int port1 =
+ vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+// final String host1 = getServerHostName(vm1.getHost());
+
+ // Create one bridge client in this VM
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create region") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ getCache();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ ClientServerTestCase.configureConnectionPool(factory,host0,port0,port1,true,-1,cnxCount, null, 100);
+
+ Region region = createRegion(name, factory.create());
+
+ // force connections to form
+ region.put("keyInit", new Integer(0));
+ region.put("keyInit2", new Integer(0));
+ }
+ };
+
+ vm2.invoke(create);
+
+ // Launch async thread that puts objects into cache. This thread will execute until
+ // the test has ended, which is why NoAvailableServersException,
+ // RegionDestroyedException and CancelException are caught and ignored.
+ // NOTE(review): the putAI.exceptionOccurred() check that was meant to catch any
+ // other exception is currently commented out near the end of this method.
+ AsyncInvocation putAI = vm2.invokeAsync(new CacheSerializableRunnable("Put objects") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ try {
+ for (int i=0; i<100000; i++) {
+ region.put("keyAI", new Integer(i));
+ try {Thread.sleep(100);} catch (InterruptedException ie) {
+ fail("interrupted");
+ }
+ }
+ } catch (NoAvailableServersException ignore) {
+ /*ignore*/
+ } catch (RegionDestroyedException e) { //will be thrown when the test ends
+ /*ignore*/
+ }
+ catch (CancelException e) { //will be thrown when the test ends
+ /*ignore*/
+ }
+ }
+ });
+
+
+ // Runnables that check how many servers the client's pool currently reports.
+ SerializableRunnable verify1Server =
+ new CacheSerializableRunnable("verify1Server") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ PoolImpl pool = getPool(region);
+ verifyServerCount(pool, 1);
+ }
+ };
+ SerializableRunnable verify2Servers =
+ new CacheSerializableRunnable("verify2Servers") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ PoolImpl pool = getPool(region);
+ verifyServerCount(pool, 2);
+ }
+ };
+
+ vm2.invoke(verify2Servers);
+
+ SerializableRunnable stopCacheServer =
+ new SerializableRunnable("Stop CacheServer") {
+ public void run() {
+ stopBridgeServer(getCache());
+ }
+ };
+
+ // Markers written to the client's log around the bounce so that
+ // java.io.IOException entries logged in between are flagged as expected.
+ final String expected = "java.io.IOException";
+ final String addExpected =
+ "<ExpectedException action=add>" + expected + "</ExpectedException>";
+ final String removeExpected =
+ "<ExpectedException action=remove>" + expected + "</ExpectedException>";
+
+ vm2.invoke(new SerializableRunnable() {
+ public void run() {
+ LogWriter bgexecLogger =
+ new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
+ bgexecLogger.info(addExpected);
+ }
+ });
+ try { // make sure we removeExpected
+
+ // Bounce the server in VM1. The original note says VM1 is known to hold the
+ // non-current server, but the explanation was never written down.
+ vm1.invoke(stopCacheServer);
+
+ vm2.invoke(verify1Server);
+
+ final int restartPort = port1;
+ vm1.invoke(
+ new SerializableRunnable("Restart CacheServer") {
+ public void run() {
+ try {
+ Region region = getRootRegion().getSubregion(name);
+ assertNotNull(region);
+ startBridgeServer(restartPort);
+ }
+ catch(Exception e) {
+ getSystem().getLogWriter().fine(new Exception(e));
+ com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start CacheServer", e);
+ }
+ }
+ });
+
+ // The client must see both servers again once it notices the bounced server.
+ // NOTE(review): any waiting presumably happens inside verifyServerCount -- confirm.
+ vm2.invoke(verify2Servers);
+
+ } finally {
+ vm2.invoke(new SerializableRunnable() {
+ public void run() {
+ LogWriter bgexecLogger =
+ new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
+ bgexecLogger.info(removeExpected);
+ }
+ });
+ }
+
+ // Stop the other cache server
+ vm0.invoke(stopCacheServer);
+
+ // Only the restarted server in VM1 should remain
+ vm2.invoke(verify1Server);
+
+ com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("FIXME: this thread does not terminate"); // FIXME
+// // Verify that no exception has occurred in the putter thread
+// DistributedTestCase.join(putAI, 5 * 60 * 1000, getLogWriter());
+// //assertTrue("Exception occurred while invoking " + putAI, !putAI.exceptionOccurred());
+// if (putAI.exceptionOccurred()) {
+// fail("While putting entries: ", putAI.getException());
+// }
+
+ // Close Pool
+ vm2.invoke(new CacheSerializableRunnable("Close Pool") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ region.localDestroyRegion();
+ }
+ });
+
+ // Stop the last cache server
+ vm1.invoke(stopCacheServer);
+ }
+
+
+ /**
+ * Makes sure connection lifetime expiration works on thread local connections
+ * by running the shared lifetime-expire scenario with threadLocal set to true.
+ * @author darrel
+ */
+ public void test009LifetimeExpireOnTL() throws CacheException {
+ basicTestLifetimeExpire(true);
+ }
+
+ /**
+ * Makes sure connection lifetime expiration works on pooled (non thread local)
+ * connections by running the shared lifetime-expire scenario with threadLocal
+ * set to false.
+ * @author darrel
+ */
+ public void test010LifetimeExpireOnPoolCnx() throws CacheException {
+ basicTestLifetimeExpire(false);
+ }
+
+ // Stop flag polled by the putter loops of basicTestLifetimeExpire; the test
+ // sets it to true in the client VM to end the loops and resets it afterwards.
+ protected static volatile boolean stopTestLifetimeExpire = false;
+
+ // Load conditioning statistics of the client pool, captured by the first
+ // putter when it starts and used as baselines by the verify runnables.
+ protected static volatile int baselineLifetimeCheck;
+ protected static volatile int baselineLifetimeExtensions;
+ protected static volatile int baselineLifetimeConnect;
+ protected static volatile int baselineLifetimeDisconnect;
+
+ /**
+ * Shared body of the connection lifetime expiration tests.
+ * <p>
+ * Starts a cache server in VM0, reserves a second port by starting and then
+ * stopping a server in VM1, and creates a client region in VM2 whose pool is
+ * configured with both ports. Two asynchronous putters keep the client busy
+ * while the pool's load conditioning statistics are checked: with one server
+ * running, lifetimes are expected to be extended without any load conditioning
+ * connects or disconnects; after the VM1 server is restarted, both connects
+ * and disconnects are expected to increase.
+ *
+ * @param threadLocal forwarded to ClientServerTestCase.configureConnectionPool;
+ *        the calling tests pass true for the thread local connection variant
+ *        and false for the pooled connection variant
+ */
+ private void basicTestLifetimeExpire(final boolean threadLocal) throws CacheException {
+   final String name = this.getName();
+   final Host host = Host.getHost(0);
+   VM vm0 = host.getVM(0);
+   VM vm1 = host.getVM(1);
+   VM vm2 = host.getVM(2);
+
+   AsyncInvocation putAI = null;
+   AsyncInvocation putAI2 = null;
+
+   try {
+
+     // Create two bridge servers
+     SerializableRunnable createCacheServer =
+         new CacheSerializableRunnable("Create Cache Server") {
+       public void run2() throws CacheException {
+         AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+         factory.setCacheListener(new DelayListener(25));
+         createRegion(name, factory.create());
+         try {
+           startBridgeServer(0);
+         } catch (Exception ex) {
+           com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+         }
+       }
+     };
+
+     vm0.invoke(createCacheServer);
+
+     final int port0 =
+         vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+     final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+     vm1.invoke(createCacheServer);
+     final int port1 =
+         vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+     SerializableRunnable stopCacheServer =
+         new SerializableRunnable("Stop CacheServer") {
+       public void run() {
+         stopBridgeServer(getCache());
+       }
+     };
+     // The VM1 server was only started to reserve a port; stop it again so the
+     // client initially sees a single server.
+     vm1.invoke(stopCacheServer);
+
+     // Create one bridge client in this VM
+     SerializableRunnable create =
+         new CacheSerializableRunnable("Create region") {
+       public void run2() throws CacheException {
+         getLonerSystem();
+         getCache();
+         AttributesFactory factory = new AttributesFactory();
+         factory.setScope(Scope.LOCAL);
+         factory.setConcurrencyChecksEnabled(false);
+         ClientServerTestCase.configureConnectionPool(factory,host0,port0,port1,false/*queue*/,-1,0, null, 100, 500, threadLocal, 500);
+
+         Region region = createRegion(name, factory.create());
+
+         // force connections to form
+         region.put("keyInit", new Integer(0));
+         region.put("keyInit2", new Integer(0));
+       }
+     };
+
+     vm2.invoke(create);
+
+     // Launch async threads that put objects into the cache until the stop flag
+     // is raised. The first putter also records the statistics baselines.
+     SerializableRunnable putter1 =
+         new CacheSerializableRunnable("Put objects") {
+       public void run2() throws CacheException {
+         Region region = getRootRegion().getSubregion(name);
+         PoolImpl pool = getPool(region);
+         PoolStats stats = pool.getStats();
+         baselineLifetimeCheck = stats.getLoadConditioningCheck();
+         baselineLifetimeExtensions = stats.getLoadConditioningExtensions();
+         baselineLifetimeConnect = stats.getLoadConditioningConnect();
+         baselineLifetimeDisconnect = stats.getLoadConditioningDisconnect();
+         try {
+           int count = 0;
+           while (!stopTestLifetimeExpire) {
+             count++;
+             region.put("keyAI1", new Integer(count));
+           }
+         } catch (NoAvailableServersException ex) {
+           // Only tolerated once the test has asked the putters to stop.
+           if (!stopTestLifetimeExpire) {
+             throw ex;
+           }
+         }
+       }
+     };
+     SerializableRunnable putter2 =
+         new CacheSerializableRunnable("Put objects") {
+       public void run2() throws CacheException {
+         Region region = getRootRegion().getSubregion(name);
+         try {
+           int count = 0;
+           while (!stopTestLifetimeExpire) {
+             count++;
+             region.put("keyAI2", new Integer(count));
+           }
+         } catch (NoAvailableServersException ex) {
+           // Only tolerated once the test has asked the putters to stop.
+           if (!stopTestLifetimeExpire) {
+             throw ex;
+           }
+         }
+       }
+     };
+     putAI = vm2.invokeAsync(putter1);
+     putAI2 = vm2.invokeAsync(putter2);
+
+     SerializableRunnable verify1Server =
+         new CacheSerializableRunnable("verify1Server") {
+       public void run2() throws CacheException {
+         Region region = getRootRegion().getSubregion(name);
+         PoolImpl pool = getPool(region);
+         final PoolStats stats = pool.getStats();
+         verifyServerCount(pool, 1);
+         WaitCriterion ev = new WaitCriterion() {
+           public boolean done() {
+             return stats.getLoadConditioningCheck() >= (10 + baselineLifetimeCheck);
+           }
+           public String description() {
+             return "expected " + stats.getLoadConditioningCheck()
+                 + " to be >= " + (10 + baselineLifetimeCheck);
+           }
+         };
+         Wait.waitForCriterion(ev, 30 * 1000, 200, true);
+
+         // make sure no replacements are happening.
+         // since we have 2 threads and 2 cnxs and 1 server
+         // when lifetimes are up we should only want to connect back to the
+         // server we are already connected to and thus just extend our lifetime
+         assertTrue("baselineLifetimeCheck=" + baselineLifetimeCheck
+             + " but stats.getLoadConditioningCheck()=" + stats.getLoadConditioningCheck(),
+             stats.getLoadConditioningCheck() >= (10+baselineLifetimeCheck));
+         baselineLifetimeCheck = stats.getLoadConditioningCheck();
+         assertTrue(stats.getLoadConditioningExtensions() > baselineLifetimeExtensions);
+         assertTrue(stats.getLoadConditioningConnect() == baselineLifetimeConnect);
+         assertTrue(stats.getLoadConditioningDisconnect() == baselineLifetimeDisconnect);
+       }
+     };
+     SerializableRunnable verify2Servers =
+         new CacheSerializableRunnable("verify2Servers") {
+       public void run2() throws CacheException {
+         Region region = getRootRegion().getSubregion(name);
+         PoolImpl pool = getPool(region);
+         final PoolStats stats = pool.getStats();
+         verifyServerCount(pool, 2);
+         // make sure some replacements are happening.
+         // since we have 2 threads and 2 cnxs and 2 servers
+         // when lifetimes are up we should connect to the other server sometimes.
+
+         // TODO: does this WaitCriterion actually help?
+         WaitCriterion wc = new WaitCriterion() {
+           String excuse;
+           public boolean done() {
+             int actual = stats.getLoadConditioningCheck();
+             int expected = 10 + baselineLifetimeCheck;
+             if (actual >= expected) {
+               return true;
+             }
+             excuse = "Bug 39209 expected " + actual + " to be >= " + expected;
+             return false;
+           }
+           public String description() {
+             return excuse;
+           }
+         };
+         Wait.waitForCriterion(wc, 60 * 1000, 1000, true);
+
+         assertTrue(stats.getLoadConditioningConnect() > baselineLifetimeConnect);
+         assertTrue(stats.getLoadConditioningDisconnect() > baselineLifetimeDisconnect);
+       }
+     };
+
+     vm2.invoke(verify1Server);
+     assertEquals(true, putAI.isAlive());
+     assertEquals(true, putAI2.isAlive());
+
+     {
+       final int restartPort = port1;
+       vm1.invoke(new SerializableRunnable("Restart CacheServer") {
+         public void run() {
+           try {
+             Region region = getRootRegion().getSubregion(name);
+             assertNotNull(region);
+             startBridgeServer(restartPort);
+           }
+           catch(Exception e) {
+             getSystem().getLogWriter().fine(new Exception(e));
+             com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start CacheServer", e);
+           }
+         }
+       });
+     }
+
+     vm2.invoke(verify2Servers);
+     assertEquals(true, putAI.isAlive());
+     assertEquals(true, putAI2.isAlive());
+   } finally {
+     vm2.invoke(new SerializableRunnable("Stop Putters") {
+       public void run() {
+         stopTestLifetimeExpire = true;
+       }
+     });
+
+     try {
+       if (putAI != null) {
+         // Verify that no exception has occurred in the putter thread
+         ThreadUtils.join(putAI, 30 * 1000);
+         if (putAI.exceptionOccurred()) {
+           com.gemstone.gemfire.test.dunit.Assert.fail("While putting entries: ", putAI.getException());
+         }
+       }
+
+       if (putAI2 != null) {
+         // Wait for the SECOND putter. This used to join putAI again, so the
+         // stop flag could be reset below while putter2 was still inside a
+         // put, letting it miss the stop signal and keep running.
+         ThreadUtils.join(putAI2, 30 * 1000);
+         // FIXME: putter2's exception state is still not asserted; the check
+         // was disabled with the note "this thread does not terminate".
+       }
+
+     } finally {
+       // Reset the flag so a later test in the same VM starts with running putters.
+       vm2.invoke(new SerializableRunnable("Stop Putters") {
+         public void run() {
+           stopTestLifetimeExpire = false;
+         }
+       });
+       // Close Pool
+       vm2.invoke(new CacheSerializableRunnable("Close Pool") {
+         public void run2() throws CacheException {
+           Region region = getRootRegion().getSubregion(name);
+           String poolName = region.getAttributes().getPoolName();
+           region.localDestroyRegion();
+           PoolManager.find(poolName).destroy();
+         }
+       });
+
+       SerializableRunnable stopCacheServer =
+           new SerializableRunnable("Stop CacheServer") {
+         public void run() {
+           stopBridgeServer(getCache());
+         }
+       };
+       vm1.invoke(stopCacheServer);
+       vm0.invoke(stopCacheServer);
+     }
+   }
+ }
+
+ /**
+ * Verifies that entries created through one client {@link Pool} can be read
+ * back, with equal Integer values, through a second client connected to the
+ * same cache server.
+ *
+ * @since 3.5
+ */
+ public void test011PoolCreate() throws CacheException {
+   final String regionName = this.getName();
+   final Host host = Host.getHost(0);
+   VM serverVM = host.getVM(0);
+   VM creatorVM = host.getVM(1);
+   VM readerVM = host.getVM(2);
+
+   // Server side: host the region and start a cache server.
+   serverVM.invoke(new CacheSerializableRunnable("Create Cache Server") {
+     public void run2() throws CacheException {
+       AttributesFactory serverAttrs = getBridgeServerRegionAttributes(null, null);
+       createRegion(regionName, serverAttrs.create());
+       try {
+         startBridgeServer(0);
+       } catch (Exception ex) {
+         com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+       }
+     }
+   });
+   final int serverPort =
+       serverVM.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+   final String serverHost = NetworkUtils.getServerHostName(serverVM.getHost());
+
+   // Client side: a local-scope region backed by a pool to the server.
+   SerializableRunnable createClient =
+       new CacheSerializableRunnable("Create region") {
+     public void run2() throws CacheException {
+       getLonerSystem();
+       getCache();
+       AttributesFactory clientAttrs = new AttributesFactory();
+       clientAttrs.setScope(Scope.LOCAL);
+       clientAttrs.setConcurrencyChecksEnabled(false);
+       ClientServerTestCase.configureConnectionPool(clientAttrs, serverHost, serverPort, -1, false, -1, -1, null);
+       createRegion(regionName, clientAttrs.create());
+     }
+   };
+
+   creatorVM.invoke(createClient);
+   creatorVM.invoke(new CacheSerializableRunnable("Create values") {
+     public void run2() throws CacheException {
+       Region rgn = getRootRegion().getSubregion(regionName);
+       for (int n = 0; n < 10; n++) {
+         Integer key = new Integer(n);
+         Integer value = new Integer(n);
+         rgn.create(key, value);
+       }
+     }
+   });
+
+   readerVM.invoke(createClient);
+   readerVM.invoke(new CacheSerializableRunnable("Validate values") {
+     public void run2() throws CacheException {
+       Region rgn = getRootRegion().getSubregion(regionName);
+       for (int n = 0; n < 10; n++) {
+         Object fetched = rgn.get(new Integer(n));
+         assertNotNull(fetched);
+         assertTrue(fetched instanceof Integer);
+         assertEquals(n, ((Integer) fetched).intValue());
+       }
+     }
+   });
+
+   // Tear down: destroy the client regions, then stop the server.
+   SerializableRunnable destroyClientRegion =
+       new CacheSerializableRunnable("Close Pool") {
+     public void run2() throws CacheException {
+       getRootRegion().getSubregion(regionName).localDestroyRegion();
+     }
+   };
+
+   creatorVM.invoke(destroyClientRegion);
+   readerVM.invoke(destroyClientRegion);
+
+   serverVM.invoke(new SerializableRunnable("Stop CacheServer") {
+     public void run() {
+       stopBridgeServer(getCache());
+     }
+   });
+ }
+
+ /**
+ * Puts String, Order and byte[] values through one client {@link Pool} and
+ * checks that a second client connected to the same cache server reads back
+ * equal values of the same types.
+ *
+ * @since 3.5
+ */
+ public void test012PoolPut() throws CacheException {
+   final String regionName = this.getName();
+   final Host host = Host.getHost(0);
+   VM serverVM = host.getVM(0);
+   VM writerVM = host.getVM(1);
+   VM readerVM = host.getVM(2);
+
+   // Server side: host the region and start a cache server.
+   serverVM.invoke(new CacheSerializableRunnable("Create Cache Server") {
+     public void run2() throws CacheException {
+       AttributesFactory serverAttrs = getBridgeServerRegionAttributes(null, null);
+       createRegion(regionName, serverAttrs.create());
+       try {
+         startBridgeServer(0);
+       } catch (Exception ex) {
+         com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+       }
+     }
+   });
+
+   final int serverPort =
+       serverVM.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+   final String serverHost = NetworkUtils.getServerHostName(serverVM.getHost());
+
+   // Client side: a local-scope region backed by a pool to the server.
+   SerializableRunnable createClient =
+       new CacheSerializableRunnable("Create region") {
+     public void run2() throws CacheException {
+       getLonerSystem();
+       getCache();
+       AttributesFactory clientAttrs = new AttributesFactory();
+       clientAttrs.setScope(Scope.LOCAL);
+       clientAttrs.setConcurrencyChecksEnabled(false);
+       ClientServerTestCase.configureConnectionPool(clientAttrs, serverHost, serverPort, -1, false, -1, -1, null);
+       createRegion(regionName, clientAttrs.create());
+     }
+   };
+
+   writerVM.invoke(createClient);
+
+   writerVM.invoke(new CacheSerializableRunnable("Put values") {
+     public void run2() throws CacheException {
+       Region rgn = getRootRegion().getSubregion(regionName);
+       for (int n = 0; n < 10; n++) {
+         // one String, one Order and one byte[] entry per index
+         rgn.put("key-string-" + n, "value-" + n);
+
+         Order order = new Order();
+         order.init(n);
+         rgn.put("key-object-" + n, order);
+
+         byte[] raw = ("value-" + n).getBytes();
+         rgn.put("key-bytes-" + n, raw);
+       }
+     }
+   });
+
+   readerVM.invoke(createClient);
+
+   readerVM.invoke(new CacheSerializableRunnable("Get / validate string values") {
+     public void run2() throws CacheException {
+       Region rgn = getRootRegion().getSubregion(regionName);
+       for (int n = 0; n < 10; n++) {
+         Object fetched = rgn.get("key-string-" + n);
+         assertNotNull(fetched);
+         assertTrue(fetched instanceof String);
+         assertEquals("value-" + n, fetched);
+       }
+     }
+   });
+
+   readerVM.invoke(new CacheSerializableRunnable("Get / validate object values") {
+     public void run2() throws CacheException {
+       Region rgn = getRootRegion().getSubregion(regionName);
+       for (int n = 0; n < 10; n++) {
+         Object fetched = rgn.get("key-object-" + n);
+         assertNotNull(fetched);
+         assertTrue(fetched instanceof Order);
+         assertEquals(n, ((Order) fetched).getIndex());
+       }
+     }
+   });
+
+   readerVM.invoke(new CacheSerializableRunnable("Get / validate byte[] values") {
+     public void run2() throws CacheException {
+       Region rgn = getRootRegion().getSubregion(regionName);
+       for (int n = 0; n < 10; n++) {
+         Object fetched = rgn.get("key-bytes-" + n);
+         assertNotNull(fetched);
+         assertTrue(fetched instanceof byte[]);
+         assertEquals("value-" + n, new String((byte[]) fetched));
+       }
+     }
+   });
+
+   // Tear down: destroy the client regions, then stop the server.
+   SerializableRunnable destroyClientRegion =
+       new CacheSerializableRunnable("Close Pool") {
+     public void run2() throws CacheException {
+       getRootRegion().getSubregion(regionName).localDestroyRegion();
+     }
+   };
+
+   writerVM.invoke(destroyClientRegion);
+   readerVM.invoke(destroyClientRegion);
+
+   serverVM.invoke(new SerializableRunnable("Stop CacheServer") {
+     public void run() {
+       stopBridgeServer(getCache());
+     }
+   });
+ }
+ /**
+ * Tests the put operation of the {@link Pool}: String, Order and byte[] values
+ * put by one client must be readable, with equal content, by a second client.
+ * The steps match test012PoolPut except for a five second pause at the end.
+ * NOTE(review): nothing visible here configures a "no deserialize" mode --
+ * confirm what distinguishes this test.
+ *
+ * @since 3.5
+ */
+ public void test013PoolPutNoDeserialize() throws CacheException {
+   final String regionName = this.getName();
+   final Host host = Host.getHost(0);
+   VM serverVM = host.getVM(0);
+   VM writerVM = host.getVM(1);
+   VM readerVM = host.getVM(2);
+
+   // Server side: host the region and start a cache server.
+   serverVM.invoke(new CacheSerializableRunnable("Create Cache Server") {
+     public void run2() throws CacheException {
+       AttributesFactory serverAttrs = getBridgeServerRegionAttributes(null, null);
+       createRegion(regionName, serverAttrs.create());
+       try {
+         startBridgeServer(0);
+       } catch (Exception ex) {
+         com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+       }
+     }
+   });
+
+   final int serverPort =
+       serverVM.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+   final String serverHost = NetworkUtils.getServerHostName(serverVM.getHost());
+
+   // Client side: a local-scope region backed by a pool to the server.
+   SerializableRunnable createClient =
+       new CacheSerializableRunnable("Create region") {
+     public void run2() throws CacheException {
+       getLonerSystem();
+       getCache();
+       AttributesFactory clientAttrs = new AttributesFactory();
+       clientAttrs.setScope(Scope.LOCAL);
+       clientAttrs.setConcurrencyChecksEnabled(false);
+       ClientServerTestCase.configureConnectionPool(clientAttrs, serverHost, serverPort, -1, false, -1, -1, null);
+       createRegion(regionName, clientAttrs.create());
+     }
+   };
+
+   writerVM.invoke(createClient);
+
+   writerVM.invoke(new CacheSerializableRunnable("Put values") {
+     public void run2() throws CacheException {
+       Region rgn = getRootRegion().getSubregion(regionName);
+       for (int n = 0; n < 10; n++) {
+         // one String, one Order and one byte[] entry per index
+         rgn.put("key-string-" + n, "value-" + n);
+
+         Order order = new Order();
+         order.init(n);
+         rgn.put("key-object-" + n, order);
+
+         byte[] raw = ("value-" + n).getBytes();
+         rgn.put("key-bytes-" + n, raw);
+       }
+     }
+   });
+
+   readerVM.invoke(createClient);
+
+   readerVM.invoke(new CacheSerializableRunnable("Get / validate string values") {
+     public void run2() throws CacheException {
+       Region rgn = getRootRegion().getSubregion(regionName);
+       for (int n = 0; n < 10; n++) {
+         Object fetched = rgn.get("key-string-" + n);
+         assertNotNull(fetched);
+         assertTrue(fetched instanceof String);
+         assertEquals("value-" + n, fetched);
+       }
+     }
+   });
+
+   readerVM.invoke(new CacheSerializableRunnable("Get / validate object values") {
+     public void run2() throws CacheException {
+       Region rgn = getRootRegion().getSubregion(regionName);
+       for (int n = 0; n < 10; n++) {
+         Object fetched = rgn.get("key-object-" + n);
+         assertNotNull(fetched);
+         assertTrue(fetched instanceof Order);
+         assertEquals(n, ((Order) fetched).getIndex());
+       }
+     }
+   });
+
+   readerVM.invoke(new CacheSerializableRunnable("Get / validate byte[] values") {
+     public void run2() throws CacheException {
+       Region rgn = getRootRegion().getSubregion(regionName);
+       for (int n = 0; n < 10; n++) {
+         Object fetched = rgn.get("key-bytes-" + n);
+         assertNotNull(fetched);
+         assertTrue(fetched instanceof byte[]);
+         assertEquals("value-" + n, new String((byte[]) fetched));
+       }
+     }
+   });
+
+   // Tear down: destroy the client regions, then stop the server.
+   SerializableRunnable destroyClientRegion =
+       new CacheSerializableRunnable("Close Pool") {
+     public void run2() throws CacheException {
+       getRootRegion().getSubregion(regionName).localDestroyRegion();
+     }
+   };
+
+   writerVM.invoke(destroyClientRegion);
+   readerVM.invoke(destroyClientRegion);
+
+   serverVM.invoke(new SerializableRunnable("Stop CacheServer") {
+     public void run() {
+       stopBridgeServer(getCache());
+     }
+   });
+   Wait.pause(5 * 1000);
+ }
+
+ /**
+ * Tests that invalidates and destroys are propagated to {@link Pool}s.
+ * <p>
+ * Two clients (VM1 and VM2) register interest in all keys of a region served
+ * from VM0. Updates made in VM2 must show up in VM1's listener history as
+ * INVALIDATE events, destroys made in VM2 as DESTROY events, and creates made
+ * in VM2 must produce no events in VM1 until VM1 fetches the values itself.
+ *
+ * @since 3.5
+ */
+ public void test014InvalidateAndDestroyPropagation() throws CacheException {
+ final String name = this.getName();
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+
+ vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+ public void run2() throws CacheException {
+ AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+ createRegion(name, factory.create());
+ //pause(1000);
+ try {
+ startBridgeServer(0);
+
+ } catch (Exception ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+ }
+
+ }
+
+ });
+ final int port =
+ vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+ final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+ // Client region with a recording listener and interest registered in every key.
+ SerializableRunnable create =
+ new CacheSerializableRunnable("Create region") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ getCache();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+ CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+ factory.setCacheListener(l);
+ Region rgn = createRegion(name, factory.create());
+ rgn.registerInterestRegex(".*", false, false);
+ }
+ };
+
+ vm1.invoke(create);
+ vm1.invoke(new CacheSerializableRunnable("Populate region") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ region.put(new Integer(i), "old" + i);
+ }
+ }
+ });
+ vm2.invoke(create);
+ Wait.pause(5 * 1000);
+
+ // Start recording events in VM1 only now, so the initial population is not
+ // part of the history that is checked below.
+ vm1.invoke(new CacheSerializableRunnable("Turn on history") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ ctl.enableEventHistory();
+ }
+ });
+ vm2.invoke(new CacheSerializableRunnable("Update region") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ region.put(new Integer(i), "new" + i, "callbackArg" + i);
+ }
+ }
+ });
+ Wait.pause(5 * 1000);
+
+ // VM2's updates must arrive in VM1 as remote invalidates: the entry stays but
+ // its value is gone, and the event carries the old value and callback argument.
+ vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ ctl.waitForInvalidated(key);
+ Region.Entry entry = region.getEntry(key);
+ assertNotNull(entry);
+ assertNull(entry.getValue());
+ }
+ {
+ // NOTE(review): the 10-event size assertions here and below presumably rely
+ // on getEventHistory() also clearing the recorded history -- confirm.
+ List l = ctl.getEventHistory();
+ assertEquals(10, l.size());
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ EntryEvent ee = (EntryEvent)l.get(i);
+ assertEquals(key, ee.getKey());
+ assertEquals("old" + i, ee.getOldValue());
+ assertEquals(Operation.INVALIDATE, ee.getOperation());
+ assertEquals("callbackArg" + i, ee.getCallbackArgument());
+ assertEquals(true, ee.isOriginRemote());
+ }
+ }
+ }
+ });
+
+
+ vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ assertEquals("new" + i, region.getEntry(key).getValue());
+ region.destroy(key, "destroyCB"+i);
+ }
+ }
+ });
+ Wait.pause(5 * 1000);
+
+ // VM2's destroys must arrive in VM1 as remote destroys that remove the entries.
+ vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ ctl.waitForDestroyed(key);
+ Region.Entry entry = region.getEntry(key);
+ assertNull(entry);
+ }
+ {
+ List l = ctl.getEventHistory();
+ assertEquals(10, l.size());
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ EntryEvent ee = (EntryEvent)l.get(i);
+ assertEquals(key, ee.getKey());
+ assertEquals(null, ee.getOldValue());
+ assertEquals(Operation.DESTROY, ee.getOperation());
+ assertEquals("destroyCB"+i, ee.getCallbackArgument());
+ assertEquals(true, ee.isOriginRemote());
+ }
+ }
+ }
+ });
+ vm2.invoke(new CacheSerializableRunnable("recreate") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ region.create(key, "create" + i);
+ }
+ }
+ });
+ Wait.pause(5 * 1000);
+
+ // VM2's creates are expected to produce no events in VM1; the values only
+ // appear there once VM1 gets them, which is recorded as a local load-create.
+ vm1.invoke(new CacheSerializableRunnable("Verify creates") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ List l = ctl.getEventHistory();
+ com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("history (should be empty): " + l);
+ assertEquals(0, l.size());
+ // now see if we can get it from the server
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ assertEquals("create"+i, region.get(key, "loadCB"+i));
+ }
+ l = ctl.getEventHistory();
+ assertEquals(10, l.size());
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ EntryEvent ee = (EntryEvent)l.get(i);
+ com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("processing " + ee);
+ assertEquals(key, ee.getKey());
+ assertEquals(null, ee.getOldValue());
+ assertEquals("create"+i, ee.getNewValue());
+ assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation());
+ assertEquals("loadCB"+i, ee.getCallbackArgument());
+ assertEquals(false, ee.isOriginRemote());
+ }
+ }
+ });
+
+ SerializableRunnable close =
+ new CacheSerializableRunnable("Close Pool") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ region.localDestroyRegion();
+ }
+ };
+
+ vm1.invoke(close);
+ vm2.invoke(close);
+
+ vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+ public void run() {
+ stopBridgeServer(getCache());
+ }
+ });
+ }
+ /**
+ * Tests that updates, destroys and creates performed by one {@link Pool}
+ * client are propagated correctly to another {@link Pool} client whose
+ * region uses DataPolicy.EMPTY + InterestPolicy.ALL.
+ *
+ * <p>Scenario, in the order driven below:
+ * <ol>
+ * <li>vm0 creates the region and starts a cache server; the server port is
+ * read back through getCacheServerPort().</li>
+ * <li>vm1 creates the region with DataPolicy.EMPTY and InterestPolicy.ALL,
+ * registers interest in the regex ".*" and puts keys 0-9 with the values
+ * "old"+i.</li>
+ * <li>vm2 creates the region without setting a data policy and registers
+ * the same interest; vm1 then enables event history on its
+ * CertifiableTestCacheListener.</li>
+ * <li>vm2 updates, then destroys, then re-creates keys 0-9. After each
+ * phase the test pauses five seconds and vm1 asserts that getEntry(key) is
+ * null and that its listener recorded ten events: INVALIDATE for the
+ * updates and for the creates, DESTROY for the destroys. Every event must
+ * have a null old value, isOldValueAvailable() false, the callback
+ * argument vm2 supplied and isOriginRemote() true.</li>
+ * <li>vm1 finally calls get(key, "loadCB"+i) for every key and asserts the
+ * value "create"+i plus ten LOCAL_LOAD_CREATE events that are not
+ * origin-remote.</li>
+ * <li>vm1 and vm2 locally destroy the region and vm0 stops the cache
+ * server.</li>
+ * </ol>
+ *
+ * <p>NOTE(review): every getEventHistory() call is asserted to contain
+ * exactly ten events, so the call presumably returns and clears the
+ * recorded history -- confirm against CertifiableTestCacheListener.
+ *
+ * @throws CacheException declared by the signature; none of the visible
+ * statements throws it directly
+ * @since 5.0
+ */
+ public void test015InvalidateAndDestroyToEmptyAllPropagation() throws CacheException {
+ final String name = this.getName();
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0); // runs the cache server
+ VM vm1 = host.getVM(1); // client under test: DataPolicy.EMPTY + InterestPolicy.ALL
+ VM vm2 = host.getVM(2); // client that performs the updates, destroys and creates
+
+ vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+ public void run2() throws CacheException {
+ AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+ createRegion(name, factory.create());
+ //pause(1000);
+ try {
+ startBridgeServer(0);
+
+ } catch (Exception ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+ }
+
+ }
+ });
+ final int port =
+ vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+ final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+ SerializableRunnable createEmpty =
+ new CacheSerializableRunnable("Create region") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ getCache();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+ CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+ factory.setCacheListener(l);
+ factory.setDataPolicy(DataPolicy.EMPTY);
+ factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
+ Region rgn = createRegion(name, factory.create());
+ rgn.registerInterestRegex(".*", false, false); // the regex matches every key
+ }
+ };
+ SerializableRunnable createNormal =
+ new CacheSerializableRunnable("Create region") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ getCache();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+ CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+ factory.setCacheListener(l);
+ Region rgn = createRegion(name, factory.create()); // unlike createEmpty, no data policy or subscription attributes are set
+ rgn.registerInterestRegex(".*", false, false);
+ }
+ };
+
+ vm1.invoke(createEmpty);
+ vm1.invoke(new CacheSerializableRunnable("Populate region") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ region.put(new Integer(i), "old" + i);
+ }
+ }
+ });
+
+ vm2.invoke(createNormal);
+ vm1.invoke(new CacheSerializableRunnable("Turn on history") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ ctl.enableEventHistory(); // enabled only after vm1's own "old" puts above
+ }
+ });
+ vm2.invoke(new CacheSerializableRunnable("Update region") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ region.put(new Integer(i), "new" + i, "callbackArg" + i);
+ }
+ }
+ });
+ Wait.pause(5 * 1000); // fixed five-second wait before vm1 inspects its listener
+
+ vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ ctl.waitForInvalidated(key);
+ Region.Entry entry = region.getEntry(key);
+ assertNull(entry); // we are empty!
+ }
+ {
+ List l = ctl.getEventHistory();
+ assertEquals(10, l.size());
+ for (int i = 0; i < 10; i++) { // events are expected in key order 0..9
+ Object key = new Integer(i);
+ EntryEvent ee = (EntryEvent)l.get(i);
+ assertEquals(key, ee.getKey());
+ assertEquals(null, ee.getOldValue());
+ assertEquals(false, ee.isOldValueAvailable()); // failure
+ assertEquals(Operation.INVALIDATE, ee.getOperation()); // vm2's puts are expected to arrive as invalidates
+ assertEquals("callbackArg" + i, ee.getCallbackArgument());
+ assertEquals(true, ee.isOriginRemote());
+ }
+ }
+
+ }
+ });
+
+
+ vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ assertEquals("new" + i, region.getEntry(key).getValue()); // vm2 still holds the value it put
+ region.destroy(key, "destroyCB"+i);
+ }
+ }
+ });
+ Wait.pause(5 * 1000);
+
+ vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ ctl.waitForDestroyed(key);
+ Region.Entry entry = region.getEntry(key);
+ assertNull(entry);
+ }
+ {
+ List l = ctl.getEventHistory();
+ assertEquals(10, l.size());
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ EntryEvent ee = (EntryEvent)l.get(i);
+ assertEquals(key, ee.getKey());
+ assertEquals(null, ee.getOldValue());
+ assertEquals(false, ee.isOldValueAvailable());
+ assertEquals(Operation.DESTROY, ee.getOperation());
+ assertEquals("destroyCB"+i, ee.getCallbackArgument());
+ assertEquals(true, ee.isOriginRemote());
+ }
+ }
+ }
+ });
+ vm2.invoke(new CacheSerializableRunnable("recreate") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ region.create(key, "create" + i, "createCB"+i);
+ }
+ }
+ });
+ Wait.pause(5 * 1000);
+
+ vm1.invoke(new CacheSerializableRunnable("Verify creates") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ ctl.waitForInvalidated(key); // vm2's creates are expected as invalidates (see the Operation assert below)
+ Region.Entry entry = region.getEntry(key);
+ assertNull(entry);
+ }
+ List l = ctl.getEventHistory();
+ assertEquals(10, l.size());
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ EntryEvent ee = (EntryEvent)l.get(i);
+ assertEquals(key, ee.getKey());
+ assertEquals(null, ee.getOldValue());
+ assertEquals(false, ee.isOldValueAvailable());
+ assertEquals(Operation.INVALIDATE, ee.getOperation());
+ assertEquals("createCB"+i, ee.getCallbackArgument());
+ assertEquals(true, ee.isOriginRemote());
+ }
+ // now see if we can get it from the server
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ assertEquals("create"+i, region.get(key, "loadCB"+i));
+ }
+ l = ctl.getEventHistory(); // second read: only the events caused by the gets above are expected
+ assertEquals(10, l.size());
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ EntryEvent ee = (EntryEvent)l.get(i);
+ assertEquals(key, ee.getKey());
+ assertEquals(null, ee.getOldValue());
+ assertEquals("create"+i, ee.getNewValue());
+ assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation());
+ assertEquals("loadCB"+i, ee.getCallbackArgument());
+ assertEquals(false, ee.isOriginRemote()); // the get originated in this VM
+ }
+ }
+ });
+
+ SerializableRunnable close =
+ new CacheSerializableRunnable("Close Pool") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ region.localDestroyRegion();
+ }
+ };
+
+ vm1.invoke(close);
+ vm2.invoke(close);
+
+ vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+ public void run() {
+ stopBridgeServer(getCache());
+ }
+ });
+ }
+
+ /**
+ * Tests that invalidates and destroys are propagated to {@link Pool}s
+ * correctly to DataPolicy.EMPTY + InterestPolicy.CACHE_CONTENT
+ *
+ * @since 5.0
+ */
+ public void test016InvalidateAndDestroyToEmptyCCPropagation() throws CacheException {
+ final String name = this.getName();
+ final Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+
+ vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+ public void run2() throws CacheException {
+ AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+ createRegion(name, factory.create());
+ try {
+ startBridgeServer(0);
+
+ } catch (Exception ex) {
+ com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+ }
+
+ }
+ });
+ final int port =
+ vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+ final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+ SerializableRunnable createEmpty =
+ new CacheSerializableRunnable("Create region") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ getCache();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+ CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+ factory.setCacheListener(l);
+ factory.setDataPolicy(DataPolicy.EMPTY);
+ factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT));
+ Region rgn = createRegion(name, factory.create());
+ rgn.registerInterestRegex(".*", false, false);
+ }
+ };
+ SerializableRunnable createNormal =
+ new CacheSerializableRunnable("Create region") {
+ public void run2() throws CacheException {
+ getLonerSystem();
+ getCache();
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setConcurrencyChecksEnabled(false);
+ ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+ CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+ factory.setCacheListener(l);
+ Region rgn = createRegion(name, factory.create());
+ rgn.registerInterestRegex(".*", false, false);
+ }
+ };
+
+ vm1.invoke(createEmpty);
+ vm1.invoke(new CacheSerializableRunnable("Populate region") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ region.put(new Integer(i), "old" + i);
+ }
+ }
+ });
+
+ vm2.invoke(createNormal);
+ vm1.invoke(new CacheSerializableRunnable("Turn on history") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ ctl.enableEventHistory();
+ }
+ });
+ vm2.invoke(new CacheSerializableRunnable("Update region") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ region.put(new Integer(i), "new" + i, "callbackArg" + i);
+ }
+ }
+ });
+ Wait.pause(5 * 1000);
+
+ vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ List l = ctl.getEventHistory();
+ assertEquals(0, l.size());
+ }
+ });
+
+
+ vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ assertEquals("new" + i, region.getEntry(key).getValue());
+ region.destroy(key, "destroyCB"+i);
+ }
+ }
+ });
+
+ vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+ List l = ctl.getEventHistory();
+ assertEquals(0, l.size());
+ }
+ });
+ vm2.invoke(new CacheSerializableRunnable("recreate") {
+ public void run2() throws CacheException {
+ Region region = getRootRegion().getSubregion(name);
+ for (int i = 0; i < 10; i++) {
+ Object key = new Integer(i);
+ region.create(key, "create" + i, "createCB"+i);
+ }
+ }
+
<TRUNCATED>
[2/2] incubator-geode git commit: GEODE-1111 Connection Pooling needs
more tests
Posted by bs...@apache.org.
GEODE-1111 Connection Pooling needs more tests
These tests were dependent on an Order class that is only available in
Pivotal's old test framework. I've created a small Order class that replaces
it and allows the tests to run as part of Geode's geode-core distributedTest
task.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4ed2fd37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4ed2fd37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4ed2fd37
Branch: refs/heads/develop
Commit: 4ed2fd374cf27ff704b09600eef263b71be9eabc
Parents: ac3d3b4
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Thu Mar 17 15:04:59 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Thu Mar 17 15:10:10 2016 -0700
----------------------------------------------------------------------
.../cache/ConnectionPoolAutoDUnitTest.java | 45 +
.../gemfire/cache/ConnectionPoolDUnitTest.java | 5871 ++++++++++++++++++
2 files changed, 5916 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4ed2fd37/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
new file mode 100755
index 0000000..ad110d7
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
@@ -0,0 +1,45 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache;
+
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+
+import static org.junit.runners.MethodSorters.*;
+import org.junit.FixMethodOrder;
+
+/**
+ * Variant of {@link ConnectionPoolDUnitTest} that defines no tests of its own.
+ * Before each inherited test it sets
+ * {@link ClientServerTestCase#AUTO_LOAD_BALANCE} to {@code true} in this JVM
+ * and, through {@link Invoke#invokeInEveryVM}, in the DUnit VMs; the
+ * tear-down hook sets the flag back to {@code false} in the same places.
+ */
+@FixMethodOrder(NAME_ASCENDING)
+public class ConnectionPoolAutoDUnitTest extends ConnectionPoolDUnitTest {
+
+  /**
+   * Creates the test case.
+   *
+   * @param name test name, passed straight to the superclass constructor
+   */
+  public ConnectionPoolAutoDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Runs the superclass set-up and then switches
+   * {@code ClientServerTestCase.AUTO_LOAD_BALANCE} on, first locally and then
+   * in every VM reached by {@code Invoke.invokeInEveryVM}.
+   *
+   * @throws Exception if the superclass set-up fails
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    // The flag is a static field, so each JVM has its own copy and must be
+    // updated separately.
+    ClientServerTestCase.AUTO_LOAD_BALANCE = true;
+    Invoke.invokeInEveryVM(new SerializableRunnable("setupAutoMode") {
+      public void run() {
+        ClientServerTestCase.AUTO_LOAD_BALANCE = true;
+      }
+    });
+  }
+
+  /**
+   * Switches {@code ClientServerTestCase.AUTO_LOAD_BALANCE} back off, locally
+   * and in every VM, so the setting made in {@link #setUp()} does not outlive
+   * this test. Presumably called by the superclass during tear-down -- confirm
+   * against ConnectionPoolDUnitTest.
+   *
+   * @throws Exception declared by the overridden method; not thrown by the
+   *         statements here
+   */
+  @Override
+  protected final void postTearDownConnectionPoolDUnitTest() throws Exception {
+    ClientServerTestCase.AUTO_LOAD_BALANCE = false;
+    Invoke.invokeInEveryVM(new SerializableRunnable("disableAutoMode") {
+      public void run() {
+        ClientServerTestCase.AUTO_LOAD_BALANCE = false;
+      }
+    });
+  }
+
+}