You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2016/03/17 23:10:37 UTC

[1/2] incubator-geode git commit: GEODE-1111 Connection Pooling needs more tests

Repository: incubator-geode
Updated Branches:
  refs/heads/develop ac3d3b4c5 -> 4ed2fd374


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4ed2fd37/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
new file mode 100755
index 0000000..41d48aa
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
@@ -0,0 +1,5871 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache;
+
+import static org.junit.runners.MethodSorters.NAME_ASCENDING;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import junit.framework.AssertionFailedError;
+
+import org.junit.FixMethodOrder;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.cache.client.NoAvailableServersException;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.Endpoint;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.cache30.CertifiableTestCacheListener;
+import com.gemstone.gemfire.cache30.TestCacheLoader;
+import com.gemstone.gemfire.cache30.TestCacheWriter;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.EntryExpiryTask;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PoolStats;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifierStats;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.internal.logging.LocalLogWriter;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.ThreadUtils;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+
+/**
+ * This class tests the client connection pool in GemFire.
+ * It does so by creating a cache server with a cache and a pre-defined region and
+ * a data loader. The client creates the same region with a pool
+ * (this happens in the controller VM). The client then spins up
+ * 10 different threads and issues gets on keys. The server data loader returns the
+ * data to the client.
+ * Test uses Groboutils TestRunnable objects to achieve multi threading behavior
+ * in the test.
+ *
+ */
+@FixMethodOrder(NAME_ASCENDING)
+public class ConnectionPoolDUnitTest extends CacheTestCase {
+
+  private static final long serialVersionUID = 1L;
+
+  /** The port on which the bridge server was started in this VM */
+  private static int bridgeServerPort;
+
+  protected static int port = 0;
+  protected static int port2 = 0;
+
+  protected static int numberOfAfterInvalidates;
+  protected static int numberOfAfterCreates;
+  protected static int numberOfAfterUpdates;
+
+  protected final static int TYPE_CREATE = 0;
+  protected final static int TYPE_UPDATE = 1;
+  protected final static int TYPE_INVALIDATE = 2;
+  protected final static int TYPE_DESTROY = 3;
+
+  /**
+   * Creates a test case instance with the given name, which is passed
+   * straight through to the superclass constructor.
+   *
+   * @param name the test name handed to the superclass
+   */
+  public ConnectionPoolDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Runs the superclass set-up, then calls {@code getSystem()} in the
+   * controller VM and in every other dunit VM.
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    // avoid IllegalStateException from HandShake by connecting all vms to
+    // system before creating pool
+    getSystem();
+    final SerializableRunnable connectToSystem = new SerializableRunnable("getSystem") {
+      public void run() {
+        getSystem();
+      }
+    };
+    Invoke.invokeInEveryVM(connectToSystem);
+  }
+  
+  /**
+   * Checks in every VM that {@link PoolManager#getAll()} is empty, logging a
+   * warning and failing the assertion otherwise, and then runs the
+   * {@link #postTearDownConnectionPoolDUnitTest()} hook.
+   */
+  @Override
+  protected final void postTearDownCacheTestCase() throws Exception {
+    final SerializableRunnable verifyNoPoolsRemain = new SerializableRunnable() {
+      public void run() {
+        final Map remainingPools = PoolManager.getAll();
+        if (remainingPools.isEmpty()) {
+          return;
+        }
+        com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().warning("found pools remaining after teardown: " + remainingPools);
+        assertEquals(0, remainingPools.size());
+      }
+    };
+    Invoke.invokeInEveryVM(verifyNoPoolsRemain);
+    postTearDownConnectionPoolDUnitTest();
+  }
+  
+  /**
+   * Tear-down hook invoked at the end of {@link #postTearDownCacheTestCase()}.
+   * The implementation here is intentionally empty; being protected and
+   * non-final, it can be overridden.
+   */
+  protected void postTearDownConnectionPoolDUnitTest() throws Exception {
+  }
+
+  protected/*GemStoneAddition*/ static PoolImpl getPool(Region r) {
+    PoolImpl result = null;
+    String poolName = r.getAttributes().getPoolName();
+    if (poolName != null) {
+      result = (PoolImpl)PoolManager.find(poolName);
+    }
+    return result;
+  }
+  protected static TestCacheWriter getTestWriter(Region r) {
+    return (TestCacheWriter)r.getAttributes().getCacheWriter();
+  }
+  /**
+   * Create a bridge server on the given port without starting it.
+   *
+   * @since 5.0.2
+   */
+  protected void createBridgeServer(int port) throws IOException {
+    CacheServer bridge = getCache().addCacheServer();
+    bridge.setPort(port);
+    bridge.setMaxThreads(getMaxThreads());
+    bridgeServerPort = bridge.getPort();
+  }
+
+  /**
+   * Starts a bridge server on the given port. Delegates to
+   * {@link #startBridgeServer(int, int)} with a socket buffer size of -1.
+   *
+   * @param port the port to start the server on
+   * @throws IOException propagated from the delegate overload
+   * @since 4.0
+   */
+  protected void startBridgeServer(int port)
+    throws IOException {
+    startBridgeServer(port, -1);
+  }
+  
+  /**
+   * Starts a bridge server on the given port with the given socket buffer
+   * size, using {@code CacheServer.DEFAULT_LOAD_POLL_INTERVAL} as the load
+   * poll interval.
+   *
+   * @param port the port to start the server on
+   * @param socketBufferSize the socket buffer size passed on to
+   *        {@link #startBridgeServer(int, int, long)}
+   * @throws IOException propagated from the delegate overload
+   */
+  protected void startBridgeServer(int port, int socketBufferSize) throws IOException {
+    startBridgeServer(port, socketBufferSize, CacheServer.DEFAULT_LOAD_POLL_INTERVAL);
+  }
+
+  protected void startBridgeServer(int port, int socketBufferSize, long loadPollInterval)
+    throws IOException {
+
+    Cache cache = getCache();
+    CacheServer bridge = cache.addCacheServer();
+    bridge.setPort(port);
+    if (socketBufferSize != -1) {
+      bridge.setSocketBufferSize(socketBufferSize);
+    }
+    bridge.setMaxThreads(getMaxThreads());
+    bridge.setLoadPollInterval(loadPollInterval);
+    bridge.start();
+    bridgeServerPort = bridge.getPort();
+  }
+
+  /**
+   * By default return 0 which turns off selector and gives thread per cnx.
+   * Test subclasses can override to run with selector.
+   *
+   * @return the value passed to {@code CacheServer.setMaxThreads} when this
+   *         test creates or starts a bridge server; 0 in this class
+   * @since 5.1
+   */
+  protected int getMaxThreads() {
+    return 0;
+  }
+
+  /**
+   * Stops the bridge server that serves up the given cache.
+   *
+   * @since 4.0
+   */
+  void stopBridgeServer(Cache cache) {
+    CacheServer bridge =
+        cache.getCacheServers().iterator().next();
+    bridge.stop();
+    assertFalse(bridge.isRunning());
+  }
+
+  /**
+   * Stops every cache server of the given cache in turn, asserting after
+   * each stop that the server is no longer running.
+   *
+   * @param cache the cache whose servers are stopped
+   */
+  void stopBridgeServers(Cache cache) {
+    for (final CacheServer server : cache.getCacheServers()) {
+      server.stop();
+      assertFalse(server.isRunning());
+    }
+  }
+
+  /**
+   * Starts every cache server of the given cache in turn, asserting after
+   * each start that the server is running.
+   *
+   * @param cache the cache whose servers are started
+   * @throws IOException declared for failures while starting a server
+   */
+  private void restartBridgeServers(Cache cache) throws IOException {
+    for (final CacheServer server : cache.getCacheServers()) {
+      server.start();
+      assertTrue(server.isRunning());
+    }
+  }
+
+  protected InternalDistributedSystem createLonerDS() {
+    disconnectFromDS();
+    InternalDistributedSystem ds = getLonerSystem();
+    assertEquals(0, ds.getDistributionManager().getOtherDistributionManagerIds().size());
+    return ds;
+  }
+
+  
+  
+  /**
+   * Returns region attributes for a <code>LOCAL</code> region
+   */
+  protected RegionAttributes getRegionAttributes() {
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.LOCAL);
+    factory.setConcurrencyChecksEnabled(false); // test validation expects this behavior
+    return factory.create();
+  }
+
+  private static String createBridgeClientConnection(String host, int[] ports) {
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < ports.length; i++) {
+      if (i > 0) sb.append(",");
+      sb.append("name" + i + "=");
+      sb.append(host + ":" + ports[i]);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Holds an {@link EntryEvent} together with its key, new value and
+   * callback argument as read at construction time, plus an integer type tag
+   * supplied by the creator.
+   */
+  private class EventWrapper {
+    public final EntryEvent event;
+    public final Object key;
+    public final Object val;
+    public final Object arg;
+    public final int type;
+
+    public EventWrapper(EntryEvent ee, int type) {
+      event = ee;
+      key = ee.getKey();
+      val = ee.getNewValue();
+      arg = ee.getCallbackArgument();
+      this.type = type;
+    }
+
+    @Override
+    public String toString() {
+      return "EventWrapper: event=" + this.event + ", type=" + this.type;
+    }
+  }
+
+  /**
+   * Cache listener that records every create, update, invalidate and destroy
+   * callback as an {@link EventWrapper} in {@link #events} and lets a caller
+   * block until enough of them have arrived. All access to the event list in
+   * this class happens while holding {@link #CONTROL_LOCK}.
+   */
+  protected class ControlListener extends CacheListenerAdapter {
+    /** Recorded events in arrival order; elements are {@link EventWrapper}s. */
+    public final LinkedList events = new LinkedList();
+    /** Monitor guarding {@link #events}; notified whenever an event is added. */
+    public final Object CONTROL_LOCK = new Object();
+
+    /**
+     * Waits until at least <code>eventCount</code> events have been recorded
+     * or roughly <code>sleepMs</code> milliseconds have elapsed.
+     *
+     * @param sleepMs the maximum time to wait, in milliseconds
+     * @param eventCount the number of recorded events to wait for
+     * @return <code>true</code> if at least one event has been recorded when
+     *         the wait ends; note that this is not a check against
+     *         <code>eventCount</code>
+     */
+    public boolean waitWhileNotEnoughEvents(long sleepMs, int eventCount) {
+      long maxMillis = System.currentTimeMillis() + sleepMs;
+      synchronized (this.CONTROL_LOCK) {
+        try {
+          while (this.events.size() < eventCount) {
+            long waitMillis = maxMillis - System.currentTimeMillis();
+            // give up once less than 10ms of the budget remains
+            if (waitMillis < 10) {
+              break;
+            }
+            this.CONTROL_LOCK.wait(waitMillis);
+          }
+        } catch (InterruptedException abort) {
+          fail("interrupted");
+        }
+        return !this.events.isEmpty();
+      }
+    }
+
+    /** Records the event with type <code>TYPE_CREATE</code> and wakes up waiters. */
+    public void afterCreate(EntryEvent e) {
+      //System.out.println("afterCreate: " + e);
+      synchronized(this.CONTROL_LOCK) {
+        this.events.add(new EventWrapper(e, TYPE_CREATE));
+        this.CONTROL_LOCK.notifyAll();
+      }
+    }
+
+    /** Records the event with type <code>TYPE_UPDATE</code> and wakes up waiters. */
+    public void afterUpdate(EntryEvent e) {
+      //System.out.println("afterUpdate: " + e);
+      synchronized(this.CONTROL_LOCK) {
+        this.events.add(new EventWrapper(e, TYPE_UPDATE));
+        this.CONTROL_LOCK.notifyAll();
+      }
+    }
+
+    /** Records the event with type <code>TYPE_INVALIDATE</code> and wakes up waiters. */
+    public void afterInvalidate(EntryEvent e) {
+      //System.out.println("afterInvalidate: " + e);
+      synchronized(this.CONTROL_LOCK) {
+        this.events.add(new EventWrapper(e, TYPE_INVALIDATE));
+        this.CONTROL_LOCK.notifyAll();
+      }
+    }
+
+    /** Records the event with type <code>TYPE_DESTROY</code> and wakes up waiters. */
+    public void afterDestroy(EntryEvent e) {
+      //System.out.println("afterDestroy: " + e);
+      synchronized(this.CONTROL_LOCK) {
+        this.events.add(new EventWrapper(e, TYPE_DESTROY));
+        this.CONTROL_LOCK.notifyAll();
+      }
+    }
+  }
+
+  
+
+
+  /**
+   * Create a fake EntryEvent that returns the provided region for {@link CacheEvent#getRegion()}
+   * and returns {@link com.gemstone.gemfire.cache.Operation#LOCAL_LOAD_CREATE} for {@link CacheEvent#getOperation()}.
+   * Every other accessor returns a fixed value: <code>null</code> for object
+   * results and a constant for each boolean, as written below.
+   *
+   * @param r the region the fake event reports from <code>getRegion()</code>
+   * @return fake entry event
+   */
+  protected static EntryEvent createFakeyEntryEvent(final Region r) {
+    return new EntryEvent() {
+      public Operation getOperation()
+      {
+        return Operation.LOCAL_LOAD_CREATE; // fake out pool to exit early
+      }
+      public Region getRegion()
+      {
+        return r;
+      }
+      public Object getKey() { return null; }
+      public Object getOldValue() { return null;}
+      public boolean isOldValueAvailable() {return true;}
+      public Object getNewValue() { return null;}
+      public boolean isLocalLoad() { return false;}
+      public boolean isNetLoad() {return false;}
+      public boolean isLoad() {return true; }
+      public boolean isNetSearch() {return false;}
+      public TransactionId getTransactionId() {return null;}
+      public Object getCallbackArgument() {return null;}
+      public boolean isCallbackArgumentAvailable() {return true;}
+      public boolean isOriginRemote() {return false;}
+      public DistributedMember getDistributedMember() {return null;}
+      public boolean isExpiration() { return false;}
+      public boolean isDistributed() { return false;}
+      // mirrors hasClientOrigin(), which is hard-wired to false below
+      public boolean isBridgeEvent() {
+        return hasClientOrigin();
+      }
+      public boolean hasClientOrigin() {
+        return false;
+      }
+      public ClientProxyMembershipID getContext() {
+        return null;
+      }
+      public SerializedCacheValue getSerializedOldValue() {return null;}
+      public SerializedCacheValue getSerializedNewValue() {return null;}
+    };
+  }
+
+  /**
+   * Waits for the pool to report the expected number of servers and then for
+   * every endpoint to hold exactly the expected number of connections,
+   * finishing with an assertion on that balance.
+   *
+   * @param pool the pool to inspect
+   * @param expectedServer the server count passed to {@link #verifyServerCount}
+   * @param expectedConsPerServer the connection count required on each endpoint
+   */
+  public void verifyBalanced(final PoolImpl pool, int expectedServer,
+      final int expectedConsPerServer) {
+    verifyServerCount(pool, expectedServer);
+    final WaitCriterion allEndpointsBalanced = new WaitCriterion() {
+      public boolean done() {
+        return balanced(pool, expectedConsPerServer);
+      }
+      public String description() {
+        return "expected " + expectedConsPerServer
+            + " but endpoints=" + outOfBalanceReport(pool);
+      }
+    };
+    Wait.waitForCriterion(allEndpointsBalanced, 2 * 60 * 1000, 200, true);
+    // build the message before the final check, as the assertion arguments are evaluated in that order
+    final String failureMessage = "expected " + expectedConsPerServer
+        + " but endpoints=" + outOfBalanceReport(pool);
+    final boolean nowBalanced = balanced(pool, expectedConsPerServer);
+    assertEquals(failureMessage, true, nowBalanced);
+  }
+  protected boolean balanced(PoolImpl pool, int expectedConsPerServer) {
+    Iterator it = pool.getEndpointMap().values().iterator();
+    while (it.hasNext()) {
+      Endpoint ep = (Endpoint)it.next();
+      if (ep.getStats().getConnections() != expectedConsPerServer) {
+        return false;
+      }
+    }
+    return true;
+  }
+  
+  protected String outOfBalanceReport(PoolImpl pool) {
+    StringBuffer result = new StringBuffer();
+    Iterator it = pool.getEndpointMap().values().iterator();
+    result.append("<");
+    while (it.hasNext()) {
+      Endpoint ep = (Endpoint)it.next();
+      result.append("ep=" + ep);
+      result.append(" conCount=" + ep.getStats().getConnections());
+      if (it.hasNext()) {
+        result.append(", ");
+      }
+    }
+    result.append(">");
+    return result.toString();
+  }
+  
+  /**
+   * Waits up to 30 seconds, polling every 200ms, for the pool's set of
+   * blacklisted servers to become empty, then asserts that it is empty.
+   *
+   * @param pool the pool whose blacklist is watched
+   */
+  public void waitForBlacklistToClear(final PoolImpl pool) {
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return pool.getBlacklistedServers().size() == 0;
+      }
+      public String description() {
+        // describe what is still blacklisted so a timeout is diagnosable
+        return "waiting for blacklist to clear, blacklistedServers="
+            + pool.getBlacklistedServers();
+      }
+    };
+    Wait.waitForCriterion(ev, 30 * 1000, 200, true);
+    assertEquals("unexpected blacklistedServers=" + pool.getBlacklistedServers(),
+                 0, pool.getBlacklistedServers().size());
+  }
+  
+  /**
+   * Logs the expectation and then waits up to five minutes, polling every
+   * 200ms, for the pool's connected server count to equal the expected count.
+   *
+   * @param pool the pool to query
+   * @param expectedCount the connected server count to wait for
+   */
+  public void verifyServerCount(final PoolImpl pool, final int expectedCount) {
+    getCache().getLogger().info("verifyServerCount expects=" + expectedCount);
+    final WaitCriterion serverCountReached = new WaitCriterion() {
+      // last mismatch seen by done(); reported through description()
+      String excuse;
+      public boolean done() {
+        final int connected = pool.getConnectedServerCount();
+        if (connected != expectedCount) {
+          excuse = "Found only " + connected + " servers, expected " + expectedCount;
+          return false;
+        }
+        return true;
+      }
+      public String description() {
+        return excuse;
+      }
+    };
+    Wait.waitForCriterion(serverCountReached, 5 * 60 * 1000, 200, true);
+  }
+
+  /**
+   * Tests that the callback argument is sent to the server
+   */
+  public void test001CallbackArg() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+
+    final Object createCallbackArg = "CREATE CALLBACK ARG";
+    final Object updateCallbackArg = "PUT CALLBACK ARG";
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+
+          CacheWriter cw = new TestCacheWriter() {
+              public final void beforeUpdate2(EntryEvent event)
+                throws CacheWriterException {
+                Object beca = event.getCallbackArgument();
+                assertEquals(updateCallbackArg, beca);
+              }
+
+              public void beforeCreate2(EntryEvent event)
+                throws CacheWriterException {
+                Object beca =  event.getCallbackArgument();
+                assertEquals(createCallbackArg, beca);
+              }
+            };
+            AttributesFactory factory = getBridgeServerRegionAttributes(null, cw);
+          createRegion(name, factory.create());
+          //pause(1000);
+          try {
+            startBridgeServer(0);
+
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      });
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    
+    SerializableRunnable create =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+
+            ClientServerTestCase.configureConnectionPool(factory,NetworkUtils.getServerHostName(host),port,-1,true,-1,-1, null);
+            createRegion(name, factory.create());
+          }
+        };
+
+    vm1.invoke(create);
+    vm1.invoke(new CacheSerializableRunnable("Add entries") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.create(new Integer(i), "old" + i, createCallbackArg);
+          }
+          for (int i = 0; i < 10; i++) {
+            region.put(new Integer(i), "new" + i, updateCallbackArg);
+        }
+        }
+      });
+
+    vm0.invoke(new CacheSerializableRunnable("Check cache writer") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          TestCacheWriter writer = getTestWriter(region);
+          assertTrue(writer.wasInvoked());
+        }
+      });
+
+  SerializableRunnable close =
+    new CacheSerializableRunnable("Close Pool") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          region.localDestroyRegion();
+        }
+    };
+
+    vm1.invoke(close);
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+
+  }
+
+  /**
+   * Tests that consecutive puts have the callback assigned
+   * appropriately.
+   * <p>
+   * The client in vm1 creates ten entries, passing a callback argument only
+   * for even keys. The cache writer on the server in vm0 asserts that even
+   * keys arrive with that argument and odd keys arrive with none.
+   */
+  public void test002CallbackArg2() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+
+    final Object createCallbackArg = "CREATE CALLBACK ARG";
+//    final Object updateCallbackArg = "PUT CALLBACK ARG";
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          CacheWriter cw = new TestCacheWriter() {
+              public void beforeCreate2(EntryEvent event)
+                throws CacheWriterException {
+                Integer key = (Integer) event.getKey();
+                // even keys were created with a callback argument, odd keys without
+                if (key.intValue() % 2 == 0) {
+                  Object beca =  event.getCallbackArgument();
+                  assertEquals(createCallbackArg, beca);
+                } else {
+                  Object beca =  event.getCallbackArgument();
+                  assertNull(beca);
+                }
+              }
+            };
+            AttributesFactory factory = getBridgeServerRegionAttributes(null, cw);
+          createRegion(name, factory.create());
+          //pause(1000);
+          try {
+            startBridgeServer(0);
+
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      });
+    // ask the server VM which port its cache server ended up on
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    SerializableRunnable create =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+            createRegion(name, factory.create());
+          }
+        };
+
+    vm1.invoke(create);
+    vm1.invoke(new CacheSerializableRunnable("Add entries") {
+        public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            if (i % 2 == 0) {
+              region.create(new Integer(i), "old" + i, createCallbackArg);
+
+            } else {
+              region.create(new Integer(i), "old" + i);
+            }
+          }
+        }
+      });
+
+  SerializableRunnable close =
+    new CacheSerializableRunnable("Close Pool") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        region.localDestroyRegion();
+      }
+    };
+
+    vm1.invoke(close);
+
+    // confirm on the server that the writer actually saw the creates
+    vm0.invoke(new CacheSerializableRunnable("Check cache writer") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          TestCacheWriter writer = getTestWriter(region);
+          assertTrue(writer.wasInvoked());
+        }
+      });
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+  }
+
+  
+  /**
+   * Tests for bug 36684 by having two bridge servers with cacheloaders that should always return
+   * a value and one client connected to each server reading values. If the bug exists, the
+   * clients will get null sometimes.
+   * <p>
+   * Servers run in vm0 and vm1 and clients in vm2 and vm3; both clients read
+   * the same 1000 keys concurrently and assert that each value equals its key.
+   *
+   * @throws InterruptedException declared by the test; presumably for the joins on the async invocations
+   */
+  public void test003Bug36684() throws CacheException, InterruptedException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+
+    // Create the cache servers with distributed, mirrored region
+    SerializableRunnable createServer =
+      new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          // loader that echoes the requested key back as the value
+          CacheLoader cl = new CacheLoader() {
+            public Object load(LoaderHelper helper) {
+              return helper.getKey();
+            }
+            public void close() {
+
+            }
+          };
+          AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null);
+          createRegion(name, factory.create());
+          //pause(1000);
+          try {
+            startBridgeServer(0);
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      };
+    getSystem().getLogWriter().info("before create server");
+    vm0.invoke(createServer);
+    vm1.invoke(createServer);
+
+    // Create cache server clients
+    final int numberOfKeys = 1000;
+    final String host0 = NetworkUtils.getServerHostName(host);
+    final int vm0Port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final int vm1Port = vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    SerializableRunnable createClient =
+      new CacheSerializableRunnable("Create Cache Server Client") {
+        public void run2() throws CacheException {
+          // reset all static listener variables in case this is being rerun in a subclass
+          numberOfAfterInvalidates = 0;
+          numberOfAfterCreates  = 0;
+          numberOfAfterUpdates = 0;
+          // create the region
+          getLonerSystem();
+          AttributesFactory factory = new AttributesFactory();
+          factory.setScope(Scope.LOCAL);
+          factory.setConcurrencyChecksEnabled(false); // test validation expects this behavior
+          // configure a connection pool that knows about both servers
+          ClientServerTestCase.configureConnectionPool(factory,host0,vm0Port,vm1Port,true,-1,-1, null);
+         createRegion(name, factory.create());
+        }
+      };
+    getSystem().getLogWriter().info("before create client");
+    vm2.invoke(createClient);
+    vm3.invoke(createClient);
+
+    // Read every key through each client and check the value the server loader returned
+    SerializableRunnable initializeClient =
+      new CacheSerializableRunnable("Initialize Client") {
+      public void run2() throws CacheException {
+//        StringBuffer errors = new StringBuffer();
+          numberOfAfterInvalidates = 0;
+          numberOfAfterCreates = 0;
+          numberOfAfterUpdates = 0;
+          LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
+          for (int i=0; i<numberOfKeys; i++) {
+            String expected = "key-"+i;
+            String actual = (String) region.get("key-"+i);
+            assertEquals(expected, actual);
+          }
+      }
+    };
+    
+    getSystem().getLogWriter().info("before initialize client");
+    AsyncInvocation inv2 = vm2.invokeAsync(initializeClient);
+    AsyncInvocation inv3 = vm3.invokeAsync(initializeClient);
+    
+    ThreadUtils.join(inv2, 30 * 1000);
+    ThreadUtils.join(inv3, 30 * 1000);
+    
+    // surface any failure raised inside the client VMs
+    if (inv2.exceptionOccurred()) { 
+      com.gemstone.gemfire.test.dunit.Assert.fail("Error occured in vm2", inv2.getException());
+    }
+    if(inv3.exceptionOccurred()) {
+      com.gemstone.gemfire.test.dunit.Assert.fail("Error occured in vm3", inv3.getException());
+    }
+  }
+  
+
+  /**
+   * Test for client connection loss with CacheLoader Exception on the server.
+   */
+  public void test004ForCacheLoaderException() throws CacheException, InterruptedException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+
+    // Create the cache servers with distributed, mirrored region
+    SerializableRunnable createServer =
+      new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          CacheLoader cl = new CacheLoader() {
+            public Object load(LoaderHelper helper) {
+              System.out.println("### CALLING CACHE LOADER....");
+              throw new CacheLoaderException("Test for CahceLoaderException causing Client connection to disconnect.");
+            }
+            public void close() {
+            }
+          };
+          AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null);
+          createRegion(name, factory.create());
+          //pause(1000);
+          try {
+            startBridgeServer(0);
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      };
+    getSystem().getLogWriter().info("before create server");
+
+    server.invoke(createServer);
+
+    // Create cache server clients
+    final int numberOfKeys = 10;
+    final String host0 = NetworkUtils.getServerHostName(host);
+    final int[] port = new int[] {server.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort())};
+    final String poolName = "myPool";
+
+    SerializableRunnable createClient =
+      new CacheSerializableRunnable("Create Cache Server Client") {
+        public void run2() throws CacheException {
+          getLonerSystem();
+          AttributesFactory factory = new AttributesFactory();
+          factory.setScope(Scope.LOCAL);
+          factory.setConcurrencyChecksEnabled(false);
+          // create bridge writer
+          ClientServerTestCase.configureConnectionPoolWithName(factory,host0,port,true,-1, -1, null, poolName);
+         createRegion(name, factory.create());
+        }
+      };
+    getSystem().getLogWriter().info("before create client");
+    client.invoke(createClient);
+
+    // Initialize each client with entries (so that afterInvalidate is called)
+    SerializableRunnable invokeServerCacheLaoder =
+      new CacheSerializableRunnable("Initialize Client") {
+      public void run2() throws CacheException {
+          LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
+          PoolStats stats = ((PoolImpl)PoolManager.find(poolName)).getStats();
+          int oldConnects = stats.getConnects();
+          int oldDisConnects = stats.getDisConnects();
+          try {
+          for (int i=0; i<numberOfKeys; i++) {
+            String actual = (String) region.get("key-"+i);
+          }
+          } catch (Exception ex){
+            if (!(ex.getCause() instanceof CacheLoaderException)) {
+              fail ("UnExpected Exception, expected to receive CacheLoaderException from server, instead found: " + ex.getCause().getClass());
+            }
+          }
+          int newConnects = stats.getConnects();
+          int newDisConnects = stats.getDisConnects();
+          //System.out.println("#### new connects/disconnects :" + newConnects + ":" + newDisConnects);
+          if (newConnects != oldConnects && newDisConnects != oldDisConnects) {
+            fail ("New connection has created for Server side CacheLoaderException.");
+          }
+      }
+    };
+
+    getSystem().getLogWriter().info("before initialize client");
+    AsyncInvocation inv2 = client.invokeAsync(invokeServerCacheLaoder);
+    
+    ThreadUtils.join(inv2, 30 * 1000);
+    SerializableRunnable stopServer = new SerializableRunnable("stop CacheServer") {
+      public void run() {
+        stopBridgeServer(getCache());
+      }
+    };
+    server.invoke(stopServer);
+    
+  }
+
+
+  protected void validateDS() {
+    List l = InternalDistributedSystem.getExistingSystems();
+    if (l.size() > 1) {
+      getSystem().getLogWriter().info("validateDS: size="
+                                      + l.size()
+                                      + " isDedicatedAdminVM="
+                                      + DistributionManager.isDedicatedAdminVM
+                                      + " l=" + l);
+    }
+    assertFalse(DistributionManager.isDedicatedAdminVM);
+    assertEquals(1, l.size());
+  }
+  
+
+  /**
+   * Exercises the basic {@link Pool} operations: a loner client reads values
+   * produced by the server's cache loader and overwrites them, a second client
+   * reads the updates back, and the first client finally checks how the pool's
+   * attach count and {@code destroy()} behave around a local region destroy.
+   *
+   * @since 3.5
+   */
+  public void test006Pool() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    final VM serverVM = host.getVM(0);
+    final VM firstClientVM = host.getVM(1);
+    final VM secondClientVM = host.getVM(2);
+
+    serverVM.invoke(new CacheSerializableRunnable("Create Cache Server") {
+      public void run2() throws CacheException {
+        AttributesFactory serverAttrs = new AttributesFactory();
+        serverAttrs.setScope(Scope.DISTRIBUTED_ACK);
+        serverAttrs.setConcurrencyChecksEnabled(false);
+        // every load simply answers with the string form of the requested key
+        serverAttrs.setCacheLoader(new CacheLoader() {
+          public Object load(LoaderHelper helper) {
+            return helper.getKey().toString();
+          }
+          public void close() {
+            // nothing to release
+          }
+        });
+        createRegion(name, serverAttrs.create());
+        try {
+          startBridgeServer(0);
+        } catch (Exception ex) {
+          com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+        }
+      }
+    });
+
+    final int port =
+      serverVM.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(serverVM.getHost());
+
+    // Shared by both client VMs: loner system, cache, and a LOCAL region
+    // configured with a connection pool for the server started above.
+    final SerializableRunnable createClientRegion =
+      new CacheSerializableRunnable("Create region") {
+        public void run2() throws CacheException {
+          getLonerSystem();
+          getCache();
+          validateDS();
+          AttributesFactory clientAttrs = new AttributesFactory();
+          clientAttrs.setScope(Scope.LOCAL);
+          clientAttrs.setConcurrencyChecksEnabled(false);
+          ClientServerTestCase.configureConnectionPool(clientAttrs,host0,port,-1,true,-1,-1, null);
+          createRegion(name, clientAttrs.create());
+        }
+      };
+    firstClientVM.invoke(createClientRegion);
+
+    firstClientVM.invoke(new CacheSerializableRunnable("Get values") {
+      public void run2() throws CacheException {
+        Region clientRegion = getRootRegion().getSubregion(name);
+        int key = 0;
+        while (key < 10) {
+          // the server-side loader is expected to have produced the key as a String
+          Object loaded = clientRegion.get(Integer.valueOf(key));
+          assertEquals(String.valueOf(key), loaded);
+          key++;
+        }
+      }
+    });
+
+    firstClientVM.invoke(new CacheSerializableRunnable("Update values") {
+      public void run2() throws CacheException {
+        Region clientRegion = getRootRegion().getSubregion(name);
+        int key = 0;
+        while (key < 10) {
+          clientRegion.put(Integer.valueOf(key), Integer.valueOf(key));
+          key++;
+        }
+      }
+    });
+
+    secondClientVM.invoke(createClientRegion);
+    secondClientVM.invoke(new CacheSerializableRunnable("Validate values") {
+      public void run2() throws CacheException {
+        Region clientRegion = getRootRegion().getSubregion(name);
+        int key = 0;
+        while (key < 10) {
+          // the Integer written by the first client must now be visible here
+          Object fetched = clientRegion.get(Integer.valueOf(key));
+          assertNotNull(fetched);
+          assertTrue(fetched instanceof Integer);
+          assertEquals(key, ((Integer) fetched).intValue());
+          key++;
+        }
+      }
+    });
+
+    firstClientVM.invoke(new CacheSerializableRunnable("Close Pool") {
+      public void run2() throws CacheException {
+        Region clientRegion = getRootRegion().getSubregion(name);
+        PoolImpl pool =
+          (PoolImpl)PoolManager.find(clientRegion.getAttributes().getPoolName());
+
+        // while the region is attached, destroy() must be refused
+        assertEquals(false, pool.isDestroyed());
+        assertEquals(1, pool.getAttachCount());
+        try {
+          pool.destroy();
+          fail("expected IllegalStateException");
+        } catch (IllegalStateException expected) {
+          // this is the outcome the test wants
+        }
+
+        // destroying the region detaches it but leaves the pool itself alive
+        clientRegion.localDestroyRegion();
+        assertEquals(false, pool.isDestroyed());
+        assertEquals(0, pool.getAttachCount());
+      }
+    });
+
+    serverVM.invoke(new SerializableRunnable("Stop CacheServer") {
+      public void run() {
+        stopBridgeServer(getCache());
+      }
+    });
+  }
+  
+  
+    
+  
+  /**
+   * BridgeServer failover (bug 31832), run with a connection count of 1.
+   * Calls {@code disconnectAllFromDS()} before the shared scenario starts.
+   */
+  public void test007BridgeServerFailoverCnx1() throws CacheException {
+    final int connectionCount = 1;
+    disconnectAllFromDS();
+    basicTestBridgeServerFailover(connectionCount);
+  }
+  /**
+   * BridgeServer failover again, this time with connectionsPerServer set to 0.
+   */
+  public void test008BridgeServerFailoverCnx0() throws CacheException {
+    final int connectionCount = 0;
+    basicTestBridgeServerFailover(connectionCount);
+  }
+  private void basicTestBridgeServerFailover(final int cnxCount) throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    // Create two bridge servers
+    SerializableRunnable createCacheServer =
+      new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+          createRegion(name, factory.create());
+          //pause(1000);
+          try {
+            startBridgeServer(0);
+
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+        }
+
+        }
+      };
+
+    vm0.invoke(createCacheServer);
+    vm1.invoke(createCacheServer);
+
+    final int port0 =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    final int port1 =
+      vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+//    final String host1 = getServerHostName(vm1.getHost());
+
+    // Create one bridge client in this VM
+    SerializableRunnable create =
+      new CacheSerializableRunnable("Create region") {
+        public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory,host0,port0,port1,true,-1,cnxCount, null, 100);
+
+            Region region = createRegion(name, factory.create());
+
+            // force connections to form
+            region.put("keyInit", new Integer(0));
+            region.put("keyInit2", new Integer(0));
+        }
+        };
+
+    vm2.invoke(create);
+
+    // Launch async thread that puts objects into cache. This thread will execute until
+    // the test has ended (which is why the RegionDestroyedException and CacheClosedException
+    // are caught and ignored. If any other exception occurs, the test will fail. See
+    // the putAI.exceptionOccurred() assertion below.
+    AsyncInvocation putAI = vm2.invokeAsync(new CacheSerializableRunnable("Put objects") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          try {
+            for (int i=0; i<100000; i++) {
+              region.put("keyAI", new Integer(i));
+              try {Thread.sleep(100);} catch (InterruptedException ie) {
+                fail("interrupted");
+              }
+            }
+          } catch (NoAvailableServersException ignore) {
+            /*ignore*/
+          } catch (RegionDestroyedException e) { //will be thrown when the test ends
+            /*ignore*/
+          } 
+          catch (CancelException e) { //will be thrown when the test ends
+            /*ignore*/
+          }
+        }
+      });
+
+
+    SerializableRunnable verify1Server =
+      new CacheSerializableRunnable("verify1Server") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          PoolImpl pool = getPool(region);
+          verifyServerCount(pool, 1);
+        }
+        };
+    SerializableRunnable verify2Servers =
+      new CacheSerializableRunnable("verify2Servers") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          PoolImpl pool = getPool(region);
+          verifyServerCount(pool, 2);
+        }
+      };
+
+    vm2.invoke(verify2Servers);
+
+    SerializableRunnable stopCacheServer =
+      new SerializableRunnable("Stop CacheServer") {
+          public void run() {
+            stopBridgeServer(getCache());
+          }
+      };
+
+    final String expected = "java.io.IOException";
+    final String addExpected =
+      "<ExpectedException action=add>" + expected + "</ExpectedException>";
+    final String removeExpected =
+      "<ExpectedException action=remove>" + expected + "</ExpectedException>";
+
+    vm2.invoke(new SerializableRunnable() {
+      public void run() {
+        LogWriter bgexecLogger =
+              new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
+        bgexecLogger.info(addExpected);
+      }
+    });
+    try { // make sure we removeExpected
+
+    // Bounce the non-current server (I know that VM1 contains the non-current server
+    // because ...
+    vm1.invoke(stopCacheServer);
+
+    vm2.invoke(verify1Server);
+
+    final int restartPort = port1;
+    vm1.invoke(
+      new SerializableRunnable("Restart CacheServer") {
+      public void run() {
+        try {
+          Region region = getRootRegion().getSubregion(name);
+          assertNotNull(region);
+          startBridgeServer(restartPort);
+        }
+        catch(Exception e) {
+          getSystem().getLogWriter().fine(new Exception(e));
+          com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start CacheServer", e);
+        }
+      }
+    });
+
+    // Pause long enough for the monitor to realize the server has been bounced
+    // and reconnect to it.
+    vm2.invoke(verify2Servers);
+
+    } finally {
+      vm2.invoke(new SerializableRunnable() {
+        public void run() {
+          LogWriter bgexecLogger =
+                new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
+          bgexecLogger.info(removeExpected);
+        }
+      });
+    }
+
+    // Stop the other cache server
+    vm0.invoke(stopCacheServer);
+
+    // Run awhile
+    vm2.invoke(verify1Server);
+
+    com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("FIXME: this thread does not terminate"); // FIXME
+//    // Verify that no exception has occurred in the putter thread
+//    DistributedTestCase.join(putAI, 5 * 60 * 1000, getLogWriter());
+//    //assertTrue("Exception occurred while invoking " + putAI, !putAI.exceptionOccurred());
+//    if (putAI.exceptionOccurred()) {
+//      fail("While putting entries: ", putAI.getException());
+//    }
+
+    // Close Pool
+    vm2.invoke(new CacheSerializableRunnable("Close Pool") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          region.localDestroyRegion();
+        }
+      });
+
+    // Stop the last cache server
+    vm1.invoke(stopCacheServer);
+  }
+  
+
+  /**
+   * Connection lifetime expiration must work for thread-local connections.
+   * @author darrel
+   */
+  public void test009LifetimeExpireOnTL() throws CacheException {
+    final boolean useThreadLocalConnections = true;
+    basicTestLifetimeExpire(useThreadLocalConnections);
+  }
+
+  /**
+   * Connection lifetime expiration must work for pool connections, i.e. with
+   * the thread-local option switched off.
+   * @author darrel
+   */
+  public void test010LifetimeExpireOnPoolCnx() throws CacheException {
+    final boolean useThreadLocalConnections = false;
+    basicTestLifetimeExpire(useThreadLocalConnections);
+  }
+
+  protected static volatile boolean stopTestLifetimeExpire = false; // polled by the putter loops of basicTestLifetimeExpire; true makes them exit
+  /* Load-conditioning stat values recorded by the first putter of
+   * basicTestLifetimeExpire before its put loop starts. The verify1Server and
+   * verify2Servers runnables compare later stat readings against them. */
+  protected static volatile int baselineLifetimeCheck;
+  protected static volatile int baselineLifetimeExtensions;
+  protected static volatile int baselineLifetimeConnect;
+  protected static volatile int baselineLifetimeDisconnect;
+
+  private void basicTestLifetimeExpire(final boolean threadLocal) throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    AsyncInvocation putAI = null;
+    AsyncInvocation putAI2 = null;
+
+    try {
+
+      // Create two bridge servers
+      SerializableRunnable createCacheServer =
+        new CacheSerializableRunnable("Create Cache Server") {
+          public void run2() throws CacheException {
+            AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+            factory.setCacheListener(new DelayListener(25));
+            createRegion(name, factory.create());
+            try {
+              startBridgeServer(0);
+
+            } catch (Exception ex) {
+              com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+            }
+
+          }
+        };
+
+      vm0.invoke(createCacheServer);
+
+      final int port0 =
+        vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+      final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+      vm1.invoke(createCacheServer);
+      final int port1 =
+        vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+      SerializableRunnable stopCacheServer =
+        new SerializableRunnable("Stop CacheServer") {
+          public void run() {
+            stopBridgeServer(getCache());
+          }
+        };
+      // we only had to stop it to reserve a port
+      vm1.invoke(stopCacheServer);
+
+
+      // Create one bridge client in this VM
+      SerializableRunnable create =
+        new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory,host0,port0,port1,false/*queue*/,-1,0, null, 100, 500, threadLocal, 500);
+
+            Region region = createRegion(name, factory.create());
+
+            // force connections to form
+            region.put("keyInit", new Integer(0));
+            region.put("keyInit2", new Integer(0));
+          }
+        };
+
+      vm2.invoke(create);
+
+      // Launch async thread that puts objects into cache. This thread will execute until
+      // the test has ended.
+      SerializableRunnable putter1 = 
+        new CacheSerializableRunnable("Put objects") {
+          public void run2() throws CacheException {
+            Region region = getRootRegion().getSubregion(name);
+            PoolImpl pool = getPool(region);
+            PoolStats stats = pool.getStats();
+            baselineLifetimeCheck = stats.getLoadConditioningCheck();
+            baselineLifetimeExtensions = stats.getLoadConditioningExtensions();
+            baselineLifetimeConnect = stats.getLoadConditioningConnect();
+            baselineLifetimeDisconnect = stats.getLoadConditioningDisconnect();
+            try {
+              int count = 0;
+              while (!stopTestLifetimeExpire) {
+                count++;
+                region.put("keyAI1", new Integer(count));
+              }
+            } catch (NoAvailableServersException ex) {
+              if (stopTestLifetimeExpire) {
+                return;
+              } else {
+                throw ex;
+              }
+              //           } catch (RegionDestroyedException e) { //will be thrown when the test ends
+              //             /*ignore*/
+              //           } catch (CancelException e) { //will be thrown when the test ends
+              //             /*ignore*/
+            }
+          }
+        };
+      SerializableRunnable putter2 = 
+        new CacheSerializableRunnable("Put objects") {
+          public void run2() throws CacheException {
+            Region region = getRootRegion().getSubregion(name);
+            try {
+              int count = 0;
+              while (!stopTestLifetimeExpire) {
+                count++;
+                region.put("keyAI2", new Integer(count));
+              }
+            } catch (NoAvailableServersException ex) {
+              if (stopTestLifetimeExpire) {
+                return;
+              } else {
+                throw ex;
+              }
+              //           } catch (RegionDestroyedException e) { //will be thrown when the test ends
+              //             /*ignore*/
+              //           } catch (CancelException e) { //will be thrown when the test ends
+              //             /*ignore*/
+            }
+          }
+        };
+      putAI = vm2.invokeAsync(putter1);
+      putAI2 = vm2.invokeAsync(putter2);
+
+      SerializableRunnable verify1Server =
+        new CacheSerializableRunnable("verify1Server") {
+          public void run2() throws CacheException {
+            Region region = getRootRegion().getSubregion(name);
+            PoolImpl pool = getPool(region);
+            final PoolStats stats = pool.getStats();
+            verifyServerCount(pool, 1);
+            WaitCriterion ev = new WaitCriterion() {
+              public boolean done() {
+                return stats.getLoadConditioningCheck() >= (10 + baselineLifetimeCheck);
+              }
+              public String description() {
+                return null;
+              }
+            };
+            Wait.waitForCriterion(ev, 30 * 1000, 200, true);
+            
+            // make sure no replacements are happening.
+            // since we have 2 threads and 2 cnxs and 1 server
+            // when lifetimes are up we should only want to connect back to the
+            // server we are already connected to and thus just extend our lifetime
+            assertTrue("baselineLifetimeCheck=" + baselineLifetimeCheck
+                       + " but stats.getLoadConditioningCheck()=" + stats.getLoadConditioningCheck(),
+                       stats.getLoadConditioningCheck() >= (10+baselineLifetimeCheck));
+            baselineLifetimeCheck = stats.getLoadConditioningCheck();
+            assertTrue(stats.getLoadConditioningExtensions() > baselineLifetimeExtensions);
+            assertTrue(stats.getLoadConditioningConnect() == baselineLifetimeConnect);
+            assertTrue(stats.getLoadConditioningDisconnect() == baselineLifetimeDisconnect);
+          }
+        };
+      SerializableRunnable verify2Servers =
+        new CacheSerializableRunnable("verify2Servers") {
+          public void run2() throws CacheException {
+            Region region = getRootRegion().getSubregion(name);
+            PoolImpl pool = getPool(region);
+            final PoolStats stats = pool.getStats();
+            verifyServerCount(pool, 2);
+            // make sure some replacements are happening.
+            // since we have 2 threads and 2 cnxs and 2 servers
+            // when lifetimes are up we should connect to the other server sometimes.
+//            int retry = 300;
+//            while ((retry-- > 0)
+//                   && (stats.getLoadConditioningCheck() < (10+baselineLifetimeCheck))) {
+//              pause(100);
+//            }
+//            assertTrue("Bug 39209 expected "
+//                       + stats.getLoadConditioningCheck()
+//                       + " to be >= "
+//                       + (10+baselineLifetimeCheck),
+//                       stats.getLoadConditioningCheck() >= (10+baselineLifetimeCheck));
+            
+            // TODO: does this WaitCriterion actually help?
+            WaitCriterion wc = new WaitCriterion() {
+              String excuse;
+              public boolean done() {
+                int actual = stats.getLoadConditioningCheck();
+                int expected = 10 + baselineLifetimeCheck;
+                if (actual >= expected) {
+                  return true;
+                }
+                excuse = "Bug 39209 expected " + actual + " to be >= " + expected;
+                return false;
+              }
+              public String description() {
+                return excuse;
+              }
+            };
+            try {
+              Wait.waitForCriterion(wc, 60 * 1000, 1000, true);
+            } catch (AssertionFailedError e) {
+//              dumpStack();
+              throw e;
+            }
+            
+            assertTrue(stats.getLoadConditioningConnect() > baselineLifetimeConnect);
+            assertTrue(stats.getLoadConditioningDisconnect() > baselineLifetimeDisconnect);
+          }
+        };
+
+      vm2.invoke(verify1Server);
+      assertEquals(true, putAI.isAlive());
+      assertEquals(true, putAI2.isAlive());
+
+      {
+        final int restartPort = port1;
+        vm1.invoke(new SerializableRunnable("Restart CacheServer") {
+            public void run() {
+              try {
+                Region region = getRootRegion().getSubregion(name);
+                assertNotNull(region);
+                startBridgeServer(restartPort);
+              }
+              catch(Exception e) {
+                getSystem().getLogWriter().fine(new Exception(e));
+                com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start CacheServer", e);
+              }
+            }
+          });
+      }
+
+      vm2.invoke(verify2Servers);
+      assertEquals(true, putAI.isAlive());
+      assertEquals(true, putAI2.isAlive());
+    } finally {
+      vm2.invoke(new SerializableRunnable("Stop Putters") {
+          public void run() {
+            stopTestLifetimeExpire = true;
+          }
+        });
+
+      try {
+        if (putAI != null) {
+          // Verify that no exception has occurred in the putter thread
+          ThreadUtils.join(putAI, 30 * 1000);
+          if (putAI.exceptionOccurred()) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While putting entries: ", putAI.getException());
+          }
+        }
+        
+        if (putAI2 != null) {
+          // Verify that no exception has occurred in the putter thread
+          ThreadUtils.join(putAI, 30 * 1000);
+          // FIXME this thread does not terminate
+//          if (putAI2.exceptionOccurred()) {
+//            fail("While putting entries: ", putAI.getException());
+//          }
+        }
+
+      } finally {
+        vm2.invoke(new SerializableRunnable("Stop Putters") {
+            public void run() {
+              stopTestLifetimeExpire = false;
+            }
+          });
+        // Close Pool
+        vm2.invoke(new CacheSerializableRunnable("Close Pool") {
+            public void run2() throws CacheException {
+              Region region = getRootRegion().getSubregion(name);
+              String poolName = region.getAttributes().getPoolName();
+              region.localDestroyRegion();
+              PoolManager.find(poolName).destroy();
+            }
+          });
+
+        SerializableRunnable stopCacheServer =
+          new SerializableRunnable("Stop CacheServer") {
+            public void run() {
+              stopBridgeServer(getCache());
+            }
+          };
+        vm1.invoke(stopCacheServer);
+        vm0.invoke(stopCacheServer);
+      }
+    }
+  }
+
+  /**
+   * Tests the create operation of the {@link Pool}
+   *
+   * @since 3.5
+   */
+  public void test011PoolCreate() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = Host.getHost(0).getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+          createRegion(name, factory.create());
+          //pause(1000);
+          try {
+            startBridgeServer(0);
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      });
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+    SerializableRunnable create =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,false,-1,-1, null);
+           createRegion(name, factory.create());
+          }
+        };
+
+    vm1.invoke(create);
+    vm1.invoke(new CacheSerializableRunnable("Create values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.create(new Integer(i), new Integer(i));
+          }
+        }
+      });
+
+    vm2.invoke(create);
+    vm2.invoke(new CacheSerializableRunnable("Validate values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get(new Integer(i));
+            assertNotNull(value);
+            assertTrue(value instanceof Integer);
+            assertEquals(i, ((Integer) value).intValue());
+          }
+        }
+      });
+
+  SerializableRunnable close =
+    new CacheSerializableRunnable("Close Pool") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+        region.localDestroyRegion();
+      }
+    };
+
+    vm1.invoke(close);
+    vm2.invoke(close);
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+  }
+
+  /**
+   * Tests the put operation of the {@link Pool}: one client puts String,
+   * {@code Order} and byte[] values, and a second client connected to the same
+   * cache server must read back equal values of the same types.
+   * <p>
+   * The byte[] values are encoded and decoded with an explicit UTF-8 charset,
+   * because the put and the validation run in different JVMs and must not
+   * depend on each JVM's default charset.
+   *
+   * @since 3.5
+   */
+  public void test012PoolPut() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+          createRegion(name, factory.create());
+          try {
+            startBridgeServer(0);
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+        }
+      });
+
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+    SerializableRunnable createPool =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            // attach a connection pool for the server started above
+            ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,false,-1,-1, null);
+            createRegion(name, factory.create());
+          }
+        };
+
+    vm1.invoke(createPool);
+
+    vm1.invoke(new CacheSerializableRunnable("Put values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            // put string values
+            region.put("key-string-"+i, "value-"+i);
+
+            // put object values
+            Order order = new Order();
+            order.init(i);
+            region.put("key-object-"+i, order);
+
+            // put byte[] values, encoded with a fixed charset
+            region.put("key-bytes-"+i,
+                       ("value-"+i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+          }
+        }
+      });
+
+    vm2.invoke(createPool);
+
+    vm2.invoke(new CacheSerializableRunnable("Get / validate string values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get("key-string-"+i);
+            assertNotNull(value);
+            assertTrue(value instanceof String);
+            assertEquals("value-"+i, value);
+          }
+        }
+      });
+
+    vm2.invoke(new CacheSerializableRunnable("Get / validate object values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get("key-object-"+i);
+            assertNotNull(value);
+            assertTrue(value instanceof Order);
+            assertEquals(i, ((Order) value).getIndex());
+          }
+        }
+      });
+
+    vm2.invoke(new CacheSerializableRunnable("Get / validate byte[] values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get("key-bytes-"+i);
+            assertNotNull(value);
+            assertTrue(value instanceof byte[]);
+            // decode with the same charset the putter used
+            assertEquals("value-"+i,
+                         new String((byte[]) value, java.nio.charset.StandardCharsets.UTF_8));
+          }
+        }
+      });
+
+    // Locally destroy the region in both clients, then stop the server.
+    SerializableRunnable closePool =
+      new CacheSerializableRunnable("Close Pool") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          region.localDestroyRegion();
+        }
+      };
+
+    vm1.invoke(closePool);
+    vm2.invoke(closePool);
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+  }
+  /**
+   * Tests that values put through one client {@link Pool} can be read back,
+   * with their original types, through a second client {@link Pool}.
+   * <p>
+   * vm0 hosts the cache server, vm1 puts String, {@code Order} and byte[]
+   * values, and vm2 fetches each of them and validates type and content.
+   * <p>
+   * NOTE(review): the "NoDeserialize" suffix suggests the server is expected
+   * to keep the values in serialized form, but nothing in this method asserts
+   * that -- confirm the intent.
+   *
+   * @since 3.5
+   */
+  public void test013PoolPutNoDeserialize() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = getBridgeServerRegionAttributes(null,null);
+          createRegion(name, factory.create());
+          try {
+            startBridgeServer(0);
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+        }
+      });
+
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    // Shared by both client VMs: a local-scope region backed by a pool that
+    // points at the server started in vm0.
+    SerializableRunnable createPool =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,false,-1,-1, null);
+            createRegion(name, factory.create());
+          }
+        };
+
+    vm1.invoke(createPool);
+
+    vm1.invoke(new CacheSerializableRunnable("Put values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            // put string values
+            region.put("key-string-"+i, "value-"+i);
+
+            // put object values
+            Order order = new Order();
+            order.init(i);
+            region.put("key-object-"+i, order);
+
+            // put byte[] values; the charset is named explicitly (and via the
+            // static field rather than a captured local, because this runnable
+            // is shipped to another VM) so the bytes do not depend on the
+            // platform default encoding
+            region.put("key-bytes-"+i,
+                ("value-"+i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+          }
+        }
+      });
+
+    vm2.invoke(createPool);
+
+    vm2.invoke(new CacheSerializableRunnable("Get / validate string values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get("key-string-"+i);
+            assertNotNull(value);
+            assertTrue(value instanceof String);
+            assertEquals("value-"+i, value);
+          }
+        }
+      });
+
+    vm2.invoke(new CacheSerializableRunnable("Get / validate object values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get("key-object-"+i);
+            assertNotNull(value);
+            assertTrue(value instanceof Order);
+            assertEquals(i, ((Order) value).getIndex());
+          }
+        }
+      });
+
+    vm2.invoke(new CacheSerializableRunnable("Get / validate byte[] values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get("key-bytes-"+i);
+            assertNotNull(value);
+            assertTrue(value instanceof byte[]);
+            // decode with the same charset the writer used
+            assertEquals("value-"+i,
+                new String((byte[]) value, java.nio.charset.StandardCharsets.UTF_8));
+          }
+        }
+      });
+
+    SerializableRunnable closePool =
+      new CacheSerializableRunnable("Close Pool") {
+          public void run2() throws CacheException {
+            Region region = getRootRegion().getSubregion(name);
+            region.localDestroyRegion();
+          }
+        };
+
+    vm1.invoke(closePool);
+    vm2.invoke(closePool);
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+    // NOTE(review): fixed five second pause after the server is stopped; its
+    // purpose is not evident from this method -- presumably it lets shutdown
+    // settle before the next test. Confirm before removing.
+    Wait.pause(5 * 1000);
+  }
+
+  /**
+   * Tests that invalidates and destroys are propagated to {@link Pool}s.
+   * <p>
+   * vm0 hosts the cache server. vm1 and vm2 are clients that each install a
+   * {@code CertifiableTestCacheListener} and register interest in every key
+   * with {@code registerInterestRegex(".*", false, false)}. vm2 then updates,
+   * destroys and re-creates ten entries while vm1's listener history is
+   * checked after each phase:
+   * <ol>
+   * <li>updates in vm2 are seen by vm1 as remote INVALIDATE events that still
+   * carry the old value and the callback argument;</li>
+   * <li>destroys in vm2 are seen by vm1 as remote DESTROY events;</li>
+   * <li>creates in vm2 produce no event in vm1, but a subsequent get in vm1
+   * loads the value from the server as a LOCAL_LOAD_CREATE.</li>
+   * </ol>
+   *
+   * @since 3.5
+   */
+  public void test014InvalidateAndDestroyPropagation() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+          createRegion(name, factory.create());
+          try {
+            startBridgeServer(0);
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+        }
+      });
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    // Client setup shared by vm1 and vm2: local region, pooled connection to
+    // vm0, a history-recording listener, and interest in every key.
+    SerializableRunnable create =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+            CertifiableTestCacheListener listener = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+            factory.setCacheListener(listener);
+            Region rgn = createRegion(name, factory.create());
+            rgn.registerInterestRegex(".*", false, false);
+          }
+        };
+
+    vm1.invoke(create);
+    vm1.invoke(new CacheSerializableRunnable("Populate region") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.put(Integer.valueOf(i), "old" + i);
+          }
+        }
+      });
+    vm2.invoke(create);
+    Wait.pause(5 * 1000);
+
+    // Start recording only now, so the initial population is not part of the
+    // history that is verified below.
+    vm1.invoke(new CacheSerializableRunnable("Turn on history") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          ctl.enableEventHistory();
+        }
+      });
+    vm2.invoke(new CacheSerializableRunnable("Update region") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.put(Integer.valueOf(i), "new" + i, "callbackArg" + i);
+          }
+        }
+      });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          // the entry must still exist locally, but without a value
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            ctl.waitForInvalidated(key);
+            Region.Entry entry = region.getEntry(key);
+            assertNotNull(entry);
+            assertNull(entry.getValue());
+          }
+          // NOTE(review): the later phases only pass if getEventHistory()
+          // also clears the recorded events -- confirm against the listener.
+          List<?> history = ctl.getEventHistory();
+          assertEquals(10, history.size());
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            EntryEvent ee = (EntryEvent) history.get(i);
+            assertEquals(key, ee.getKey());
+            assertEquals("old" + i, ee.getOldValue());
+            assertEquals(Operation.INVALIDATE, ee.getOperation());
+            assertEquals("callbackArg" + i, ee.getCallbackArgument());
+            assertTrue(ee.isOriginRemote());
+          }
+        }
+      });
+
+    vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            assertEquals("new" + i, region.getEntry(key).getValue());
+            region.destroy(key, "destroyCB"+i);
+          }
+        }
+      });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            ctl.waitForDestroyed(key);
+            Region.Entry entry = region.getEntry(key);
+            assertNull(entry);
+          }
+          List<?> history = ctl.getEventHistory();
+          assertEquals(10, history.size());
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            EntryEvent ee = (EntryEvent) history.get(i);
+            assertEquals(key, ee.getKey());
+            assertNull(ee.getOldValue());
+            assertEquals(Operation.DESTROY, ee.getOperation());
+            assertEquals("destroyCB"+i, ee.getCallbackArgument());
+            assertTrue(ee.isOriginRemote());
+          }
+        }
+      });
+    vm2.invoke(new CacheSerializableRunnable("recreate") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            region.create(key, "create" + i);
+          }
+        }
+      });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify creates") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          // the creates performed in vm2 must not have produced any event here
+          List<?> history = ctl.getEventHistory();
+          com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("history (should be empty): " + history);
+          assertEquals(0, history.size());
+          // now see if we can get it from the server
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            assertEquals("create"+i, region.get(key, "loadCB"+i));
+          }
+          // each get above must have been recorded as a locally originated load
+          history = ctl.getEventHistory();
+          assertEquals(10, history.size());
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            EntryEvent ee = (EntryEvent) history.get(i);
+            com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("processing " + ee);
+            assertEquals(key, ee.getKey());
+            assertNull(ee.getOldValue());
+            assertEquals("create"+i, ee.getNewValue());
+            assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation());
+            assertEquals("loadCB"+i, ee.getCallbackArgument());
+            assertFalse(ee.isOriginRemote());
+          }
+        }
+      });
+
+    SerializableRunnable close =
+      new CacheSerializableRunnable("Close Pool") {
+          public void run2() throws CacheException {
+            Region region = getRootRegion().getSubregion(name);
+            region.localDestroyRegion();
+          }
+        };
+
+    vm1.invoke(close);
+    vm2.invoke(close);
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+  }
+  /**
+   * Tests that invalidates and destroys are propagated to {@link Pool}s
+   * correctly to DataPolicy.EMPTY + InterestPolicy.ALL
+   * <p>
+   * vm0 hosts the cache server. vm1 is a client whose region uses
+   * {@link DataPolicy#EMPTY} with {@link InterestPolicy#ALL}; vm2 is a client
+   * with default data policy. vm2 updates, destroys and re-creates ten entries
+   * and vm1's {@code CertifiableTestCacheListener} history is verified after
+   * each phase. Because vm1 stores nothing, every local entry lookup is
+   * expected to return {@code null} and no event carries an old value.
+   *
+   * @since 5.0
+   */
+  public void test015InvalidateAndDestroyToEmptyAllPropagation() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+          createRegion(name, factory.create());
+          try {
+            startBridgeServer(0);
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+        }
+      });
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    // Client that stores no data locally but subscribes to all events.
+    SerializableRunnable createEmpty =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+            CertifiableTestCacheListener listener = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+            factory.setCacheListener(listener);
+            factory.setDataPolicy(DataPolicy.EMPTY);
+            factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
+            Region rgn = createRegion(name, factory.create());
+            rgn.registerInterestRegex(".*", false, false);
+          }
+        };
+    // Client with the default data policy; it performs the mutating operations.
+    SerializableRunnable createNormal =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+            CertifiableTestCacheListener listener = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+            factory.setCacheListener(listener);
+            Region rgn = createRegion(name, factory.create());
+            rgn.registerInterestRegex(".*", false, false);
+          }
+        };
+
+    vm1.invoke(createEmpty);
+    vm1.invoke(new CacheSerializableRunnable("Populate region") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.put(Integer.valueOf(i), "old" + i);
+          }
+        }
+      });
+
+    vm2.invoke(createNormal);
+    // Start recording only now, so the initial population is not part of the
+    // history that is verified below.
+    vm1.invoke(new CacheSerializableRunnable("Turn on history") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          ctl.enableEventHistory();
+        }
+      });
+    vm2.invoke(new CacheSerializableRunnable("Update region") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.put(Integer.valueOf(i), "new" + i, "callbackArg" + i);
+          }
+        }
+      });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            ctl.waitForInvalidated(key);
+            Region.Entry entry = region.getEntry(key);
+            assertNull(entry); // we are empty!
+          }
+          // NOTE(review): the later phases only pass if getEventHistory()
+          // also clears the recorded events -- confirm against the listener.
+          List<?> history = ctl.getEventHistory();
+          assertEquals(10, history.size());
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            EntryEvent ee = (EntryEvent) history.get(i);
+            assertEquals(key, ee.getKey());
+            assertNull(ee.getOldValue());
+            // NOTE(review): the original comment on this assertion read just
+            // "failure" -- presumably it once failed here; confirm.
+            assertFalse(ee.isOldValueAvailable());
+            assertEquals(Operation.INVALIDATE, ee.getOperation());
+            assertEquals("callbackArg" + i, ee.getCallbackArgument());
+            assertTrue(ee.isOriginRemote());
+          }
+        }
+      });
+
+    vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            assertEquals("new" + i, region.getEntry(key).getValue());
+            region.destroy(key, "destroyCB"+i);
+          }
+        }
+      });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            ctl.waitForDestroyed(key);
+            Region.Entry entry = region.getEntry(key);
+            assertNull(entry);
+          }
+          List<?> history = ctl.getEventHistory();
+          assertEquals(10, history.size());
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            EntryEvent ee = (EntryEvent) history.get(i);
+            assertEquals(key, ee.getKey());
+            assertNull(ee.getOldValue());
+            assertFalse(ee.isOldValueAvailable());
+            assertEquals(Operation.DESTROY, ee.getOperation());
+            assertEquals("destroyCB"+i, ee.getCallbackArgument());
+            assertTrue(ee.isOriginRemote());
+          }
+        }
+      });
+    vm2.invoke(new CacheSerializableRunnable("recreate") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            region.create(key, "create" + i, "createCB"+i);
+          }
+        }
+      });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify creates") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          // the creates done in vm2 are expected to arrive here as invalidates
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            ctl.waitForInvalidated(key);
+            Region.Entry entry = region.getEntry(key);
+            assertNull(entry);
+          }
+          List<?> history = ctl.getEventHistory();
+          assertEquals(10, history.size());
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            EntryEvent ee = (EntryEvent) history.get(i);
+            assertEquals(key, ee.getKey());
+            assertNull(ee.getOldValue());
+            assertFalse(ee.isOldValueAvailable());
+            assertEquals(Operation.INVALIDATE, ee.getOperation());
+            assertEquals("createCB"+i, ee.getCallbackArgument());
+            assertTrue(ee.isOriginRemote());
+          }
+          // now see if we can get it from the server
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            assertEquals("create"+i, region.get(key, "loadCB"+i));
+          }
+          // each get above must have been recorded as a locally originated load
+          history = ctl.getEventHistory();
+          assertEquals(10, history.size());
+          for (int i = 0; i < 10; i++) {
+            Object key = Integer.valueOf(i);
+            EntryEvent ee = (EntryEvent) history.get(i);
+            assertEquals(key, ee.getKey());
+            assertNull(ee.getOldValue());
+            assertEquals("create"+i, ee.getNewValue());
+            assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation());
+            assertEquals("loadCB"+i, ee.getCallbackArgument());
+            assertFalse(ee.isOriginRemote());
+          }
+        }
+      });
+
+    SerializableRunnable close =
+      new CacheSerializableRunnable("Close Pool") {
+          public void run2() throws CacheException {
+            Region region = getRootRegion().getSubregion(name);
+            region.localDestroyRegion();
+          }
+        };
+
+    vm1.invoke(close);
+    vm2.invoke(close);
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+  }
+
+  /**
+   * Tests that invalidates and destroys are propagated to {@link Pool}s
+   * correctly to DataPolicy.EMPTY + InterestPolicy.CACHE_CONTENT
+   *
+   * @since 5.0
+   */
+  public void test016InvalidateAndDestroyToEmptyCCPropagation() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+          createRegion(name, factory.create());
+          try {
+            startBridgeServer(0);
+
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      });
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    SerializableRunnable createEmpty =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+            CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+            factory.setCacheListener(l);
+            factory.setDataPolicy(DataPolicy.EMPTY);
+            factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT));
+            Region rgn = createRegion(name, factory.create());
+            rgn.registerInterestRegex(".*", false, false);
+         }
+        };
+    SerializableRunnable createNormal =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+            CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+            factory.setCacheListener(l);
+            Region rgn = createRegion(name, factory.create());
+            rgn.registerInterestRegex(".*", false, false);
+          }
+        };
+
+    vm1.invoke(createEmpty);
+    vm1.invoke(new CacheSerializableRunnable("Populate region") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.put(new Integer(i), "old" + i);
+          }
+        }
+      });
+
+    vm2.invoke(createNormal);
+    vm1.invoke(new CacheSerializableRunnable("Turn on history") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          ctl.enableEventHistory();
+        }
+      });
+    vm2.invoke(new CacheSerializableRunnable("Update region") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.put(new Integer(i), "new" + i, "callbackArg" + i);
+          }
+        }
+      });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          List l = ctl.getEventHistory();
+          assertEquals(0, l.size());
+        }
+      });
+
+
+    vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            assertEquals("new" + i, region.getEntry(key).getValue());
+            region.destroy(key, "destroyCB"+i);
+          }
+        }
+      });
+
+    vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          List l = ctl.getEventHistory();
+          assertEquals(0, l.size());
+        }
+      });
+    vm2.invoke(new CacheSerializableRunnable("recreate") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            region.create(key, "create" + i, "createCB"+i);
+          }
+        }
+

<TRUNCATED>


[2/2] incubator-geode git commit: GEODE-1111 Connection Pooling needs more tests

Posted by bs...@apache.org.
GEODE-1111 Connection Pooling needs more tests

These tests were dependent on an Order class that is only available in
Pivotal's old test framework.  I've created a small Order class that replaces
it and allows the tests to run as part of Geode's geode-core distributedTest
task.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4ed2fd37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4ed2fd37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4ed2fd37

Branch: refs/heads/develop
Commit: 4ed2fd374cf27ff704b09600eef263b71be9eabc
Parents: ac3d3b4
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Thu Mar 17 15:04:59 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Thu Mar 17 15:10:10 2016 -0700

----------------------------------------------------------------------
 .../cache/ConnectionPoolAutoDUnitTest.java      |   45 +
 .../gemfire/cache/ConnectionPoolDUnitTest.java  | 5871 ++++++++++++++++++
 2 files changed, 5916 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4ed2fd37/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
new file mode 100755
index 0000000..ad110d7
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
@@ -0,0 +1,45 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache;
+
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+
+import static org.junit.runners.MethodSorters.*;
+import org.junit.FixMethodOrder;
+
+/**
+ * Runs every test inherited from {@link ConnectionPoolDUnitTest} with
+ * {@link ClientServerTestCase#AUTO_LOAD_BALANCE} set to {@code true}, both in
+ * the VM executing the test and in every dunit VM.
+ */
+@FixMethodOrder(NAME_ASCENDING)
+public class ConnectionPoolAutoDUnitTest extends ConnectionPoolDUnitTest {
+
+  public ConnectionPoolAutoDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Performs the inherited set-up and then switches
+   * {@link ClientServerTestCase#AUTO_LOAD_BALANCE} on, locally and in every
+   * dunit VM.
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    ClientServerTestCase.AUTO_LOAD_BALANCE = true;
+    // the flag is a static, so it has to be set separately in each remote VM
+    Invoke.invokeInEveryVM(new SerializableRunnable("setupAutoMode") {
+      public void run() {
+        ClientServerTestCase.AUTO_LOAD_BALANCE = true;
+      }
+    });
+  }
+
+  /**
+   * Switches {@link ClientServerTestCase#AUTO_LOAD_BALANCE} back off, locally
+   * and in every dunit VM, so later tests do not inherit the setting.
+   * Presumably invoked by the parent class as part of its tear-down.
+   */
+  @Override
+  protected final void postTearDownConnectionPoolDUnitTest() throws Exception {
+    ClientServerTestCase.AUTO_LOAD_BALANCE = false;
+    Invoke.invokeInEveryVM(new SerializableRunnable("disableAutoMode") {
+      public void run() {
+        ClientServerTestCase.AUTO_LOAD_BALANCE = false;
+      }
+    });
+  }
+
+}