Posted to commits@geode.apache.org by kl...@apache.org on 2016/03/18 00:11:45 UTC

[1/5] incubator-geode git commit: GEODE-1111 Connection Pooling needs more tests

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1050 f45f0f549 -> 893dc86b9


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4ed2fd37/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
new file mode 100755
index 0000000..41d48aa
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
@@ -0,0 +1,5871 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache;
+
+import static org.junit.runners.MethodSorters.NAME_ASCENDING;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import junit.framework.AssertionFailedError;
+
+import org.junit.FixMethodOrder;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.cache.client.NoAvailableServersException;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.Endpoint;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.cache30.CertifiableTestCacheListener;
+import com.gemstone.gemfire.cache30.TestCacheLoader;
+import com.gemstone.gemfire.cache30.TestCacheWriter;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.EntryExpiryTask;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.PoolStats;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifierStats;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.internal.logging.LocalLogWriter;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.NetworkUtils;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.ThreadUtils;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+
+/**
+ * This class tests the client connection pool in GemFire. It does so by
+ * creating a cache server with a cache, a pre-defined region, and a data
+ * loader. The client creates the same region with a pool (this happens in
+ * the controller VM). The client then spins up 10 different threads and
+ * issues gets on keys; the server data loader returns the data to the
+ * client.
+ * The test uses Groboutils TestRunnable objects to achieve multi-threaded
+ * behavior.
+ */
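+/*
+ * Illustrative sketch (not part of this test): the client side of the
+ * topology described above boils down to configuring a pool and a region
+ * that uses it. The host name, port, and pool/region names below are
+ * hypothetical, and "cache" is assumed to be an existing Cache.
+ *
+ *   Pool pool = PoolManager.createFactory()
+ *       .addServer("serverHost", 40404)
+ *       .create("examplePool");
+ *   AttributesFactory factory = new AttributesFactory();
+ *   factory.setScope(Scope.LOCAL);
+ *   factory.setPoolName("examplePool");
+ *   Region region = cache.createRegion("exampleRegion", factory.create());
+ */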
+@FixMethodOrder(NAME_ASCENDING)
+public class ConnectionPoolDUnitTest extends CacheTestCase {
+
+  private static final long serialVersionUID = 1L;
+
+  /** The port on which the bridge server was started in this VM */
+  private static int bridgeServerPort;
+
+  protected static int port = 0;
+  protected static int port2 = 0;
+
+  protected static int numberOfAfterInvalidates;
+  protected static int numberOfAfterCreates;
+  protected static int numberOfAfterUpdates;
+
+  protected final static int TYPE_CREATE = 0;
+  protected final static int TYPE_UPDATE = 1;
+  protected final static int TYPE_INVALIDATE = 2;
+  protected final static int TYPE_DESTROY = 3;
+
+  public ConnectionPoolDUnitTest(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+    // avoid IllegalStateException from HandShake by connecting all vms to
+    // system before creating pool
+    getSystem();
+    Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        getSystem();
+      }
+    });
+  }
+  
+  @Override
+  protected final void postTearDownCacheTestCase() throws Exception {
+    Invoke.invokeInEveryVM(new SerializableRunnable() {
+      public void run() {
+        Map pools = PoolManager.getAll();
+        if (!pools.isEmpty()) {
+          com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().warning("found pools remaining after teardown: " + pools);
+          assertEquals(0, pools.size());
+        }
+      }
+    });
+    postTearDownConnectionPoolDUnitTest();
+  }
+  
+  protected void postTearDownConnectionPoolDUnitTest() throws Exception {
+  }
+
+  protected static PoolImpl getPool(Region r) {
+    PoolImpl result = null;
+    String poolName = r.getAttributes().getPoolName();
+    if (poolName != null) {
+      result = (PoolImpl)PoolManager.find(poolName);
+    }
+    return result;
+  }
+  protected static TestCacheWriter getTestWriter(Region r) {
+    return (TestCacheWriter)r.getAttributes().getCacheWriter();
+  }
+  /**
+   * Create a bridge server on the given port without starting it.
+   *
+   * @since 5.0.2
+   */
+  protected void createBridgeServer(int port) throws IOException {
+    CacheServer bridge = getCache().addCacheServer();
+    bridge.setPort(port);
+    bridge.setMaxThreads(getMaxThreads());
+    bridgeServerPort = bridge.getPort();
+  }
+
+  /**
+   * Starts a bridge server on the given port, using the given
+   * deserializeValues and notifyBySubscription to serve up the
+   * given region.
+   *
+   * @since 4.0
+   */
+  protected void startBridgeServer(int port)
+    throws IOException {
+    startBridgeServer(port, -1);
+  }
+  
+  protected void startBridgeServer(int port, int socketBufferSize) throws IOException {
+    startBridgeServer(port, socketBufferSize, CacheServer.DEFAULT_LOAD_POLL_INTERVAL);
+  }
+
+  protected void startBridgeServer(int port, int socketBufferSize, long loadPollInterval)
+    throws IOException {
+
+    Cache cache = getCache();
+    CacheServer bridge = cache.addCacheServer();
+    bridge.setPort(port);
+    if (socketBufferSize != -1) {
+      bridge.setSocketBufferSize(socketBufferSize);
+    }
+    bridge.setMaxThreads(getMaxThreads());
+    bridge.setLoadPollInterval(loadPollInterval);
+    bridge.start();
+    bridgeServerPort = bridge.getPort();
+  }
+
+  /**
+   * By default, returns 0, which turns off the selector and gives a thread
+   * per connection. Test subclasses can override to run with a selector.
+   * @since 5.1
+   */
+  protected int getMaxThreads() {
+    return 0;
+  }
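+  // A sketch of a selector-based subclass (the thread count is hypothetical):
+  // with setMaxThreads(n) for n > 0, the cache server services connections
+  // from a selector-driven pool of n threads instead of a thread per connection.
+  //   protected int getMaxThreads() { return 16; }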
+
+  /**
+   * Stops the bridge server that serves up the given cache.
+   *
+   * @since 4.0
+   */
+  void stopBridgeServer(Cache cache) {
+    CacheServer bridge =
+        cache.getCacheServers().iterator().next();
+    bridge.stop();
+    assertFalse(bridge.isRunning());
+  }
+
+  void stopBridgeServers(Cache cache) {
+    CacheServer bridge = null;
+    for (Iterator bsI = cache.getCacheServers().iterator();bsI.hasNext(); ) {
+      bridge = (CacheServer) bsI.next();
+      bridge.stop();
+      assertFalse(bridge.isRunning());
+    }
+  }
+
+  private void restartBridgeServers(Cache cache) throws IOException
+  {
+    CacheServer bridge = null;
+    for (Iterator bsI = cache.getCacheServers().iterator();bsI.hasNext(); ) {
+      bridge = (CacheServer) bsI.next();
+      bridge.start();
+      assertTrue(bridge.isRunning());
+    }
+  }
+
+  protected InternalDistributedSystem createLonerDS() {
+    disconnectFromDS();
+    InternalDistributedSystem ds = getLonerSystem();
+    assertEquals(0, ds.getDistributionManager().getOtherDistributionManagerIds().size());
+    return ds;
+  }
+
+  
+  
+  /**
+   * Returns region attributes for a <code>LOCAL</code> region
+   */
+  protected RegionAttributes getRegionAttributes() {
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.LOCAL);
+    factory.setConcurrencyChecksEnabled(false); // test validation expects this behavior
+    return factory.create();
+  }
+
+  private static String createBridgeClientConnection(String host, int[] ports) {
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < ports.length; i++) {
+      if (i > 0) sb.append(",");
+      sb.append("name" + i + "=");
+      sb.append(host + ":" + ports[i]);
+    }
+    return sb.toString();
+  }
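+  // For example, given host "h" and ports {1, 2}, the method above returns
+  // the string "name0=h:1,name1=h:2".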
+
+  private class EventWrapper {
+    public final EntryEvent event;
+    public final Object key;
+    public final Object val;
+    public final Object arg;
+    public final int type;
+    public EventWrapper(EntryEvent ee, int type) {
+      this.event = ee;
+      this.key = ee.getKey();
+      this.val = ee.getNewValue();
+      this.arg = ee.getCallbackArgument();
+      this.type = type;
+    }
+    public String toString() {
+      return "EventWrapper: event=" + event + ", type=" + type;
+    }
+  }
+
+  protected class ControlListener extends CacheListenerAdapter {
+    public final LinkedList events = new LinkedList();
+    public final Object CONTROL_LOCK = new Object();
+
+    public boolean waitWhileNotEnoughEvents(long sleepMs, int eventCount) {
+      long maxMillis = System.currentTimeMillis() + sleepMs;
+      synchronized (this.CONTROL_LOCK) {
+        try {
+          while (this.events.size() < eventCount) {
+            long waitMillis = maxMillis - System.currentTimeMillis();
+            if (waitMillis < 10) {
+              break;
+            }
+            this.CONTROL_LOCK.wait(waitMillis);
+          }
+        } catch (InterruptedException abort) {
+          fail("interrupted");
+        }
+        return !this.events.isEmpty();
+      }
+    }
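+    // Typical use from a test body (sketch; listener registration and the
+    // triggering update are assumed to happen elsewhere):
+    //   ControlListener listener = new ControlListener();
+    //   assertTrue(listener.waitWhileNotEnoughEvents(60 * 1000, 1));
+    //   EventWrapper ew = (EventWrapper) listener.events.removeFirst();
+    //   assertEquals(TYPE_UPDATE, ew.type);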
+
+    public void afterCreate(EntryEvent e) {
+      //System.out.println("afterCreate: " + e);
+      synchronized(this.CONTROL_LOCK) {
+        this.events.add(new EventWrapper(e, TYPE_CREATE));
+        this.CONTROL_LOCK.notifyAll();
+      }
+    }
+
+    public void afterUpdate(EntryEvent e) {
+      //System.out.println("afterUpdate: " + e);
+      synchronized(this.CONTROL_LOCK) {
+        this.events.add(new EventWrapper(e, TYPE_UPDATE));
+        this.CONTROL_LOCK.notifyAll();
+      }
+    }
+
+    public void afterInvalidate(EntryEvent e) {
+      //System.out.println("afterInvalidate: " + e);
+      synchronized(this.CONTROL_LOCK) {
+        this.events.add(new EventWrapper(e, TYPE_INVALIDATE));
+        this.CONTROL_LOCK.notifyAll();
+      }
+    }
+
+    public void afterDestroy(EntryEvent e) {
+      //System.out.println("afterDestroy: " + e);
+      synchronized(this.CONTROL_LOCK) {
+        this.events.add(new EventWrapper(e, TYPE_DESTROY));
+        this.CONTROL_LOCK.notifyAll();
+      }
+    }
+  }
+
+  
+
+
+  /**
+   * Creates a fake EntryEvent that returns the provided region from {@link CacheEvent#getRegion()}
+   * and returns {@link com.gemstone.gemfire.cache.Operation#LOCAL_LOAD_CREATE} from {@link CacheEvent#getOperation()}.
+   * @param r the region the fake event reports
+   * @return fake entry event
+   */
+  protected static EntryEvent createFakeyEntryEvent(final Region r) {
+    return new EntryEvent() {
+      public Operation getOperation()
+      {
+        return Operation.LOCAL_LOAD_CREATE; // fake out pool to exit early
+      }
+      public Region getRegion()
+      {
+        return r;
+      }
+      public Object getKey() { return null; }
+      public Object getOldValue() { return null;}
+      public boolean isOldValueAvailable() {return true;}
+      public Object getNewValue() { return null;}
+      public boolean isLocalLoad() { return false;}
+      public boolean isNetLoad() {return false;}
+      public boolean isLoad() {return true; }
+      public boolean isNetSearch() {return false;}
+      public TransactionId getTransactionId() {return null;}
+      public Object getCallbackArgument() {return null;}
+      public boolean isCallbackArgumentAvailable() {return true;}
+      public boolean isOriginRemote() {return false;}
+      public DistributedMember getDistributedMember() {return null;}
+      public boolean isExpiration() { return false;}
+      public boolean isDistributed() { return false;}
+      public boolean isBridgeEvent() {
+        return hasClientOrigin();
+      }
+      public boolean hasClientOrigin() {
+        return false;
+      }
+      public ClientProxyMembershipID getContext() {
+        return null;
+      }
+      public SerializedCacheValue getSerializedOldValue() {return null;}
+      public SerializedCacheValue getSerializedNewValue() {return null;}
+    };
+  }
+
+  public void verifyBalanced(final PoolImpl pool, int expectedServer, 
+      final int expectedConsPerServer) {
+    verifyServerCount(pool, expectedServer);
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return balanced(pool, expectedConsPerServer);
+      }
+      public String description() {
+        return "expected " + expectedConsPerServer
+            + " but endpoints=" + outOfBalanceReport(pool);
+      }
+    };
+    Wait.waitForCriterion(ev, 2 * 60 * 1000, 200, true);
+    assertEquals("expected " + expectedConsPerServer
+                 + " but endpoints=" + outOfBalanceReport(pool),
+                 true, balanced(pool, expectedConsPerServer));
+  }
+  protected boolean balanced(PoolImpl pool, int expectedConsPerServer) {
+    Iterator it = pool.getEndpointMap().values().iterator();
+    while (it.hasNext()) {
+      Endpoint ep = (Endpoint)it.next();
+      if (ep.getStats().getConnections() != expectedConsPerServer) {
+        return false;
+      }
+    }
+    return true;
+  }
+  
+  protected String outOfBalanceReport(PoolImpl pool) {
+    StringBuffer result = new StringBuffer();
+    Iterator it = pool.getEndpointMap().values().iterator();
+    result.append("<");
+    while (it.hasNext()) {
+      Endpoint ep = (Endpoint)it.next();
+      result.append("ep=" + ep);
+      result.append(" conCount=" + ep.getStats().getConnections());
+      if (it.hasNext()) {
+        result.append(", ");
+      }
+    }
+    result.append(">");
+    return result.toString();
+  }
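+  // A two-endpoint report produced above looks roughly like
+  // "<ep=<endpoint> conCount=2, ep=<endpoint> conCount=3>", where each
+  // endpoint's toString() identifies its server.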
+  
+  public void waitForBlacklistToClear(final PoolImpl pool) {
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return pool.getBlacklistedServers().size() == 0;
+      }
+      public String description() {
+        return null;
+      }
+    };
+    Wait.waitForCriterion(ev, 30 * 1000, 200, true);
+    assertEquals("unexpected blacklistedServers=" + pool.getBlacklistedServers(),
+                 0, pool.getBlacklistedServers().size());
+  }
+  
+  public void verifyServerCount(final PoolImpl pool, final int expectedCount) {
+    getCache().getLogger().info("verifyServerCount expects=" + expectedCount);
+    WaitCriterion ev = new WaitCriterion() {
+      String excuse;
+      public boolean done() {
+        int actual = pool.getConnectedServerCount();
+        if (actual == expectedCount) {
+          return true;
+        }
+        excuse = "Found only " + actual + " servers, expected " + expectedCount;
+        return false;
+      }
+      public String description() {
+        return excuse;
+      }
+    };
+    Wait.waitForCriterion(ev, 5 * 60 * 1000, 200, true);
+  }
+
+  /**
+   * Tests that the callback argument is sent to the server
+   */
+  public void test001CallbackArg() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+
+    final Object createCallbackArg = "CREATE CALLBACK ARG";
+    final Object updateCallbackArg = "PUT CALLBACK ARG";
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+
+          CacheWriter cw = new TestCacheWriter() {
+              public final void beforeUpdate2(EntryEvent event)
+                throws CacheWriterException {
+                Object beca = event.getCallbackArgument();
+                assertEquals(updateCallbackArg, beca);
+              }
+
+              public void beforeCreate2(EntryEvent event)
+                throws CacheWriterException {
+                Object beca =  event.getCallbackArgument();
+                assertEquals(createCallbackArg, beca);
+              }
+            };
+            AttributesFactory factory = getBridgeServerRegionAttributes(null, cw);
+          createRegion(name, factory.create());
+          //pause(1000);
+          try {
+            startBridgeServer(0);
+
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      });
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    
+    SerializableRunnable create =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+
+            ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
+            createRegion(name, factory.create());
+          }
+        };
+
+    vm1.invoke(create);
+    vm1.invoke(new CacheSerializableRunnable("Add entries") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.create(new Integer(i), "old" + i, createCallbackArg);
+          }
+          for (int i = 0; i < 10; i++) {
+            region.put(new Integer(i), "new" + i, updateCallbackArg);
+          }
+        }
+      });
+
+    vm0.invoke(new CacheSerializableRunnable("Check cache writer") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          TestCacheWriter writer = getTestWriter(region);
+          assertTrue(writer.wasInvoked());
+        }
+      });
+
+    SerializableRunnable close =
+      new CacheSerializableRunnable("Close Pool") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          region.localDestroyRegion();
+        }
+      };
+
+    vm1.invoke(close);
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+
+  }
+
+  /**
+   * Tests that consecutive puts have the callback argument assigned
+   * appropriately.
+   */
+  public void test002CallbackArg2() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+
+    final Object createCallbackArg = "CREATE CALLBACK ARG";
+//    final Object updateCallbackArg = "PUT CALLBACK ARG";
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          CacheWriter cw = new TestCacheWriter() {
+              public void beforeCreate2(EntryEvent event)
+                throws CacheWriterException {
+                Integer key = (Integer) event.getKey();
+                if (key.intValue() % 2 == 0) {
+                  Object beca =  event.getCallbackArgument();
+                  assertEquals(createCallbackArg, beca);
+                } else {
+                  Object beca =  event.getCallbackArgument();
+                  assertNull(beca);
+                }
+              }
+            };
+            AttributesFactory factory = getBridgeServerRegionAttributes(null, cw);
+          createRegion(name, factory.create());
+          //pause(1000);
+          try {
+            startBridgeServer(0);
+
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      });
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    SerializableRunnable create =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+            createRegion(name, factory.create());
+          }
+        };
+
+    vm1.invoke(create);
+    vm1.invoke(new CacheSerializableRunnable("Add entries") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            if (i % 2 == 0) {
+              region.create(new Integer(i), "old" + i, createCallbackArg);
+
+            } else {
+              region.create(new Integer(i), "old" + i);
+            }
+          }
+        }
+      });
+
+    SerializableRunnable close =
+      new CacheSerializableRunnable("Close Pool") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          region.localDestroyRegion();
+        }
+      };
+
+    vm1.invoke(close);
+
+    vm0.invoke(new CacheSerializableRunnable("Check cache writer") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          TestCacheWriter writer = getTestWriter(region);
+          assertTrue(writer.wasInvoked());
+        }
+      });
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+  }
+
+  
+  /**
+   * Tests for bug 36684 by having two bridge servers with cache loaders that should always
+   * return a value, and one client connected to each server reading values. If the bug
+   * exists, the clients will sometimes get null.
+   * @throws InterruptedException
+   */
+  public void test003Bug36684() throws CacheException, InterruptedException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+
+    // Create the cache servers with distributed, mirrored region
+    SerializableRunnable createServer =
+      new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          CacheLoader cl = new CacheLoader() {
+            public Object load(LoaderHelper helper) {
+              return helper.getKey();
+            }
+            public void close() {
+
+            }
+          };
+          AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null);
+          createRegion(name, factory.create());
+          //pause(1000);
+          try {
+            startBridgeServer(0);
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      };
+    getSystem().getLogWriter().info("before create server");
+    vm0.invoke(createServer);
+    vm1.invoke(createServer);
+
+    // Create cache server clients
+    final int numberOfKeys = 1000;
+    final String host0 = NetworkUtils.getServerHostName(host);
+    final int vm0Port = vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final int vm1Port = vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    SerializableRunnable createClient =
+      new CacheSerializableRunnable("Create Cache Server Client") {
+        public void run2() throws CacheException {
+          // reset all static listener variables in case this is being rerun in a subclass
+          numberOfAfterInvalidates = 0;
+          numberOfAfterCreates  = 0;
+          numberOfAfterUpdates = 0;
+          // create the region
+          getLonerSystem();
+          AttributesFactory factory = new AttributesFactory();
+          factory.setScope(Scope.LOCAL);
+          factory.setConcurrencyChecksEnabled(false); // test validation expects this behavior
+          // configure the connection pool
+          ClientServerTestCase.configureConnectionPool(factory,host0,vm0Port,vm1Port,true,-1,-1, null);
+          createRegion(name, factory.create());
+        }
+      };
+    getSystem().getLogWriter().info("before create client");
+    vm2.invoke(createClient);
+    vm3.invoke(createClient);
+
+    // Initialize each client with entries (so that afterInvalidate is called)
+    SerializableRunnable initializeClient =
+      new CacheSerializableRunnable("Initialize Client") {
+      public void run2() throws CacheException {
+//        StringBuffer errors = new StringBuffer();
+          numberOfAfterInvalidates = 0;
+          numberOfAfterCreates = 0;
+          numberOfAfterUpdates = 0;
+          LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
+          for (int i=0; i<numberOfKeys; i++) {
+            String expected = "key-"+i;
+            String actual = (String) region.get("key-"+i);
+            assertEquals(expected, actual);
+          }
+      }
+    };
+    
+    getSystem().getLogWriter().info("before initialize client");
+    AsyncInvocation inv2 = vm2.invokeAsync(initializeClient);
+    AsyncInvocation inv3 = vm3.invokeAsync(initializeClient);
+    
+    ThreadUtils.join(inv2, 30 * 1000);
+    ThreadUtils.join(inv3, 30 * 1000);
+    
+    if (inv2.exceptionOccurred()) {
+      com.gemstone.gemfire.test.dunit.Assert.fail("Error occurred in vm2", inv2.getException());
+    }
+    if (inv3.exceptionOccurred()) {
+      com.gemstone.gemfire.test.dunit.Assert.fail("Error occurred in vm3", inv3.getException());
+    }
+  }
+  
+
+  /**
+   * Tests that a CacheLoaderException on the server does not cause the client
+   * connection to be dropped.
+   */
+  public void test004ForCacheLoaderException() throws CacheException, InterruptedException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+
+    // Create the cache servers with distributed, mirrored region
+    SerializableRunnable createServer =
+      new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          CacheLoader cl = new CacheLoader() {
+            public Object load(LoaderHelper helper) {
+              System.out.println("### CALLING CACHE LOADER....");
+              throw new CacheLoaderException("Test for CacheLoaderException causing client connection to disconnect.");
+            }
+            public void close() {
+            }
+          };
+          AttributesFactory factory = getBridgeServerMirroredAckRegionAttributes(cl, null);
+          createRegion(name, factory.create());
+          //pause(1000);
+          try {
+            startBridgeServer(0);
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      };
+    getSystem().getLogWriter().info("before create server");
+
+    server.invoke(createServer);
+
+    // Create cache server clients
+    final int numberOfKeys = 10;
+    final String host0 = NetworkUtils.getServerHostName(host);
+    final int[] port = new int[] {server.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort())};
+    final String poolName = "myPool";
+
+    SerializableRunnable createClient =
+      new CacheSerializableRunnable("Create Cache Server Client") {
+        public void run2() throws CacheException {
+          getLonerSystem();
+          AttributesFactory factory = new AttributesFactory();
+          factory.setScope(Scope.LOCAL);
+          factory.setConcurrencyChecksEnabled(false);
+          // configure the connection pool
+          ClientServerTestCase.configureConnectionPoolWithName(factory,host0,port,true,-1, -1, null, poolName);
+          createRegion(name, factory.create());
+        }
+      };
+    getSystem().getLogWriter().info("before create client");
+    client.invoke(createClient);
+
+    // Invoke the server's cache loader from the client and expect a CacheLoaderException
+    SerializableRunnable invokeServerCacheLoader =
+      new CacheSerializableRunnable("Invoke server cache loader") {
+      public void run2() throws CacheException {
+          LocalRegion region = (LocalRegion) getRootRegion().getSubregion(name);
+          PoolStats stats = ((PoolImpl)PoolManager.find(poolName)).getStats();
+          int oldConnects = stats.getConnects();
+          int oldDisConnects = stats.getDisConnects();
+          try {
+            for (int i=0; i<numberOfKeys; i++) {
+              region.get("key-"+i);
+            }
+          } catch (Exception ex) {
+            if (!(ex.getCause() instanceof CacheLoaderException)) {
+              fail("Unexpected exception; expected a CacheLoaderException from the server, instead found: " + ex.getCause().getClass());
+            }
+          }
+          int newConnects = stats.getConnects();
+          int newDisConnects = stats.getDisConnects();
+          //System.out.println("#### new connects/disconnects :" + newConnects + ":" + newDisConnects);
+          if (newConnects != oldConnects && newDisConnects != oldDisConnects) {
+            fail("A new connection was created for a server-side CacheLoaderException.");
+          }
+      }
+    };
+
+    getSystem().getLogWriter().info("before initialize client");
+    AsyncInvocation inv2 = client.invokeAsync(invokeServerCacheLoader);
+    
+    ThreadUtils.join(inv2, 30 * 1000);
+    SerializableRunnable stopServer = new SerializableRunnable("stop CacheServer") {
+      public void run() {
+        stopBridgeServer(getCache());
+      }
+    };
+    server.invoke(stopServer);
+    
+  }
+
+
+  protected void validateDS() {
+    List l = InternalDistributedSystem.getExistingSystems();
+    if (l.size() > 1) {
+      getSystem().getLogWriter().info("validateDS: size="
+                                      + l.size()
+                                      + " isDedicatedAdminVM="
+                                      + DistributionManager.isDedicatedAdminVM
+                                      + " l=" + l);
+    }
+    assertFalse(DistributionManager.isDedicatedAdminVM);
+    assertEquals(1, l.size());
+  }
+  
+
+  /**
+   * Tests the basic operations of the {@link Pool}
+   *
+   * @since 3.5
+   */
+  public void test006Pool() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = Host.getHost(0).getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = new AttributesFactory();
+          factory.setScope(Scope.DISTRIBUTED_ACK);
+          factory.setConcurrencyChecksEnabled(false);
+          factory.setCacheLoader(new CacheLoader() {
+              public Object load(LoaderHelper helper) {
+                //System.err.println("CacheServer data loader called");
+                return helper.getKey().toString();
+              }
+              public void close() {
+
+              }
+            });
+          createRegion(name, factory.create());
+          //pause(1000);
+          try {
+            startBridgeServer(0);
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      });
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+    SerializableRunnable create =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            validateDS();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory,host0,port,-1,true,-1,-1, null);
+            createRegion(name, factory.create());
+          }
+        };
+    vm1.invoke(create);
+
+    vm1.invoke(new CacheSerializableRunnable("Get values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get(new Integer(i));
+            assertEquals(String.valueOf(i), value);
+          }
+        }
+      });
+
+    vm1.invoke(new CacheSerializableRunnable("Update values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+
+          for (int i = 0; i < 10; i++) {
+            region.put(new Integer(i), new Integer(i));
+          }
+        }
+      });
+
+    vm2.invoke(create);
+    vm2.invoke(new CacheSerializableRunnable("Validate values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get(new Integer(i));
+            assertNotNull(value);
+            assertTrue(value instanceof Integer);
+            assertEquals(i, ((Integer) value).intValue());
+          }
+        }
+      });
+
+    vm1.invoke(new CacheSerializableRunnable("Close Pool") {
+        // do some special close validation here
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          String pName = region.getAttributes().getPoolName();
+          PoolImpl p = (PoolImpl)PoolManager.find(pName);
+          assertEquals(false, p.isDestroyed());
+          assertEquals(1, p.getAttachCount());
+          try {
+            p.destroy();
+            fail("expected IllegalStateException");
+          } catch (IllegalStateException expected) {
+          }
+          region.localDestroyRegion();
+          assertEquals(false, p.isDestroyed());
+          assertEquals(0, p.getAttachCount());
+        }
+      });
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+  }
+  
+  
+    
+  
+  /**
+   * Tests the BridgeServer failover (bug 31832).
+   */
+  public void test007BridgeServerFailoverCnx1() throws CacheException {
+    disconnectAllFromDS();
+    basicTestBridgeServerFailover(1);
+  }
+  /**
+   * Test BridgeServer failover with connectionsPerServer set to 0
+   */
+  public void test008BridgeServerFailoverCnx0() throws CacheException {
+    basicTestBridgeServerFailover(0);
+  }
+  private void basicTestBridgeServerFailover(final int cnxCount) throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    // Create two bridge servers
+    SerializableRunnable createCacheServer =
+      new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+          createRegion(name, factory.create());
+          //pause(1000);
+          try {
+            startBridgeServer(0);
+
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+        }
+
+        }
+      };
+
+    vm0.invoke(createCacheServer);
+    vm1.invoke(createCacheServer);
+
+    final int port0 =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    final int port1 =
+      vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+//    final String host1 = getServerHostName(vm1.getHost());
+
+    // Create one bridge client in this VM
+    SerializableRunnable create =
+      new CacheSerializableRunnable("Create region") {
+        public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory,host0,port0,port1,true,-1,cnxCount, null, 100);
+
+            Region region = createRegion(name, factory.create());
+
+            // force connections to form
+            region.put("keyInit", new Integer(0));
+            region.put("keyInit2", new Integer(0));
+        }
+        };
+
+    vm2.invoke(create);
+
+    // Launch an async thread that puts objects into the cache. This thread will execute until
+    // the test has ended (which is why RegionDestroyedException and CancelException are
+    // caught and ignored). If any other exception occurs, the test will fail; see
+    // the putAI.exceptionOccurred() assertion below.
+    AsyncInvocation putAI = vm2.invokeAsync(new CacheSerializableRunnable("Put objects") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          try {
+            for (int i=0; i<100000; i++) {
+              region.put("keyAI", new Integer(i));
+              try {Thread.sleep(100);} catch (InterruptedException ie) {
+                fail("interrupted");
+              }
+            }
+          } catch (NoAvailableServersException ignore) {
+            /*ignore*/
+          } catch (RegionDestroyedException e) { //will be thrown when the test ends
+            /*ignore*/
+          } 
+          catch (CancelException e) { //will be thrown when the test ends
+            /*ignore*/
+          }
+        }
+      });
+
+
+    SerializableRunnable verify1Server =
+      new CacheSerializableRunnable("verify1Server") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          PoolImpl pool = getPool(region);
+          verifyServerCount(pool, 1);
+        }
+        };
+    SerializableRunnable verify2Servers =
+      new CacheSerializableRunnable("verify2Servers") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          PoolImpl pool = getPool(region);
+          verifyServerCount(pool, 2);
+        }
+      };
+
+    vm2.invoke(verify2Servers);
+
+    SerializableRunnable stopCacheServer =
+      new SerializableRunnable("Stop CacheServer") {
+          public void run() {
+            stopBridgeServer(getCache());
+          }
+      };
+
+    final String expected = "java.io.IOException";
+    final String addExpected =
+      "<ExpectedException action=add>" + expected + "</ExpectedException>";
+    final String removeExpected =
+      "<ExpectedException action=remove>" + expected + "</ExpectedException>";
+
+    vm2.invoke(new SerializableRunnable() {
+      public void run() {
+        LogWriter bgexecLogger =
+              new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
+        bgexecLogger.info(addExpected);
+      }
+    });
+    try { // make sure we removeExpected
+
+    // Bounce the non-current server (I know that VM1 contains the non-current server
+    // because ...
+    vm1.invoke(stopCacheServer);
+
+    vm2.invoke(verify1Server);
+
+    final int restartPort = port1;
+    vm1.invoke(
+      new SerializableRunnable("Restart CacheServer") {
+      public void run() {
+        try {
+          Region region = getRootRegion().getSubregion(name);
+          assertNotNull(region);
+          startBridgeServer(restartPort);
+        }
+        catch(Exception e) {
+          getSystem().getLogWriter().fine(new Exception(e));
+          com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start CacheServer", e);
+        }
+      }
+    });
+
+    // Pause long enough for the monitor to realize the server has been bounced
+    // and reconnect to it.
+    vm2.invoke(verify2Servers);
+
+    } finally {
+      vm2.invoke(new SerializableRunnable() {
+        public void run() {
+          LogWriter bgexecLogger =
+                new LocalLogWriter(InternalLogWriter.ALL_LEVEL, System.out);
+          bgexecLogger.info(removeExpected);
+        }
+      });
+    }
+
+    // Stop the other cache server
+    vm0.invoke(stopCacheServer);
+
+    // Run awhile
+    vm2.invoke(verify1Server);
+
+    com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("FIXME: this thread does not terminate"); // FIXME
+//    // Verify that no exception has occurred in the putter thread
+//    DistributedTestCase.join(putAI, 5 * 60 * 1000, getLogWriter());
+//    //assertTrue("Exception occurred while invoking " + putAI, !putAI.exceptionOccurred());
+//    if (putAI.exceptionOccurred()) {
+//      fail("While putting entries: ", putAI.getException());
+//    }
+
+    // Close Pool
+    vm2.invoke(new CacheSerializableRunnable("Close Pool") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          region.localDestroyRegion();
+        }
+      });
+
+    // Stop the last cache server
+    vm1.invoke(stopCacheServer);
+  }
+  
+
+  /**
+   * Make sure connection lifetime expiration is working on thread-local connections.
+   * @author darrel
+   */
+  public void test009LifetimeExpireOnTL() throws CacheException {
+    basicTestLifetimeExpire(true);
+  }
+
+  /**
+   * Make sure connection lifetime expiration is working on pool connections.
+   * @author darrel
+   */
+  public void test010LifetimeExpireOnPoolCnx() throws CacheException {
+    basicTestLifetimeExpire(false);
+  }
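+  // In the public API, the connection lifetime these tests exercise corresponds to
+  // the pool's load-conditioning interval; a client-side sketch (hypothetical values):
+  //   Pool pool = PoolManager.createFactory()
+  //       .addServer("serverHost", 40404)
+  //       .setLoadConditioningInterval(500) // lifetime in milliseconds
+  //       .create("shortLifetimePool");
+  // The assertions below read the resulting PoolStats load-conditioning counters
+  // (checks, extensions, connects, disconnects).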
+
+  protected static volatile boolean stopTestLifetimeExpire = false;
+
+  protected static volatile int baselineLifetimeCheck;
+  protected static volatile int baselineLifetimeExtensions;
+  protected static volatile int baselineLifetimeConnect;
+  protected static volatile int baselineLifetimeDisconnect;
+
+  private void basicTestLifetimeExpire(final boolean threadLocal) throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    AsyncInvocation putAI = null;
+    AsyncInvocation putAI2 = null;
+
+    try {
+
+      // Create two bridge servers
+      SerializableRunnable createCacheServer =
+        new CacheSerializableRunnable("Create Cache Server") {
+          public void run2() throws CacheException {
+            AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+            factory.setCacheListener(new DelayListener(25));
+            createRegion(name, factory.create());
+            try {
+              startBridgeServer(0);
+
+            } catch (Exception ex) {
+              com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+            }
+
+          }
+        };
+
+      vm0.invoke(createCacheServer);
+
+      final int port0 =
+        vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+      final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+      vm1.invoke(createCacheServer);
+      final int port1 =
+        vm1.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+      SerializableRunnable stopCacheServer =
+        new SerializableRunnable("Stop CacheServer") {
+          public void run() {
+            stopBridgeServer(getCache());
+          }
+        };
+      // the second server was started only to reserve a port; stop it until the restart below
+      vm1.invoke(stopCacheServer);
+
+
+      // Create one bridge client in this VM
+      SerializableRunnable create =
+        new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory,host0,port0,port1,false/*queue*/,-1,0, null, 100, 500, threadLocal, 500);
+
+            Region region = createRegion(name, factory.create());
+
+            // force connections to form
+            region.put("keyInit", new Integer(0));
+            region.put("keyInit2", new Integer(0));
+          }
+        };
+
+      vm2.invoke(create);
+
+      // Launch async thread that puts objects into cache. This thread will execute until
+      // the test has ended.
+      SerializableRunnable putter1 = 
+        new CacheSerializableRunnable("Put objects") {
+          public void run2() throws CacheException {
+            Region region = getRootRegion().getSubregion(name);
+            PoolImpl pool = getPool(region);
+            PoolStats stats = pool.getStats();
+            baselineLifetimeCheck = stats.getLoadConditioningCheck();
+            baselineLifetimeExtensions = stats.getLoadConditioningExtensions();
+            baselineLifetimeConnect = stats.getLoadConditioningConnect();
+            baselineLifetimeDisconnect = stats.getLoadConditioningDisconnect();
+            try {
+              int count = 0;
+              while (!stopTestLifetimeExpire) {
+                count++;
+                region.put("keyAI1", new Integer(count));
+              }
+            } catch (NoAvailableServersException ex) {
+              if (stopTestLifetimeExpire) {
+                return;
+              } else {
+                throw ex;
+              }
+              //           } catch (RegionDestroyedException e) { //will be thrown when the test ends
+              //             /*ignore*/
+              //           } catch (CancelException e) { //will be thrown when the test ends
+              //             /*ignore*/
+            }
+          }
+        };
+      SerializableRunnable putter2 = 
+        new CacheSerializableRunnable("Put objects") {
+          public void run2() throws CacheException {
+            Region region = getRootRegion().getSubregion(name);
+            try {
+              int count = 0;
+              while (!stopTestLifetimeExpire) {
+                count++;
+                region.put("keyAI2", new Integer(count));
+              }
+            } catch (NoAvailableServersException ex) {
+              if (stopTestLifetimeExpire) {
+                return;
+              } else {
+                throw ex;
+              }
+              //           } catch (RegionDestroyedException e) { //will be thrown when the test ends
+              //             /*ignore*/
+              //           } catch (CancelException e) { //will be thrown when the test ends
+              //             /*ignore*/
+            }
+          }
+        };
+      putAI = vm2.invokeAsync(putter1);
+      putAI2 = vm2.invokeAsync(putter2);
+
+      SerializableRunnable verify1Server =
+        new CacheSerializableRunnable("verify1Server") {
+          public void run2() throws CacheException {
+            Region region = getRootRegion().getSubregion(name);
+            PoolImpl pool = getPool(region);
+            final PoolStats stats = pool.getStats();
+            verifyServerCount(pool, 1);
+            WaitCriterion ev = new WaitCriterion() {
+              public boolean done() {
+                return stats.getLoadConditioningCheck() >= (10 + baselineLifetimeCheck);
+              }
+              public String description() {
+                return null;
+              }
+            };
+            Wait.waitForCriterion(ev, 30 * 1000, 200, true);
+            
+            // Make sure no replacements are happening: since we have 2 threads,
+            // 2 connections, and 1 server, when lifetimes are up we should only
+            // want to connect back to the server we are already connected to,
+            // and thus just extend our lifetime.
+            assertTrue("baselineLifetimeCheck=" + baselineLifetimeCheck
+                       + " but stats.getLoadConditioningCheck()=" + stats.getLoadConditioningCheck(),
+                       stats.getLoadConditioningCheck() >= (10+baselineLifetimeCheck));
+            baselineLifetimeCheck = stats.getLoadConditioningCheck();
+            assertTrue(stats.getLoadConditioningExtensions() > baselineLifetimeExtensions);
+            assertTrue(stats.getLoadConditioningConnect() == baselineLifetimeConnect);
+            assertTrue(stats.getLoadConditioningDisconnect() == baselineLifetimeDisconnect);
+          }
+        };
+      SerializableRunnable verify2Servers =
+        new CacheSerializableRunnable("verify2Servers") {
+          public void run2() throws CacheException {
+            Region region = getRootRegion().getSubregion(name);
+            PoolImpl pool = getPool(region);
+            final PoolStats stats = pool.getStats();
+            verifyServerCount(pool, 2);
+            // Make sure some replacements are happening: since we have 2 threads,
+            // 2 connections, and 2 servers, when lifetimes are up we should
+            // sometimes connect to the other server.
+//            int retry = 300;
+//            while ((retry-- > 0)
+//                   && (stats.getLoadConditioningCheck() < (10+baselineLifetimeCheck))) {
+//              pause(100);
+//            }
+//            assertTrue("Bug 39209 expected "
+//                       + stats.getLoadConditioningCheck()
+//                       + " to be >= "
+//                       + (10+baselineLifetimeCheck),
+//                       stats.getLoadConditioningCheck() >= (10+baselineLifetimeCheck));
+            
+            // TODO: does this WaitCriterion actually help?
+            WaitCriterion wc = new WaitCriterion() {
+              String excuse;
+              public boolean done() {
+                int actual = stats.getLoadConditioningCheck();
+                int expected = 10 + baselineLifetimeCheck;
+                if (actual >= expected) {
+                  return true;
+                }
+                excuse = "Bug 39209 expected " + actual + " to be >= " + expected;
+                return false;
+              }
+              public String description() {
+                return excuse;
+              }
+            };
+            try {
+              Wait.waitForCriterion(wc, 60 * 1000, 1000, true);
+            } catch (AssertionFailedError e) {
+//              dumpStack();
+              throw e;
+            }
+            
+            assertTrue(stats.getLoadConditioningConnect() > baselineLifetimeConnect);
+            assertTrue(stats.getLoadConditioningDisconnect() > baselineLifetimeDisconnect);
+          }
+        };
+
+      vm2.invoke(verify1Server);
+      assertEquals(true, putAI.isAlive());
+      assertEquals(true, putAI2.isAlive());
+
+      {
+        final int restartPort = port1;
+        vm1.invoke(new SerializableRunnable("Restart CacheServer") {
+            public void run() {
+              try {
+                Region region = getRootRegion().getSubregion(name);
+                assertNotNull(region);
+                startBridgeServer(restartPort);
+              }
+              catch(Exception e) {
+                getSystem().getLogWriter().fine(new Exception(e));
+                com.gemstone.gemfire.test.dunit.Assert.fail("Failed to start CacheServer", e);
+              }
+            }
+          });
+      }
+
+      vm2.invoke(verify2Servers);
+      assertEquals(true, putAI.isAlive());
+      assertEquals(true, putAI2.isAlive());
+    } finally {
+      vm2.invoke(new SerializableRunnable("Stop Putters") {
+          public void run() {
+            stopTestLifetimeExpire = true;
+          }
+        });
+
+      try {
+        if (putAI != null) {
+          // Verify that no exception has occurred in the putter thread
+          ThreadUtils.join(putAI, 30 * 1000);
+          if (putAI.exceptionOccurred()) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While putting entries: ", putAI.getException());
+          }
+        }
+        
+        if (putAI2 != null) {
+          // Verify that no exception has occurred in the putter thread
+          ThreadUtils.join(putAI2, 30 * 1000);
+          // FIXME this thread does not terminate
+//          if (putAI2.exceptionOccurred()) {
+//            fail("While putting entries: ", putAI.getException());
+//          }
+        }
+
+      } finally {
+        vm2.invoke(new SerializableRunnable("Stop Putters") {
+            public void run() {
+              stopTestLifetimeExpire = false;
+            }
+          });
+        // Close Pool
+        vm2.invoke(new CacheSerializableRunnable("Close Pool") {
+            public void run2() throws CacheException {
+              Region region = getRootRegion().getSubregion(name);
+              String poolName = region.getAttributes().getPoolName();
+              region.localDestroyRegion();
+              PoolManager.find(poolName).destroy();
+            }
+          });
+
+        SerializableRunnable stopCacheServer =
+          new SerializableRunnable("Stop CacheServer") {
+            public void run() {
+              stopBridgeServer(getCache());
+            }
+          };
+        vm1.invoke(stopCacheServer);
+        vm0.invoke(stopCacheServer);
+      }
+    }
+  }
+
+  /**
+   * Tests the create operation of the {@link Pool}
+   *
+   * @since 3.5
+   */
+  public void test011PoolCreate() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = Host.getHost(0).getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+          createRegion(name, factory.create());
+          //pause(1000);
+          try {
+            startBridgeServer(0);
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      });
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+    SerializableRunnable create =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, false, -1, -1, null);
+            createRegion(name, factory.create());
+          }
+        };
+
+    vm1.invoke(create);
+    vm1.invoke(new CacheSerializableRunnable("Create values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.create(new Integer(i), new Integer(i));
+          }
+        }
+      });
+
+    vm2.invoke(create);
+    vm2.invoke(new CacheSerializableRunnable("Validate values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get(new Integer(i));
+            assertNotNull(value);
+            assertTrue(value instanceof Integer);
+            assertEquals(i, ((Integer) value).intValue());
+          }
+        }
+      });
+
+    SerializableRunnable close =
+      new CacheSerializableRunnable("Close Pool") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          region.localDestroyRegion();
+        }
+      };
+
+    vm1.invoke(close);
+    vm2.invoke(close);
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+  }
+
+  /**
+   * Tests the put operation of the {@link Pool}
+   *
+   * @since 3.5
+   */
+  public void test012PoolPut() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+          createRegion(name, factory.create());
+          try {
+            startBridgeServer(0);
+
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      });
+
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+    SerializableRunnable createPool =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            // configure the connection pool for this client region
+            ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, false, -1, -1, null);
+            createRegion(name, factory.create());
+          }
+        };
+
+    vm1.invoke(createPool);
+
+    vm1.invoke(new CacheSerializableRunnable("Put values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            // put string values
+            region.put("key-string-"+i, "value-"+i);
+
+            // put object values
+            Order order = new Order();
+            order.init(i);
+            region.put("key-object-"+i, order);
+
+            // put byte[] values
+            region.put("key-bytes-"+i, ("value-"+i).getBytes());
+          }
+        }
+      });
+
+    vm2.invoke(createPool);
+
+    vm2.invoke(new CacheSerializableRunnable("Get / validate string values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get("key-string-"+i);
+            assertNotNull(value);
+            assertTrue(value instanceof String);
+            assertEquals("value-"+i, value);
+          }
+        }
+      });
+
+    vm2.invoke(new CacheSerializableRunnable("Get / validate object values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get("key-object-"+i);
+            assertNotNull(value);
+            assertTrue(value instanceof Order);
+            assertEquals(i, ((Order) value).getIndex());
+          }
+        }
+      });
+
+    vm2.invoke(new CacheSerializableRunnable("Get / validate byte[] values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get("key-bytes-"+i);
+            assertNotNull(value);
+            assertTrue(value instanceof byte[]);
+            assertEquals("value-"+i, new String((byte[]) value));
+          }
+        }
+      });
+
+    SerializableRunnable closePool =
+      new CacheSerializableRunnable("Close Pool") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          region.localDestroyRegion();
+        }
+      };
+
+    vm1.invoke(closePool);
+    vm2.invoke(closePool);
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+  }
+
+  /**
+   * Tests the put operation of the {@link Pool} when values are
+   * not deserialized.
+   *
+   * @since 3.5
+   */
+  public void test013PoolPutNoDeserialize() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = getBridgeServerRegionAttributes(null,null);
+          createRegion(name, factory.create());
+          try {
+            startBridgeServer(0);
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      });
+
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    SerializableRunnable createPool =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, false, -1, -1, null);
+            createRegion(name, factory.create());
+          }
+        };
+
+    vm1.invoke(createPool);
+
+    vm1.invoke(new CacheSerializableRunnable("Put values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            // put string values
+            region.put("key-string-"+i, "value-"+i);
+
+            // put object values
+            Order order = new Order();
+            order.init(i);
+            region.put("key-object-"+i, order);
+
+            // put byte[] values
+            region.put("key-bytes-"+i, ("value-"+i).getBytes());
+          }
+        }
+      });
+
+    vm2.invoke(createPool);
+
+    vm2.invoke(new CacheSerializableRunnable("Get / validate string values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get("key-string-"+i);
+            assertNotNull(value);
+            assertTrue(value instanceof String);
+            assertEquals("value-"+i, value);
+          }
+        }
+      });
+
+    vm2.invoke(new CacheSerializableRunnable("Get / validate object values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get("key-object-"+i);
+            assertNotNull(value);
+            assertTrue(value instanceof Order);
+            assertEquals(i, ((Order) value).getIndex());
+          }
+        }
+      });
+
+    vm2.invoke(new CacheSerializableRunnable("Get / validate byte[] values") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object value = region.get("key-bytes-"+i);
+            assertNotNull(value);
+            assertTrue(value instanceof byte[]);
+            assertEquals("value-"+i, new String((byte[]) value));
+          }
+        }
+      });
+
+    SerializableRunnable closePool =
+      new CacheSerializableRunnable("Close Pool") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          region.localDestroyRegion();
+        }
+      };
+
+    vm1.invoke(closePool);
+    vm2.invoke(closePool);
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+    Wait.pause(5 * 1000);
+  }
+
+  /**
+   * Tests that invalidates and destroys are propagated to {@link Pool}s.
+   *
+   * @since 3.5
+   */
+  public void test014InvalidateAndDestroyPropagation() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+          createRegion(name, factory.create());
+          try {
+            startBridgeServer(0);
+
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+
+      });
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    SerializableRunnable create =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
+            CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+            factory.setCacheListener(l);
+            Region rgn = createRegion(name, factory.create());
+            rgn.registerInterestRegex(".*", false, false);
+          }
+        };
+
+    vm1.invoke(create);
+    vm1.invoke(new CacheSerializableRunnable("Populate region") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.put(new Integer(i), "old" + i);
+          }
+        }
+      });
+    vm2.invoke(create);
+    Wait.pause(5 * 1000);
+    
+    vm1.invoke(new CacheSerializableRunnable("Turn on history") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          ctl.enableEventHistory();
+        }
+      });
+    vm2.invoke(new CacheSerializableRunnable("Update region") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.put(new Integer(i), "new" + i, "callbackArg" + i);
+          }
+        }
+      });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            ctl.waitForInvalidated(key);
+            Region.Entry entry = region.getEntry(key);
+            assertNotNull(entry);
+            assertNull(entry.getValue());
+          }
+          {
+            List l = ctl.getEventHistory();
+            assertEquals(10, l.size());
+            for (int i = 0; i < 10; i++) {
+              Object key = new Integer(i);
+              EntryEvent ee = (EntryEvent)l.get(i);
+              assertEquals(key, ee.getKey());
+              assertEquals("old" + i, ee.getOldValue());
+              assertEquals(Operation.INVALIDATE, ee.getOperation());
+              assertEquals("callbackArg" + i, ee.getCallbackArgument());
+              assertEquals(true, ee.isOriginRemote());
+            }
+          }
+        }
+      });
+
+
+    vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            assertEquals("new" + i, region.getEntry(key).getValue());
+            region.destroy(key, "destroyCB"+i);
+          }
+        }
+      });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            ctl.waitForDestroyed(key);
+            Region.Entry entry = region.getEntry(key);
+            assertNull(entry);
+          }
+          {
+            List l = ctl.getEventHistory();
+            assertEquals(10, l.size());
+            for (int i = 0; i < 10; i++) {
+              Object key = new Integer(i);
+              EntryEvent ee = (EntryEvent)l.get(i);
+              assertEquals(key, ee.getKey());
+              assertEquals(null, ee.getOldValue());
+              assertEquals(Operation.DESTROY, ee.getOperation());
+              assertEquals("destroyCB"+i, ee.getCallbackArgument());
+              assertEquals(true, ee.isOriginRemote());
+            }
+          }
+        }
+      });
+    vm2.invoke(new CacheSerializableRunnable("recreate") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            region.create(key, "create" + i);
+          }
+        }
+      });
+    Wait.pause(5 * 1000);
+    
+    vm1.invoke(new CacheSerializableRunnable("Verify creates") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          List l = ctl.getEventHistory();
+          com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("history (should be empty): " + l);
+          assertEquals(0, l.size());
+          // now see if we can get it from the server
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            assertEquals("create"+i, region.get(key, "loadCB"+i));
+          }
+          l = ctl.getEventHistory();
+          assertEquals(10, l.size());
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            EntryEvent ee = (EntryEvent)l.get(i);
+            com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("processing " + ee);
+            assertEquals(key, ee.getKey());
+            assertEquals(null, ee.getOldValue());
+            assertEquals("create"+i, ee.getNewValue());
+            assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation());
+            assertEquals("loadCB"+i, ee.getCallbackArgument());
+            assertEquals(false, ee.isOriginRemote());
+          }
+        }
+      });
+
+    SerializableRunnable close =
+      new CacheSerializableRunnable("Close Pool") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          region.localDestroyRegion();
+        }
+      };
+
+    vm1.invoke(close);
+    vm2.invoke(close);
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+  }
+
+  /**
+   * Tests that invalidates and destroys are correctly propagated to
+   * {@link Pool}s with DataPolicy.EMPTY + InterestPolicy.ALL.
+   *
+   * @since 5.0
+   */
+  public void test015InvalidateAndDestroyToEmptyAllPropagation() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+          createRegion(name, factory.create());
+          try {
+            startBridgeServer(0);
+
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      });
+    final int port =
+       vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    SerializableRunnable createEmpty =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
+            CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+            factory.setCacheListener(l);
+            factory.setDataPolicy(DataPolicy.EMPTY);
+            factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
+            Region rgn = createRegion(name, factory.create());
+            rgn.registerInterestRegex(".*", false, false);
+          }
+        };
+    SerializableRunnable createNormal =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
+            CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+            factory.setCacheListener(l);
+            Region rgn = createRegion(name, factory.create());
+            rgn.registerInterestRegex(".*", false, false);
+          }
+        };
+
+    vm1.invoke(createEmpty);
+    vm1.invoke(new CacheSerializableRunnable("Populate region") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.put(new Integer(i), "old" + i);
+          }
+        }
+      });
+
+    vm2.invoke(createNormal);
+    vm1.invoke(new CacheSerializableRunnable("Turn on history") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          ctl.enableEventHistory();
+        }
+      });
+    vm2.invoke(new CacheSerializableRunnable("Update region") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.put(new Integer(i), "new" + i, "callbackArg" + i);
+      }
+        }
+    });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            ctl.waitForInvalidated(key);
+            Region.Entry entry = region.getEntry(key);
+            assertNull(entry); // we are empty!
+          }
+          {
+            List l = ctl.getEventHistory();
+            assertEquals(10, l.size());
+            for (int i = 0; i < 10; i++) {
+              Object key = new Integer(i);
+              EntryEvent ee = (EntryEvent)l.get(i);
+              assertEquals(key, ee.getKey());
+              assertEquals(null, ee.getOldValue());
+              assertEquals(false, ee.isOldValueAvailable()); // no old value in an EMPTY region
+              assertEquals(Operation.INVALIDATE, ee.getOperation());
+              assertEquals("callbackArg" + i, ee.getCallbackArgument());
+              assertEquals(true, ee.isOriginRemote());
+            }
+          }
+        }
+      });
+
+
+    vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
+          public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            assertEquals("new" + i, region.getEntry(key).getValue());
+            region.destroy(key, "destroyCB"+i);
+          }
+        }
+      });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            ctl.waitForDestroyed(key);
+            Region.Entry entry = region.getEntry(key);
+            assertNull(entry);
+          }
+          {
+            List l = ctl.getEventHistory();
+            assertEquals(10, l.size());
+            for (int i = 0; i < 10; i++) {
+              Object key = new Integer(i);
+              EntryEvent ee = (EntryEvent)l.get(i);
+              assertEquals(key, ee.getKey());
+              assertEquals(null, ee.getOldValue());
+              assertEquals(false, ee.isOldValueAvailable());
+              assertEquals(Operation.DESTROY, ee.getOperation());
+              assertEquals("destroyCB"+i, ee.getCallbackArgument());
+              assertEquals(true, ee.isOriginRemote());
+            }
+          }
+        }
+      });
+    vm2.invoke(new CacheSerializableRunnable("recreate") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            region.create(key, "create" + i, "createCB"+i);
+          }
+        }
+      });
+    Wait.pause(5 * 1000);
+    
+    vm1.invoke(new CacheSerializableRunnable("Verify creates") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            ctl.waitForInvalidated(key);
+            Region.Entry entry = region.getEntry(key);
+            assertNull(entry);
+          }
+          List l = ctl.getEventHistory();
+          assertEquals(10, l.size());
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            EntryEvent ee = (EntryEvent)l.get(i);
+            assertEquals(key, ee.getKey());
+            assertEquals(null, ee.getOldValue());
+            assertEquals(false, ee.isOldValueAvailable());
+            assertEquals(Operation.INVALIDATE, ee.getOperation());
+            assertEquals("createCB"+i, ee.getCallbackArgument());
+            assertEquals(true, ee.isOriginRemote());
+          }
+          // now see if we can get it from the server
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            assertEquals("create"+i, region.get(key, "loadCB"+i));
+          }
+          l = ctl.getEventHistory();
+          assertEquals(10, l.size());
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            EntryEvent ee = (EntryEvent)l.get(i);
+            assertEquals(key, ee.getKey());
+            assertEquals(null, ee.getOldValue());
+            assertEquals("create"+i, ee.getNewValue());
+            assertEquals(Operation.LOCAL_LOAD_CREATE, ee.getOperation());
+            assertEquals("loadCB"+i, ee.getCallbackArgument());
+            assertEquals(false, ee.isOriginRemote());
+          }
+        }
+      });
+
+    SerializableRunnable close =
+      new CacheSerializableRunnable("Close Pool") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          region.localDestroyRegion();
+        }
+      };
+
+    vm1.invoke(close);
+    vm2.invoke(close);
+
+    vm0.invoke(new SerializableRunnable("Stop CacheServer") {
+        public void run() {
+          stopBridgeServer(getCache());
+        }
+    });
+  }
+
+  /**
+   * Tests that invalidates and destroys are correctly propagated to
+   * {@link Pool}s with DataPolicy.EMPTY + InterestPolicy.CACHE_CONTENT.
+   *
+   * @since 5.0
+   */
+  public void test016InvalidateAndDestroyToEmptyCCPropagation() throws CacheException {
+    final String name = this.getName();
+    final Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    vm0.invoke(new CacheSerializableRunnable("Create Cache Server") {
+        public void run2() throws CacheException {
+          AttributesFactory factory = getBridgeServerRegionAttributes(null, null);
+          createRegion(name, factory.create());
+          try {
+            startBridgeServer(0);
+
+          } catch (Exception ex) {
+            com.gemstone.gemfire.test.dunit.Assert.fail("While starting CacheServer", ex);
+          }
+
+        }
+      });
+    final int port =
+      vm0.invoke(() -> ConnectionPoolDUnitTest.getCacheServerPort());
+    final String host0 = NetworkUtils.getServerHostName(vm0.getHost());
+
+    SerializableRunnable createEmpty =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
+            CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+            factory.setCacheListener(l);
+            factory.setDataPolicy(DataPolicy.EMPTY);
+            factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT));
+            Region rgn = createRegion(name, factory.create());
+            rgn.registerInterestRegex(".*", false, false);
+          }
+        };
+    SerializableRunnable createNormal =
+      new CacheSerializableRunnable("Create region") {
+          public void run2() throws CacheException {
+            getLonerSystem();
+            getCache();
+            AttributesFactory factory = new AttributesFactory();
+            factory.setScope(Scope.LOCAL);
+            factory.setConcurrencyChecksEnabled(false);
+            ClientServerTestCase.configureConnectionPool(factory, host0, port, -1, true, -1, -1, null);
+            CertifiableTestCacheListener l = new CertifiableTestCacheListener(com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter());
+            factory.setCacheListener(l);
+            Region rgn = createRegion(name, factory.create());
+            rgn.registerInterestRegex(".*", false, false);
+          }
+        };
+
+    vm1.invoke(createEmpty);
+    vm1.invoke(new CacheSerializableRunnable("Populate region") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.put(new Integer(i), "old" + i);
+          }
+        }
+      });
+
+    vm2.invoke(createNormal);
+    vm1.invoke(new CacheSerializableRunnable("Turn on history") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          ctl.enableEventHistory();
+        }
+      });
+    vm2.invoke(new CacheSerializableRunnable("Update region") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            region.put(new Integer(i), "new" + i, "callbackArg" + i);
+          }
+        }
+      });
+    Wait.pause(5 * 1000);
+
+    vm1.invoke(new CacheSerializableRunnable("Verify invalidates") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          List l = ctl.getEventHistory();
+          assertEquals(0, l.size());
+        }
+      });
+
+
+    vm2.invoke(new CacheSerializableRunnable("Validate original and destroy") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            assertEquals("new" + i, region.getEntry(key).getValue());
+            region.destroy(key, "destroyCB"+i);
+          }
+        }
+      });
+
+    vm1.invoke(new CacheSerializableRunnable("Verify destroys") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener();
+          List l = ctl.getEventHistory();
+          assertEquals(0, l.size());
+        }
+      });
+    vm2.invoke(new CacheSerializableRunnable("recreate") {
+        public void run2() throws CacheException {
+          Region region = getRootRegion().getSubregion(name);
+          for (int i = 0; i < 10; i++) {
+            Object key = new Integer(i);
+            region.create(key, "create" + i, "createCB"+i);
+          }
+        }
+

<TRUNCATED>


[2/5] incubator-geode git commit: GEODE-1111 Connection Pooling needs more tests

Posted by kl...@apache.org.
GEODE-1111 Connection Pooling needs more tests

These tests were dependent on an Order class that is only available in
Pivotal's old test framework.  I've created a small Order class that replaces
it and allows the tests to run as part of Geode's geode-core distributedTest
task.
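
For reference, a minimal sketch of what that small replacement Order class
could look like, inferred only from how these tests use it (a no-arg
constructor, init(int), and getIndex()); the body below is an assumption, and
the committed class may differ, for example by implementing DataSerializable
instead of java.io.Serializable:

    package com.gemstone.gemfire.cache;

    import java.io.Serializable;

    // Hypothetical sketch inferred from test usage, not the committed source.
    public class Order implements Serializable {
      private int index;

      // no-arg constructor, required for deserialization
      public Order() {
      }

      public void init(int index) {
        this.index = index;
      }

      public int getIndex() {
        return index;
      }
    }

Assuming the standard Geode Gradle layout, the distributedTest task mentioned
above would then pick these tests up via something like
./gradlew geode-core:distributedTest.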


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4ed2fd37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4ed2fd37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4ed2fd37

Branch: refs/heads/feature/GEODE-1050
Commit: 4ed2fd374cf27ff704b09600eef263b71be9eabc
Parents: ac3d3b4
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Thu Mar 17 15:04:59 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Thu Mar 17 15:10:10 2016 -0700

----------------------------------------------------------------------
 .../cache/ConnectionPoolAutoDUnitTest.java      |   45 +
 .../gemfire/cache/ConnectionPoolDUnitTest.java  | 5871 ++++++++++++++++++
 2 files changed, 5916 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4ed2fd37/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
new file mode 100755
index 0000000..ad110d7
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
@@ -0,0 +1,45 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache;
+
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+
+import static org.junit.runners.MethodSorters.*;
+import org.junit.FixMethodOrder;
+
+@FixMethodOrder(NAME_ASCENDING)
+public class ConnectionPoolAutoDUnitTest extends ConnectionPoolDUnitTest {
+
+  public ConnectionPoolAutoDUnitTest(String name) {
+    super(name);
+  }
+  
+  public void setUp() throws Exception {
+    super.setUp();
+    // run the inherited connection pool tests with AUTO_LOAD_BALANCE
+    // enabled in every VM
+    ClientServerTestCase.AUTO_LOAD_BALANCE = true;
+    Invoke.invokeInEveryVM(new SerializableRunnable("setupAutoMode") {
+      public void run() {
+        ClientServerTestCase.AUTO_LOAD_BALANCE = true;
+      }
+    });
+  }
+
+  @Override
+  protected final void postTearDownConnectionPoolDUnitTest() throws Exception {
+    ClientServerTestCase.AUTO_LOAD_BALANCE = false;
+    Invoke.invokeInEveryVM(new SerializableRunnable("disableAutoMode") {
+      public void run() {
+        ClientServerTestCase.AUTO_LOAD_BALANCE = false;
+      }
+    });
+  }
+  
+}


[5/5] incubator-geode git commit: Disable DistTXDebugDUnitTest because Dist TX is not supposed to be on develop and it fails with no members to host buckets

Posted by kl...@apache.org.
Disable DistTXDebugDUnitTest because Dist TX is not supposed to be on develop and it fails with no members to host buckets


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/893dc86b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/893dc86b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/893dc86b

Branch: refs/heads/feature/GEODE-1050
Commit: 893dc86b95aa38cae55ad421c98b61dbbc7e461f
Parents: ad390c9
Author: Kirk Lund <kl...@apache.org>
Authored: Thu Mar 17 16:11:05 2016 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Thu Mar 17 16:11:05 2016 -0700

----------------------------------------------------------------------
 .../disttx/DistTXDebugDUnitDisabledTest.java    | 1016 +++++++++++++++++
 .../gemfire/disttx/DistTXDebugDUnitTest.java    | 1017 ------------------
 .../disttx/DistTXDistributedTestSuite.java      |   42 -
 .../disttx/DistTXPersistentDebugDUnitTest.java  |    2 +-
 4 files changed, 1017 insertions(+), 1060 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/893dc86b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitDisabledTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitDisabledTest.java b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitDisabledTest.java
new file mode 100644
index 0000000..dbd5d3c
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitDisabledTest.java
@@ -0,0 +1,1016 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.disttx;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+
+import com.gemstone.gemfire.DataSerializable;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheTransactionManager;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EntryOperation;
+import com.gemstone.gemfire.cache.PartitionAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.PartitionResolver;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
+import com.gemstone.gemfire.internal.cache.execute.CustomerIDPartitionResolver;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.VM;
+
+/**
+ * TODO: re-enable this test and fix it when work on Dist TX resumes -- it fails with "no members to host buckets"
+ */
+public class DistTXDebugDUnitDisabledTest extends CacheTestCase {
+  VM accessor = null;
+  VM dataStore1 = null;
+  VM dataStore2 = null;
+  VM dataStore3 = null;
+
+  public DistTXDebugDUnitDisabledTest(String name) {
+    super(name);
+  }
+
+  @Override
+  public final void postSetUp() throws Exception {
+    Host host = Host.getHost(0);
+    dataStore1 = host.getVM(0);
+    dataStore2 = host.getVM(1);
+    dataStore3 = host.getVM(2);
+    accessor = host.getVM(3);
+    postSetUpDistTXDebugDUnitTest();
+  }
+
+  protected void postSetUpDistTXDebugDUnitTest() throws Exception {
+  }
+
+  @Override
+  public final void postTearDownCacheTestCase() throws Exception {
+    Invoke.invokeInEveryVM(new SerializableRunnable() {
+      public void run() {
+        InternalResourceManager.setResourceObserver(null);
+      }
+    });
+    InternalResourceManager.setResourceObserver(null);
+  }
+
+  public static void createCacheInVm() {
+    new DistTXDebugDUnitDisabledTest("temp").getCache();
+  }
+
+  protected void createCacheInAllVms() {
+    dataStore1.invoke(() -> DistTXDebugDUnitDisabledTest.createCacheInVm());
+    dataStore2.invoke(() -> DistTXDebugDUnitDisabledTest.createCacheInVm());
+    dataStore3.invoke(() -> DistTXDebugDUnitDisabledTest.createCacheInVm());
+    accessor.invoke(() -> DistTXDebugDUnitDisabledTest.createCacheInVm());
+  }
+
+  public static void createPR(String partitionedRegionName, Integer redundancy,
+      Integer localMaxMemory, Integer totalNumBuckets, Object colocatedWith,
+      Boolean isPartitionResolver) {
+    createPR(partitionedRegionName, redundancy, localMaxMemory,
+        totalNumBuckets, colocatedWith, isPartitionResolver,
+        Boolean.TRUE /* enable concurrency checks; disabled by default */);
+  }
+
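+  /**
+   * Creates a partitioned region in this VM. A null localMaxMemory or
+   * totalNumBuckets keeps the PartitionAttributesFactory defaults;
+   * colocatedWith names an existing region to colocate with; a true
+   * isPartitionResolver installs a CustomerIDPartitionResolver.
+   */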
+  public static void createPR(String partitionedRegionName, Integer redundancy,
+      Integer localMaxMemory, Integer totalNumBuckets, Object colocatedWith,
+      Boolean isPartitionResolver, Boolean concurrencyChecks) {
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+
+    paf.setRedundantCopies(redundancy.intValue());
+    if (localMaxMemory != null) {
+      paf.setLocalMaxMemory(localMaxMemory.intValue());
+    }
+    if (totalNumBuckets != null) {
+      paf.setTotalNumBuckets(totalNumBuckets.intValue());
+    }
+    if (colocatedWith != null) {
+      paf.setColocatedWith((String) colocatedWith);
+    }
+    if (isPartitionResolver.booleanValue()) {
+      paf.setPartitionResolver(new CustomerIDPartitionResolver(
+          "CustomerIDPartitionResolver"));
+    }
+    PartitionAttributes prAttr = paf.create();
+    AttributesFactory attr = new AttributesFactory();
+    attr.setPartitionAttributes(prAttr);
+    attr.setConcurrencyChecksEnabled(concurrencyChecks);
+    assertNotNull(basicGetCache());
+    Region pr = basicGetCache().createRegion(partitionedRegionName, attr.create());
+    assertNotNull(pr);
+    LogWriterUtils.getLogWriter().info(
+        "Partitioned Region " + partitionedRegionName
+            + " created Successfully :" + pr.toString());
+  }
+
+  protected void createPartitionedRegion(Object[] attributes) {
+    dataStore1.invoke(DistTXDebugDUnitDisabledTest.class, "createPR", attributes);
+    dataStore2.invoke(DistTXDebugDUnitDisabledTest.class, "createPR", attributes);
+    dataStore3.invoke(DistTXDebugDUnitDisabledTest.class, "createPR", attributes);
+    // make localMaxMemory = 0 for the accessor
+    attributes[2] = new Integer(0);
+    accessor.invoke(DistTXDebugDUnitDisabledTest.class, "createPR", attributes);
+  }
+
+  public static void destroyPR(String partitionedRegionName) {
+    assertNotNull(basicGetCache());
+    Region pr = basicGetCache().getRegion(partitionedRegionName);
+    assertNotNull(pr);
+    LogWriterUtils.getLogWriter().info(
+        "Destroying Partitioned Region " + partitionedRegionName);
+    pr.destroyRegion();
+  }
+
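+  /**
+   * Creates a replicated region in this VM, with DataPolicy.EMPTY when
+   * empty is true and DataPolicy.REPLICATE otherwise.
+   */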
+  public static void createRR(String replicatedRegionName, boolean empty) {
+    AttributesFactory af = new AttributesFactory();
+    af.setScope(Scope.DISTRIBUTED_ACK);
+    if (empty) {
+      af.setDataPolicy(DataPolicy.EMPTY);
+    } else {
+      af.setDataPolicy(DataPolicy.REPLICATE);
+    }
+    Region rr = basicGetCache().createRegion(replicatedRegionName, af.create());
+    assertNotNull(rr);
+    LogWriterUtils.getLogWriter().info(
+        "Replicated Region " + replicatedRegionName + " created Successfully :"
+            + rr.toString());
+  }
+
+  protected void createReplicatedRegion(Object[] attributes) {
+    dataStore1.invoke(DistTXDebugDUnitDisabledTest.class, "createRR", attributes);
+    dataStore2.invoke(DistTXDebugDUnitDisabledTest.class, "createRR", attributes);
+    dataStore3.invoke(DistTXDebugDUnitDisabledTest.class, "createRR", attributes);
+    // DataPolicy.EMPTY for accessor
+    attributes[1] = Boolean.TRUE;
+    accessor.invoke(DistTXDebugDUnitDisabledTest.class, "createRR", attributes);
+  }
+
+  public void testTXPR() throws Exception {
+    createCacheInAllVms();
+    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+        Boolean.FALSE, Boolean.FALSE };
+    createPartitionedRegion(prAttrs);
+
+    SerializableCallable TxOps = new SerializableCallable("TxOps") {
+      @Override
+      public Object call() throws CacheException {
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+        // put some data (non tx ops)
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.put");
+          pr1.put(dummy, "1_entry__" + i);
+        }
+
+        // put in tx and commit
+        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+        ctx.setDistributed(true);
+        ctx.begin();
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.put in tx 1");
+          pr1.put(dummy, "2_entry__" + i);
+        }
+        ctx.commit();
+
+        // verify the data
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.get");
+          assertEquals("2_entry__" + i, pr1.get(dummy));
+        }
+
+        // put data in tx and rollback
+        ctx.begin();
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.put in tx 2");
+          pr1.put(dummy, "3_entry__" + i);
+        }
+        ctx.rollback();
+
+        // verify the data
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.get");
+          assertEquals("2_entry__" + i, pr1.get(dummy));
+        }
+
+        // destroy data in tx and commit
+        ctx.begin();
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.destroy in tx 3");
+          pr1.destroy(dummy);
+        }
+        ctx.commit();
+
+        // verify the data
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.get");
+          assertEquals(null, pr1.get(dummy));
+        }
+
+        // verify data size on all replicas
+        SerializableCallable verifySize = new SerializableCallable("getOps") {
+          @Override
+          public Object call() throws CacheException {
+            PartitionedRegion pr1 = (PartitionedRegion) basicGetCache()
+                .getRegion("pregion1");
+            LogWriterUtils.getLogWriter().info(
+                " calling pr.getLocalSize " + pr1.getLocalSize());
+            assertEquals(0, pr1.getLocalSize());
+            return null;
+          }
+        };
+        dataStore1.invoke(verifySize);
+        dataStore2.invoke(verifySize);
+        dataStore3.invoke(verifySize);
+
+        return null;
+      }
+    };
+
+    accessor.invoke(TxOps);
+
+    accessor.invoke(() -> DistTXDebugDUnitDisabledTest.destroyPR( "pregion1" ));
+  }
+
+  public void testTXDestroy_invalidate() throws Exception {
+    createCacheInAllVms();
+    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+        Boolean.FALSE, Boolean.FALSE };
+    createPartitionedRegion(prAttrs);
+
+    Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
+    createReplicatedRegion(rrAttrs);
+
+    SerializableCallable TxOps = new SerializableCallable("TxOps") {
+      @Override
+      public Object call() throws CacheException {
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+        Region rr1 = basicGetCache().getRegion("rregion1");
+
+        // put some data (non tx ops)
+        for (int i = 1; i <= 6; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling non-tx put");
+          pr1.put(dummy, "1_entry__" + i);
+          rr1.put(dummy, "1_entry__" + i);
+        }
+
+        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+        ctx.setDistributed(true);
+        // destroy data in tx and commit
+        ctx.begin();
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(
+              " calling pr1.destroy in tx key=" + dummy);
+          pr1.destroy(dummy);
+          LogWriterUtils.getLogWriter().info(" calling rr1.destroy in tx key=" + i);
+          rr1.destroy(dummy);
+        }
+        for (int i = 4; i <= 6; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(
+              " calling pr1.invalidate in tx key=" + dummy);
+          pr1.invalidate(dummy);
+          LogWriterUtils.getLogWriter().info(" calling rr1.invalidate in tx key=" + i);
+          rr1.invalidate(dummy);
+        }
+        ctx.commit();
+
+        // verify the data
+        for (int i = 1; i <= 6; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr1.get");
+          assertEquals(null, pr1.get(dummy));
+          LogWriterUtils.getLogWriter().info(" calling rr1.get");
+          assertEquals(null, rr1.get(dummy));
+        }
+        return null;
+      }
+    };
+
+    accessor.invoke(TxOps);
+
+    // verify data size on all replicas
+    SerializableCallable verifySize = new SerializableCallable("getOps") {
+      @Override
+      public Object call() throws CacheException {
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+        Region rr1 = basicGetCache().getRegion("rregion1");
+        LogWriterUtils.getLogWriter().info(
+            " calling pr1.getLocalSize " + pr1.getLocalSize());
+        assertEquals(2, pr1.getLocalSize());
+        LogWriterUtils.getLogWriter().info(" calling rr1.size " + rr1.size());
+        assertEquals(3, rr1.size());
+        return null;
+      }
+    };
+    dataStore1.invoke(verifySize);
+    dataStore2.invoke(verifySize);
+    dataStore3.invoke(verifySize);
+
+    accessor.invoke(() -> DistTXDebugDUnitDisabledTest.destroyPR( "pregion1" ));
+  }
+
+  public void testTXPR_RR() throws Exception {
+    createCacheInAllVms();
+    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+        Boolean.FALSE, Boolean.FALSE };
+    createPartitionedRegion(prAttrs);
+
+    Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
+    createReplicatedRegion(rrAttrs);
+
+    SerializableCallable TxOps = new SerializableCallable("TxOps") {
+      @Override
+      public Object call() throws CacheException {
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+        Region rr1 = basicGetCache().getRegion("rregion1");
+        // put some data (non tx ops)
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.put non-tx PR1_entry__" + i);
+          pr1.put(dummy, "PR1_entry__" + i);
+          LogWriterUtils.getLogWriter().info(" calling rr.put non-tx RR1_entry__" + i);
+          rr1.put(new Integer(i), "RR1_entry__" + i);
+        }
+
+        // put in tx and commit
+        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+        ctx.setDistributed(true);
+        ctx.begin();
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.put in tx PR2_entry__" + i);
+          pr1.put(dummy, "PR2_entry__" + i);
+          LogWriterUtils.getLogWriter().info(" calling rr.put in tx RR2_entry__" + i);
+          rr1.put(new Integer(i), "RR2_entry__" + i);
+        }
+        ctx.commit();
+
+        // verify the data
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.get PR2_entry__" + i);
+          assertEquals("PR2_entry__" + i, pr1.get(dummy));
+          LogWriterUtils.getLogWriter().info(" calling rr.get RR2_entry__" + i);
+          assertEquals("RR2_entry__" + i, rr1.get(new Integer(i)));
+        }
+        return null;
+      }
+    };
+
+    accessor.invoke(TxOps);
+
+    // verify data size on all replicas
+    SerializableCallable verifySize = new SerializableCallable("getOps") {
+      @Override
+      public Object call() throws CacheException {
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+        LogWriterUtils.getLogWriter().info(
+            " calling pr.getLocalSize " + pr1.getLocalSize());
+        assertEquals(2, pr1.getLocalSize());
+
+        Region rr1 = basicGetCache().getRegion("rregion1");
+        LogWriterUtils.getLogWriter()
+            .info(" calling rr.getLocalSize " + rr1.size());
+        assertEquals(3, rr1.size());
+        return null;
+      }
+    };
+    dataStore1.invoke(verifySize);
+    dataStore2.invoke(verifySize);
+    dataStore3.invoke(verifySize);
+
+    accessor.invoke(() -> DistTXDebugDUnitDisabledTest.destroyPR( "pregion1" ));
+  }
+
+  public void testTXPR2() throws Exception {
+    createCacheInAllVms();
+    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+        Boolean.FALSE, Boolean.FALSE };
+    createPartitionedRegion(prAttrs);
+
+    SerializableCallable TxOps = new SerializableCallable("TxOps") {
+      @Override
+      public Object call() throws CacheException {
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+
+        // put in tx and commit
+        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+        ctx.setDistributed(true);
+        ctx.begin();
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.put in tx 1");
+          pr1.put(dummy, "2_entry__" + i);
+        }
+        ctx.commit();
+
+        // verify the data
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
+          assertEquals("2_entry__" + i, pr1.get(dummy));
+        }
+        return null;
+      }
+    };
+
+    accessor.invoke(TxOps);
+
+    SerializableCallable TxGetOps = new SerializableCallable("TxGetOps") {
+      @Override
+      public Object call() throws CacheException {
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+        LogWriterUtils.getLogWriter().info(
+            " calling pr.getLocalSize " + pr1.getLocalSize());
+        assertEquals(2, pr1.getLocalSize());
+        return null;
+      }
+    };
+
+    dataStore1.invoke(TxGetOps);
+    dataStore2.invoke(TxGetOps);
+    dataStore3.invoke(TxGetOps);
+
+    SerializableCallable TxRollbackOps = new SerializableCallable("TxRollbackOps") {
+      @Override
+      public Object call() throws CacheException {
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+
+        // put in tx and roll back
+        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+        ctx.setDistributed(true);
+        ctx.begin();
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(
+              " calling pr.put in tx for rollback no_entry__" + i);
+          pr1.put(dummy, "no_entry__" + i);
+        }
+        ctx.rollback();
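+        // after rollback, the previously committed "2_entry__" values must
+        // still be visible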
+
+        // verify the data
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(
+              " calling pr.get after rollback " + pr1.get(dummy));
+          assertEquals("2_entry__" + i, pr1.get(dummy));
+        }
+        return null;
+      }
+    };
+
+    accessor.invoke(TxRollbackOps);
+
+    accessor.invoke(() -> DistTXDebugDUnitDisabledTest.destroyPR("pregion1"));
+  }
+  
+  public void testTXPRRR2_create() throws Exception {
+    createCacheInAllVms();
+    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+        Boolean.FALSE, Boolean.FALSE };
+    createPartitionedRegion(prAttrs);
+
+    Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
+    createReplicatedRegion(rrAttrs);
+
+    SerializableCallable TxOps = new SerializableCallable("TxOps") {
+      @Override
+      public Object call() throws CacheException {
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+        Region rr1 = basicGetCache().getRegion("rregion1");
+        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+        ctx.setDistributed(true);
+        ctx.begin();
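+        // use create() rather than put(): create() throws EntryExistsException
+        // if the key is already present, so this also verifies the keys are new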
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.create in tx 1");
+          pr1.create(dummy, "2_entry__" + i);
+          
+          LogWriterUtils.getLogWriter().info(" calling rr.create " + "2_entry__" + i);
+          rr1.create(new Integer(i), "2_entry__" + i);
+        }
+        ctx.commit();
+
+        // verify the data
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
+          assertEquals("2_entry__" + i, pr1.get(dummy));
+          
+          LogWriterUtils.getLogWriter().info(
+              " calling rr.get " + rr1.get(new Integer(i)));
+          assertEquals("2_entry__" + i, rr1.get(new Integer(i)));
+        }
+        return null;
+      }
+    };
+
+    accessor.invoke(TxOps);
+
+    // verify data size on all replicas
+    SerializableCallable verifySize = new SerializableCallable("verifySize") {
+      @Override
+      public Object call() throws CacheException {
+        Region rr1 = basicGetCache().getRegion("rregion1");
+        LogWriterUtils.getLogWriter()
+            .info(" calling rr.size " + rr1.size());
+        assertEquals(3, rr1.size());
+        
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+        LogWriterUtils.getLogWriter().info(
+            " calling pr.getLocalSize " + pr1.getLocalSize());
+        assertEquals(2, pr1.getLocalSize());
+        return null;
+      }
+    };
+    dataStore1.invoke(verifySize);
+    dataStore2.invoke(verifySize);
+    dataStore3.invoke(verifySize);
+  }
+  
+  public void testTXPRRR2_putall() throws Exception {
+    createCacheInAllVms();
+    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+        Boolean.FALSE, Boolean.FALSE };
+    createPartitionedRegion(prAttrs);
+
+    Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
+    createReplicatedRegion(rrAttrs);
+
+    SerializableCallable TxOps = new SerializableCallable("TxOps") {
+      @Override
+      public Object call() throws CacheException {
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+        Region rr1 = basicGetCache().getRegion("rregion1");
+        
+        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+        ctx.setDistributed(true);
+        ctx.begin();
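+        // bulk putAll on both region types inside one distributed TX; the
+        // entries only become visible to readers once the TX commits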
+        HashMap<DummyKeyBasedRoutingResolver, String> phm = new HashMap<DummyKeyBasedRoutingResolver, String>();
+        HashMap<Integer, String> rhm = new HashMap<Integer, String>();
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          phm.put(dummy, "2_entry__" + i);
+          rhm.put(i, "2_entry__" + i);
+        }
+        pr1.putAll(phm);
+        rr1.putAll(rhm);
+        ctx.commit();
+
+        // verify the data
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
+          assertEquals("2_entry__" + i, pr1.get(dummy));
+          
+          LogWriterUtils.getLogWriter().info(
+              " calling rr.get " + rr1.get(new Integer(i)));
+          assertEquals("2_entry__" + i, rr1.get(new Integer(i)));
+        }
+        return null;
+      }
+    };
+
+    accessor.invoke(TxOps);
+
+    // verify data size on all replicas
+    SerializableCallable verifySize = new SerializableCallable("verifySize") {
+      @Override
+      public Object call() throws CacheException {
+        Region rr1 = basicGetCache().getRegion("rregion1");
+        LogWriterUtils.getLogWriter()
+            .info(" calling rr.size " + rr1.size());
+        assertEquals(3, rr1.size());
+        
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+        LogWriterUtils.getLogWriter().info(
+            " calling pr.getLocalSize " + pr1.getLocalSize());
+        assertEquals(2, pr1.getLocalSize());
+        return null;
+      }
+    };
+    dataStore1.invoke(verifySize);
+    dataStore2.invoke(verifySize);
+    dataStore3.invoke(verifySize);
+  }
+  
+  public void testTXPR_putall() throws Exception {
+    createCacheInAllVms();
+    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+        Boolean.FALSE, Boolean.FALSE };
+    createPartitionedRegion(prAttrs);
+
+    SerializableCallable TxOps = new SerializableCallable("TxOps") {
+      @Override
+      public Object call() throws CacheException {
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+        
+        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+        ctx.setDistributed(true);
+        ctx.begin();
+        HashMap<DummyKeyBasedRoutingResolver, String> phm = new HashMap<DummyKeyBasedRoutingResolver, String>();
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          phm.put(dummy, "2_entry__" + i);
+        }
+        pr1.putAll(phm);
+        ctx.commit();
+
+        // verify the data
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
+          assertEquals("2_entry__" + i, pr1.get(dummy));
+        }
+        return null;
+      }
+    };
+
+    accessor.invoke(TxOps);
+
+    // verify data size on all replicas
+    SerializableCallable verifySize = new SerializableCallable("verifySize") {
+      @Override
+      public Object call() throws CacheException {
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+        LogWriterUtils.getLogWriter().info(
+            " calling pr.getLocalSize " + pr1.getLocalSize());
+        assertEquals(2, pr1.getLocalSize());
+        return null;
+      }
+    };
+    dataStore1.invoke(verifySize);
+    dataStore2.invoke(verifySize);
+    dataStore3.invoke(verifySize);
+  }
+
+  public void testTXRR_removeAll() throws Exception {
+    performRR_removeAllTest(false);
+  }
+  
+  public void testTXRR_removeAll_dataNodeAsCoordinator() throws Exception {
+    performRR_removeAllTest(true);
+  }
+
+  /**
+   * @param dataNodeAsCoordinator when true the transaction runs on dataStore1,
+   *          so a data-hosting member coordinates it; otherwise the accessor
+   *          coordinates
+   */
+  private void performRR_removeAllTest(boolean dataNodeAsCoordinator) {
+    createCacheInAllVms();
+    Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
+    createReplicatedRegion(rrAttrs);
+
+    SerializableCallable TxOps = new SerializableCallable("TxOps") {
+      @Override
+      public Object call() throws CacheException {
+        Region rr1 = basicGetCache().getRegion("rregion1");
+        //put some data
+        HashMap<Integer, String> rhm = new HashMap<Integer, String>();
+        for (int i = 1; i <= 3; i++) {
+          rhm.put(i, "2_entry__" + i);
+        }
+        rr1.putAll(rhm);
+        
+        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+        ctx.setDistributed(true);
+        ctx.begin();
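+        // remove everything inside the TX; the removal is applied on all
+        // replicas atomically at commit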
+        rr1.removeAll(rhm.keySet());
+
+        ctx.commit();
+
+        // verify the data
+        for (int i = 1; i <= 3; i++) {
+          LogWriterUtils.getLogWriter().info(
+              " calling rr.get " + rr1.get(new Integer(i)));
+          assertEquals(null, rr1.get(new Integer(i)));
+        }
+        return null;
+      }
+    };
+    
+    if (dataNodeAsCoordinator) {
+      dataStore1.invoke(TxOps);
+    } else {
+      accessor.invoke(TxOps);
+    }
+
+    // verify data size on all replicas
+    SerializableCallable verifySize = new SerializableCallable("verifySize") {
+      @Override
+      public Object call() throws CacheException {
+        Region rr1 = basicGetCache().getRegion("rregion1");
+        LogWriterUtils.getLogWriter()
+            .info(" calling rr.size " + rr1.size());
+        assertEquals(0, rr1.size());
+        return null;
+      }
+    };
+    dataStore1.invoke(verifySize);
+    dataStore2.invoke(verifySize);
+    dataStore3.invoke(verifySize);
+  }
+  
+  public void testTXPR_removeAll() throws Exception {
+    createCacheInAllVms();
+    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+        Boolean.FALSE, Boolean.FALSE };
+    createPartitionedRegion(prAttrs);
+
+    SerializableCallable TxOps = new SerializableCallable("TxOps") {
+      @Override
+      public Object call() throws CacheException {
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+        HashMap<DummyKeyBasedRoutingResolver, String> phm = new HashMap<DummyKeyBasedRoutingResolver, String>();
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          phm.put(dummy, "2_entry__" + i);
+        }
+        pr1.putAll(phm);
+        
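+        // the seed data above was put outside any TX; now remove it all
+        // inside a distributed TX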
+        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+        ctx.setDistributed(true);
+        ctx.begin();
+        pr1.removeAll(phm.keySet());
+        ctx.commit();
+
+        // verify the data
+        for (int i = 1; i <= 3; i++) {
+          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
+              i);
+          LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
+          assertEquals(null, pr1.get(dummy));
+        }
+        return null;
+      }
+    };
+
+    accessor.invoke(TxOps);
+
+    // verify data size on all replicas
+    SerializableCallable verifySize = new SerializableCallable("verifySize") {
+      @Override
+      public Object call() throws CacheException {
+        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
+        LogWriterUtils.getLogWriter().info(
+            " calling pr.getLocalSize " + pr1.getLocalSize());
+        assertEquals(0, pr1.getLocalSize());
+        return null;
+      }
+    };
+    dataStore1.invoke(verifySize);
+    dataStore2.invoke(verifySize);
+    dataStore3.invoke(verifySize);
+  }
+
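+  /**
+   * Commits puts on a replicated region, verifies them on every replica, then
+   * rolls back a second TX and checks that the committed values survive.
+   *
+   * @param makeDataNodeAsCoordinator when true the TX runs on dataStore1 (a
+   *          data-hosting member coordinates); otherwise the accessor does
+   */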
+  public void performTXRRtestOps(boolean makeDataNodeAsCoordinator) {
+    createCacheInAllVms();
+    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
+        Boolean.FALSE, Boolean.FALSE };
+    createPartitionedRegion(prAttrs);
+
+    Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
+    createReplicatedRegion(rrAttrs);
+
+    SerializableCallable TxOps = new SerializableCallable("TxOps") {
+      @Override
+      public Object call() throws CacheException {
+        Region rr1 = basicGetCache().getRegion("rregion1");
+        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+        ctx.setDistributed(true);
+        ctx.begin();
+        for (int i = 1; i <= 3; i++) {
+          LogWriterUtils.getLogWriter().info(" calling rr.put " + "2_entry__" + i);
+          rr1.put(new Integer(i), "2_entry__" + i);
+        }
+        ctx.commit();
+
+        // verify the data
+        for (int i = 1; i <= 3; i++) {
+          LogWriterUtils.getLogWriter().info(
+              " calling rr.get " + rr1.get(new Integer(i)));
+          assertEquals("2_entry__" + i, rr1.get(new Integer(i)));
+        }
+        return null;
+      }
+    };
+    
+    if (makeDataNodeAsCoordinator) {
+      dataStore1.invoke(TxOps);
+    } else {
+      accessor.invoke(TxOps);
+    }
+
+    // verify data size on all replicas
+    SerializableCallable verifySize = new SerializableCallable("verifySize") {
+      @Override
+      public Object call() throws CacheException {
+        Region rr1 = basicGetCache().getRegion("rregion1");
+        LogWriterUtils.getLogWriter()
+            .info(" calling rr.size " + rr1.size());
+        assertEquals(3, rr1.size());
+        return null;
+      }
+    };
+    dataStore1.invoke(verifySize);
+    dataStore2.invoke(verifySize);
+    dataStore3.invoke(verifySize);
+
+    SerializableCallable TxRollbackOps = new SerializableCallable("TxRollbackOps") {
+      @Override
+      public Object call() throws CacheException {
+        Region rr1 = basicGetCache().getRegion("rregion1");
+        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
+        ctx.setDistributed(true);
+        ctx.begin();
+        for (int i = 1; i <= 3; i++) {
+          LogWriterUtils.getLogWriter().info(
+              " calling rr.put for rollback no_entry__" + i);
+          rr1.put(new Integer(i), "no_entry__" + i);
+        }
+        ctx.rollback();
+
+        // verify the data
+        for (int i = 1; i <= 3; i++) {
+          LogWriterUtils.getLogWriter().info(
+              " calling rr.get after rollback "
+                  + rr1.get(new Integer(i)));
+          assertEquals("2_entry__" + i, rr1.get(new Integer(i)));
+        }
+        return null;
+      }
+    };
+
+    if (makeDataNodeAsCoordinator) {
+      dataStore1.invoke(TxRollbackOps);
+    } else {
+      accessor.invoke(TxRollbackOps);
+    }
+  }
+
+  public void testTXRR2() throws Exception {
+    performTXRRtestOps(false); // accessor coordinates the TX
+  }
+
+  public void testTXRR2_dataNodeAsCoordinator() throws Exception {
+    performTXRRtestOps(true);
+  }
+}
+
+class DummyKeyBasedRoutingResolver implements PartitionResolver,
+    DataSerializable {
+  Integer dummyID;
+
+  public DummyKeyBasedRoutingResolver() {
+  }
+
+  public DummyKeyBasedRoutingResolver(int id) {
+    this.dummyID = new Integer(id);
+  }
+
+  public String getName() {
+    // a resolver name is not needed for these tests
+    return null;
+  }
+
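+  // The key itself is returned as the routing object, so bucket placement is
+  // driven by this resolver's hashCode (the wrapped integer id).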
+  public Serializable getRoutingObject(EntryOperation opDetails) {
+    return (Serializable) opDetails.getKey();
+  }
+
+  public void close() {
+    // nothing to release
+  }
+
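+  // DataSerializable support lets instances of this resolver be shipped
+  // between the dunit VMs and used as region keys on remote members.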
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    this.dummyID = DataSerializer.readInteger(in);
+  }
+
+  public void toData(DataOutput out) throws IOException {
+    DataSerializer.writeInteger(this.dummyID, out);
+  }
+
+  @Override
+  public int hashCode() {
+    return this.dummyID.intValue();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+
+    if (!(o instanceof DummyKeyBasedRoutingResolver))
+      return false;
+
+    DummyKeyBasedRoutingResolver other = (DummyKeyBasedRoutingResolver) o;
+    return other.dummyID.equals(dummyID);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/893dc86b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitTest.java
deleted file mode 100644
index 71374d0..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDebugDUnitTest.java
+++ /dev/null
@@ -1,1017 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.disttx;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Properties;
-
-import com.gemstone.gemfire.DataSerializable;
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheException;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.CacheTransactionManager;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.EntryOperation;
-import com.gemstone.gemfire.cache.PartitionAttributes;
-import com.gemstone.gemfire.cache.PartitionAttributesFactory;
-import com.gemstone.gemfire.cache.PartitionResolver;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache30.CacheTestCase;
-import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
-import com.gemstone.gemfire.internal.cache.PartitionedRegion;
-import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
-import com.gemstone.gemfire.internal.cache.execute.CustomerIDPartitionResolver;
-import com.gemstone.gemfire.test.dunit.Host;
-import com.gemstone.gemfire.test.dunit.Invoke;
-import com.gemstone.gemfire.test.dunit.LogWriterUtils;
-import com.gemstone.gemfire.test.dunit.SerializableCallable;
-import com.gemstone.gemfire.test.dunit.SerializableRunnable;
-import com.gemstone.gemfire.test.dunit.VM;
-
-public class DistTXDebugDUnitTest extends CacheTestCase {
-  VM accessor = null;
-  VM dataStore1 = null;
-  VM dataStore2 = null;
-  VM dataStore3 = null;
-
-  public DistTXDebugDUnitTest(String name) {
-    super(name);
-  }
-
-  @Override
-  public final void postSetUp() throws Exception {
-    Host host = Host.getHost(0);
-    dataStore1 = host.getVM(0);
-    dataStore2 = host.getVM(1);
-    dataStore3 = host.getVM(2);
-    accessor = host.getVM(3);
-    postSetUpDistTXDebugDUnitTest();
-  }
-
-  protected void postSetUpDistTXDebugDUnitTest() throws Exception {
-  }
-
-  @Override
-  public final void postTearDownCacheTestCase() throws Exception {
-    Invoke.invokeInEveryVM(new SerializableRunnable() {
-      public void run() {
-        InternalResourceManager.setResourceObserver(null);
-      }
-    });
-    InternalResourceManager.setResourceObserver(null);
-  }
-
-  public static void createCacheInVm() {
-    new DistTXDebugDUnitTest("temp").getCache();
-  }
-
-  protected void createCacheInAllVms() {
-    dataStore1.invoke(() -> DistTXDebugDUnitTest.createCacheInVm());
-    dataStore2.invoke(() -> DistTXDebugDUnitTest.createCacheInVm());
-    dataStore3.invoke(() -> DistTXDebugDUnitTest.createCacheInVm());
-    accessor.invoke(() -> DistTXDebugDUnitTest.createCacheInVm());
-  }
-
-  public static void createPR(String partitionedRegionName, Integer redundancy,
-      Integer localMaxMemory, Integer totalNumBuckets, Object colocatedWith,
-      Boolean isPartitionResolver) {
-    createPR(partitionedRegionName, redundancy, localMaxMemory,
-        totalNumBuckets, colocatedWith, isPartitionResolver, 
-        Boolean.TRUE/*Concurrency checks; By default is false*/);
-  }
-
-  public static void createPR(String partitionedRegionName, Integer redundancy,
-      Integer localMaxMemory, Integer totalNumBuckets, Object colocatedWith,
-      Boolean isPartitionResolver, Boolean concurrencyChecks) {
-    PartitionAttributesFactory paf = new PartitionAttributesFactory();
-
-    paf.setRedundantCopies(redundancy.intValue());
-    if (localMaxMemory != null) {
-      paf.setLocalMaxMemory(localMaxMemory.intValue());
-    }
-    if (totalNumBuckets != null) {
-      paf.setTotalNumBuckets(totalNumBuckets.intValue());
-    }
-    if (colocatedWith != null) {
-      paf.setColocatedWith((String) colocatedWith);
-    }
-    if (isPartitionResolver.booleanValue()) {
-      paf.setPartitionResolver(new CustomerIDPartitionResolver(
-          "CustomerIDPartitionResolver"));
-    }
-    PartitionAttributes prAttr = paf.create();
-    AttributesFactory attr = new AttributesFactory();
-    attr.setPartitionAttributes(prAttr);
-    attr.setConcurrencyChecksEnabled(concurrencyChecks);
-    // assertNotNull(basicGetCache());
-    // Region pr = basicGetCache().createRegion(partitionedRegionName,
-    // attr.create());
-    assertNotNull(basicGetCache());
-    Region pr = basicGetCache().createRegion(partitionedRegionName, attr.create());
-    assertNotNull(pr);
-    LogWriterUtils.getLogWriter().info(
-        "Partitioned Region " + partitionedRegionName
-            + " created Successfully :" + pr.toString());
-  }
-
-  protected void createPartitionedRegion(Object[] attributes) {
-    dataStore1.invoke(DistTXDebugDUnitTest.class, "createPR", attributes);
-    dataStore2.invoke(DistTXDebugDUnitTest.class, "createPR", attributes);
-    dataStore3.invoke(DistTXDebugDUnitTest.class, "createPR", attributes);
-    // make Local max memory = o for accessor
-    attributes[2] = new Integer(0);
-    accessor.invoke(DistTXDebugDUnitTest.class, "createPR", attributes);
-  }
-
-  public static void destroyPR(String partitionedRegionName) {
-    // assertNotNull(basicGetCache());
-    // Region pr = basicGetCache().getRegion(partitionedRegionName);
-
-    assertNotNull(basicGetCache());
-    Region pr = basicGetCache().getRegion(partitionedRegionName);
-    assertNotNull(pr);
-    LogWriterUtils.getLogWriter().info(
-        "Destroying Partitioned Region " + partitionedRegionName);
-    pr.destroyRegion();
-  }
-
-  public static void createRR(String replicatedRegionName, boolean empty) {
-    AttributesFactory af = new AttributesFactory();
-    af.setScope(Scope.DISTRIBUTED_ACK);
-    if (empty) {
-      af.setDataPolicy(DataPolicy.EMPTY);
-    } else {
-      af.setDataPolicy(DataPolicy.REPLICATE);
-    }
-    // Region rr = basicGetCache().createRegion(replicatedRegionName,
-    // af.create());
-    Region rr = basicGetCache().createRegion(replicatedRegionName, af.create());
-    assertNotNull(rr);
-    LogWriterUtils.getLogWriter().info(
-        "Replicated Region " + replicatedRegionName + " created Successfully :"
-            + rr.toString());
-  }
-
-  protected void createReplicatedRegion(Object[] attributes) {
-    dataStore1.invoke(DistTXDebugDUnitTest.class, "createRR", attributes);
-    dataStore2.invoke(DistTXDebugDUnitTest.class, "createRR", attributes);
-    dataStore3.invoke(DistTXDebugDUnitTest.class, "createRR", attributes);
-    // DataPolicy.EMPTY for accessor
-    attributes[1] = Boolean.TRUE;
-    accessor.invoke(DistTXDebugDUnitTest.class, "createRR", attributes);
-  }
-
-  public void testTXPR() throws Exception {
-    createCacheInAllVms();
-    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
-        Boolean.FALSE, Boolean.FALSE };
-    createPartitionedRegion(prAttrs);
-
-    SerializableCallable TxOps = new SerializableCallable("TxOps") {
-      @Override
-      public Object call() throws CacheException {
-        // PartitionedRegion pr1 = (PartitionedRegion)
-        // basicGetCache().getRegion(
-        // "pregion1");
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-        // put some data (non tx ops)
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.put");
-          pr1.put(dummy, "1_entry__" + i);
-        }
-
-        // put in tx and commit
-        // CacheTransactionManager ctx = basicGetCache()
-        // .getCacheTransactionManager();
-        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
-        ctx.setDistributed(true);
-        ctx.begin();
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.put in tx 1");
-          pr1.put(dummy, "2_entry__" + i);
-        }
-        ctx.commit();
-
-        // verify the data
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.get");
-          assertEquals("2_entry__" + i, pr1.get(dummy));
-        }
-
-        // put data in tx and rollback
-        ctx.begin();
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.put in tx 2");
-          pr1.put(dummy, "3_entry__" + i);
-        }
-        ctx.rollback();
-
-        // verify the data
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.get");
-          assertEquals("2_entry__" + i, pr1.get(dummy));
-        }
-
-        // destroy data in tx and commit
-        ctx.begin();
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.destroy in tx 3");
-          pr1.destroy(dummy);
-        }
-        ctx.commit();
-
-        // verify the data
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.get");
-          assertEquals(null, pr1.get(dummy));
-        }
-
-        // verify data size on all replicas
-        SerializableCallable verifySize = new SerializableCallable("getOps") {
-          @Override
-          public Object call() throws CacheException {
-            PartitionedRegion pr1 = (PartitionedRegion) basicGetCache()
-                .getRegion("pregion1");
-            LogWriterUtils.getLogWriter().info(
-                " calling pr.getLocalSize " + pr1.getLocalSize());
-            assertEquals(0, pr1.getLocalSize());
-            return null;
-          }
-        };
-        dataStore1.invoke(verifySize);
-        dataStore2.invoke(verifySize);
-        dataStore3.invoke(verifySize);
-
-        return null;
-      }
-    };
-
-    accessor.invoke(TxOps);
-
-    accessor.invoke(() -> DistTXDebugDUnitTest.destroyPR( "pregion1" ));
-  }
-
-  public void testTXDestroy_invalidate() throws Exception {
-    createCacheInAllVms();
-    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
-        Boolean.FALSE, Boolean.FALSE };
-    createPartitionedRegion(prAttrs);
-
-    Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
-    createReplicatedRegion(rrAttrs);
-
-    SerializableCallable TxOps = new SerializableCallable("TxOps") {
-      @Override
-      public Object call() throws CacheException {
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-        Region rr1 = basicGetCache().getRegion("rregion1");
-
-        // put some data (non tx ops)
-        for (int i = 1; i <= 6; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling non-tx put");
-          pr1.put(dummy, "1_entry__" + i);
-          rr1.put(dummy, "1_entry__" + i);
-        }
-
-        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
-        ctx.setDistributed(true);
-        // destroy data in tx and commit
-        ctx.begin();
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(
-              " calling pr1.destroy in tx key=" + dummy);
-          pr1.destroy(dummy);
-          LogWriterUtils.getLogWriter().info(" calling rr1.destroy in tx key=" + i);
-          rr1.destroy(dummy);
-        }
-        for (int i = 4; i <= 6; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(
-              " calling pr1.invalidate in tx key=" + dummy);
-          pr1.invalidate(dummy);
-          LogWriterUtils.getLogWriter().info(" calling rr1.invalidate in tx key=" + i);
-          rr1.invalidate(dummy);
-        }
-        ctx.commit();
-
-        // verify the data
-        for (int i = 1; i <= 6; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr1.get");
-          assertEquals(null, pr1.get(dummy));
-          LogWriterUtils.getLogWriter().info(" calling rr1.get");
-          assertEquals(null, rr1.get(i));
-        }
-        return null;
-      }
-    };
-
-    accessor.invoke(TxOps);
-
-    // verify data size on all replicas
-    SerializableCallable verifySize = new SerializableCallable("getOps") {
-      @Override
-      public Object call() throws CacheException {
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-        Region rr1 = basicGetCache().getRegion("rregion1");
-        LogWriterUtils.getLogWriter().info(
-            " calling pr1.getLocalSize " + pr1.getLocalSize());
-        assertEquals(2, pr1.getLocalSize());
-        LogWriterUtils.getLogWriter().info(" calling rr1.size " + rr1.size());
-        assertEquals(3, rr1.size());
-        return null;
-      }
-    };
-    dataStore1.invoke(verifySize);
-    dataStore2.invoke(verifySize);
-    dataStore3.invoke(verifySize);
-
-    accessor.invoke(() -> DistTXDebugDUnitTest.destroyPR( "pregion1" ));
-  }
-
-  public void testTXPR_RR() throws Exception {
-    createCacheInAllVms();
-    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
-        Boolean.FALSE, Boolean.FALSE };
-    createPartitionedRegion(prAttrs);
-
-    Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
-    createReplicatedRegion(rrAttrs);
-
-    SerializableCallable TxOps = new SerializableCallable("TxOps") {
-      @Override
-      public Object call() throws CacheException {
-        // PartitionedRegion pr1 = (PartitionedRegion)
-        // basicGetCache().getRegion(
-        // "pregion1");
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-        // Region rr1 = basicGetCache().getRegion("rregion1");
-        Region rr1 = basicGetCache().getRegion("rregion1");
-        // put some data (non tx ops)
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.put non-tx PR1_entry__" + i);
-          pr1.put(dummy, "PR1_entry__" + i);
-          LogWriterUtils.getLogWriter().info(" calling rr.put non-tx RR1_entry__" + i);
-          rr1.put(new Integer(i), "RR1_entry__" + i);
-        }
-
-        // put in tx and commit
-        // CacheTransactionManager ctx = basicGetCache()
-        // .getCacheTransactionManager();
-        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
-        ctx.setDistributed(true);
-        ctx.begin();
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.put in tx PR2_entry__" + i);
-          pr1.put(dummy, "PR2_entry__" + i);
-          LogWriterUtils.getLogWriter().info(" calling rr.put in tx RR2_entry__" + i);
-          rr1.put(new Integer(i), "RR2_entry__" + i);
-        }
-        ctx.commit();
-
-        // verify the data
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.get PR2_entry__" + i);
-          assertEquals("PR2_entry__" + i, pr1.get(dummy));
-          LogWriterUtils.getLogWriter().info(" calling rr.get RR2_entry__" + i);
-          assertEquals("RR2_entry__" + i, rr1.get(new Integer(i)));
-        }
-        return null;
-      }
-    };
-
-    accessor.invoke(TxOps);
-
-    // verify data size on all replicas
-    SerializableCallable verifySize = new SerializableCallable("getOps") {
-      @Override
-      public Object call() throws CacheException {
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-        LogWriterUtils.getLogWriter().info(
-            " calling pr.getLocalSize " + pr1.getLocalSize());
-        assertEquals(2, pr1.getLocalSize());
-
-        Region rr1 = basicGetCache().getRegion("rregion1");
-        LogWriterUtils.getLogWriter()
-            .info(" calling rr.getLocalSize " + rr1.size());
-        assertEquals(3, rr1.size());
-        return null;
-      }
-    };
-    dataStore1.invoke(verifySize);
-    dataStore2.invoke(verifySize);
-    dataStore3.invoke(verifySize);
-
-    accessor.invoke(() -> DistTXDebugDUnitTest.destroyPR( "pregion1" ));
-  }
-
-  public void testTXPR2() throws Exception {
-    createCacheInAllVms();
-    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
-        Boolean.FALSE, Boolean.FALSE };
-    createPartitionedRegion(prAttrs);
-
-    SerializableCallable TxOps = new SerializableCallable("TxOps") {
-      @Override
-      public Object call() throws CacheException {
-        // PartitionedRegion pr1 = (PartitionedRegion)
-        // basicGetCache().getRegion(
-        // "pregion1");
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-
-        // put in tx and commit
-        // CacheTransactionManager ctx = basicGetCache()
-        // .getCacheTransactionManager();
-        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
-        ctx.setDistributed(true);
-        ctx.begin();
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.put in tx 1");
-          pr1.put(dummy, "2_entry__" + i);
-        }
-        ctx.commit();
-
-        // verify the data
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
-          assertEquals("2_entry__" + i, pr1.get(dummy));
-        }
-        return null;
-      }
-    };
-
-    accessor.invoke(TxOps);
-
-    SerializableCallable TxGetOps = new SerializableCallable("TxGetOps") {
-      @Override
-      public Object call() throws CacheException {
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
-        LogWriterUtils.getLogWriter().info(
-            " calling pr.getLocalSize " + pr1.getLocalSize());
-        assertEquals(2, pr1.getLocalSize());
-        return null;
-      }
-    };
-
-    dataStore1.invoke(TxGetOps);
-    dataStore2.invoke(TxGetOps);
-    dataStore3.invoke(TxGetOps);
-
-    SerializableCallable TxRollbackOps = new SerializableCallable("TxOps") {
-      @Override
-      public Object call() throws CacheException {
-        // PartitionedRegion pr1 = (PartitionedRegion)
-        // basicGetCache().getRegion(
-        // "pregion1");
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-
-        // put in tx and commit
-        // CacheTransactionManager ctx = basicGetCache()
-        // .getCacheTransactionManager();
-        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
-        ctx.setDistributed(true);
-        ctx.begin();
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(
-              " calling pr.put in tx for rollback no_entry__" + i);
-          pr1.put(dummy, "no_entry__" + i);
-        }
-        ctx.rollback();
-
-        // verify the data
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(
-              " calling pr.get after rollback " + pr1.get(dummy));
-          assertEquals("2_entry__" + i, pr1.get(dummy));
-        }
-        return null;
-      }
-    };
-
-    accessor.invoke(TxRollbackOps);
-
-    accessor.invoke(() -> DistTXDebugDUnitTest.destroyPR( "pregion1" ));
-  }
-  
-  public void testTXPRRR2_create() throws Exception {
-    createCacheInAllVms();
-    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
-        Boolean.FALSE, Boolean.FALSE };
-    createPartitionedRegion(prAttrs);
-
-    Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
-    createReplicatedRegion(rrAttrs);
-
-    SerializableCallable TxOps = new SerializableCallable("TxOps") {
-      @Override
-      public Object call() throws CacheException {
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-        Region rr1 = basicGetCache().getRegion("rregion1");
-        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
-        ctx.setDistributed(true);
-        ctx.begin();
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.create in tx 1");
-          pr1.create(dummy, "2_entry__" + i);
-          
-          LogWriterUtils.getLogWriter().info(" calling rr.create " + "2_entry__" + i);
-          rr1.create(new Integer(i), "2_entry__" + i);
-        }
-        ctx.commit();
-
-        // verify the data
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
-          assertEquals("2_entry__" + i, pr1.get(dummy));
-          
-          LogWriterUtils.getLogWriter().info(
-              " calling rr.get " + rr1.get(new Integer(i)));
-          assertEquals("2_entry__" + i, rr1.get(new Integer(i)));
-        }
-        return null;
-      }
-    };
-
-    accessor.invoke(TxOps);
-
-    // verify data size on all replicas
-    SerializableCallable verifySize = new SerializableCallable("getOps") {
-      @Override
-      public Object call() throws CacheException {
-        Region rr1 = basicGetCache().getRegion("rregion1");
-        LogWriterUtils.getLogWriter()
-            .info(" calling rr.getLocalSize " + rr1.size());
-        assertEquals(3, rr1.size());
-        
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-        LogWriterUtils.getLogWriter().info(
-            " calling pr.getLocalSize " + pr1.getLocalSize());
-        assertEquals(2, pr1.getLocalSize());
-        return null;
-      }
-    };
-    dataStore1.invoke(verifySize);
-    dataStore2.invoke(verifySize);
-    dataStore3.invoke(verifySize);
-  }
-  
-  public void testTXPRRR2_putall() throws Exception {
-    createCacheInAllVms();
-    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
-        Boolean.FALSE, Boolean.FALSE };
-    createPartitionedRegion(prAttrs);
-
-    Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
-    createReplicatedRegion(rrAttrs);
-
-    SerializableCallable TxOps = new SerializableCallable("TxOps") {
-      @Override
-      public Object call() throws CacheException {
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-        Region rr1 = basicGetCache().getRegion("rregion1");
-        
-        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
-        ctx.setDistributed(true);
-        ctx.begin();
-        HashMap<DummyKeyBasedRoutingResolver, String> phm = new HashMap<DummyKeyBasedRoutingResolver, String>();
-        HashMap<Integer, String> rhm = new HashMap<Integer, String>();
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          phm.put(dummy, "2_entry__" + i);
-          rhm.put(i, "2_entry__" + i);
-        }
-        pr1.putAll(phm);
-        rr1.putAll(rhm);
-        ctx.commit();
-
-        // verify the data
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
-          assertEquals("2_entry__" + i, pr1.get(dummy));
-          
-          LogWriterUtils.getLogWriter().info(
-              " calling rr.get " + rr1.get(new Integer(i)));
-          assertEquals("2_entry__" + i, rr1.get(new Integer(i)));
-        }
-        return null;
-      }
-    };
-
-    accessor.invoke(TxOps);
-
-    // verify data size on all replicas
-    SerializableCallable verifySize = new SerializableCallable("getOps") {
-      @Override
-      public Object call() throws CacheException {
-        Region rr1 = basicGetCache().getRegion("rregion1");
-        LogWriterUtils.getLogWriter()
-            .info(" calling rr.getLocalSize " + rr1.size());
-        assertEquals(3, rr1.size());
-        
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-        LogWriterUtils.getLogWriter().info(
-            " calling pr.getLocalSize " + pr1.getLocalSize());
-        assertEquals(2, pr1.getLocalSize());
-        return null;
-      }
-    };
-    dataStore1.invoke(verifySize);
-    dataStore2.invoke(verifySize);
-    dataStore3.invoke(verifySize);
-    
-//    accessor.invoke(TxOps);
-  }
-  
-  public void testTXPR_putall() throws Exception {
-    createCacheInAllVms();
-    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
-        Boolean.FALSE, Boolean.FALSE };
-    createPartitionedRegion(prAttrs);
-
-    SerializableCallable TxOps = new SerializableCallable("TxOps") {
-      @Override
-      public Object call() throws CacheException {
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-        
-        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
-        ctx.setDistributed(true);
-        ctx.begin();
-        HashMap<DummyKeyBasedRoutingResolver, String> phm = new HashMap<DummyKeyBasedRoutingResolver, String>();
-        HashMap<Integer, String> rhm = new HashMap<Integer, String>();
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          phm.put(dummy, "2_entry__" + i);
-        }
-        pr1.putAll(phm);
-        ctx.commit();
-
-        // verify the data
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
-          assertEquals("2_entry__" + i, pr1.get(dummy));
-          
-        }
-        return null;
-      }
-    };
-
-//    dataStore1.invoke(TxOps);
-    accessor.invoke(TxOps);
-
-    // verify data size on all replicas
-    SerializableCallable verifySize = new SerializableCallable("getOps") {
-      @Override
-      public Object call() throws CacheException {
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-        LogWriterUtils.getLogWriter().info(
-            " calling pr.getLocalSize " + pr1.getLocalSize());
-        assertEquals(2, pr1.getLocalSize());
-        return null;
-      }
-    };
-    dataStore1.invoke(verifySize);
-    dataStore2.invoke(verifySize);
-    dataStore3.invoke(verifySize);
-    
-//    accessor.invoke(TxOps);
-  }
-
-  
-  public void testTXRR_removeAll() throws Exception {
-    performRR_removeAllTest(false);
-  }
-  
-  public void testTXRR_removeAll_dataNodeAsCoordinator() throws Exception {
-    performRR_removeAllTest(true);
-  }
-
-  /**
-   * @param dataNodeAsCoordinator TODO
-   * 
-   */
-  private void performRR_removeAllTest(boolean dataNodeAsCoordinator) {
-    createCacheInAllVms();
-    Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
-    createReplicatedRegion(rrAttrs);
-
-    SerializableCallable TxOps = new SerializableCallable("TxOps") {
-      @Override
-      public Object call() throws CacheException {
-        Region rr1 = basicGetCache().getRegion("rregion1");
-        //put some data
-        HashMap<Integer, String> rhm = new HashMap<Integer, String>();
-        for (int i = 1; i <= 3; i++) {
-          rhm.put(i, "2_entry__" + i);
-        }
-        rr1.putAll(rhm);
-        
-        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
-        ctx.setDistributed(true);
-        ctx.begin();
-        rr1.removeAll(rhm.keySet());
-
-        ctx.commit();
-
-        // verify the data
-        for (int i = 1; i <= 3; i++) {
-          LogWriterUtils.getLogWriter().info(
-              " calling rr.get " + rr1.get(new Integer(i)));
-          assertEquals(null, rr1.get(new Integer(i)));
-        }
-        return null;
-      }
-    };
-    
-    if (dataNodeAsCoordinator) {
-      dataStore1.invoke(TxOps);
-    } else {
-      accessor.invoke(TxOps);
-    }
-
-    // verify data size on all replicas
-    SerializableCallable verifySize = new SerializableCallable("getOps") {
-      @Override
-      public Object call() throws CacheException {
-        Region rr1 = basicGetCache().getRegion("rregion1");
-        LogWriterUtils.getLogWriter()
-            .info(" calling rr.getLocalSize " + rr1.size());
-        assertEquals(0, rr1.size());
-        return null;
-      }
-    };
-    dataStore1.invoke(verifySize);
-    dataStore2.invoke(verifySize);
-    dataStore3.invoke(verifySize);
-    
-//    accessor.invoke(TxOps);
-  }
-  
-  public void testTXPR_removeAll() throws Exception {
-    createCacheInAllVms();
-    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
-        Boolean.FALSE, Boolean.FALSE };
-    createPartitionedRegion(prAttrs);
-
-    SerializableCallable TxOps = new SerializableCallable("TxOps") {
-      @Override
-      public Object call() throws CacheException {
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-        HashMap<DummyKeyBasedRoutingResolver, String> phm = new HashMap<DummyKeyBasedRoutingResolver, String>();
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          phm.put(dummy, "2_entry__" + i);
-        }
-        pr1.putAll(phm);
-        
-        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
-        ctx.setDistributed(true);
-        ctx.begin();
-        pr1.removeAll(phm.keySet());
-        ctx.commit();
-
-        // verify the data
-        for (int i = 1; i <= 3; i++) {
-          DummyKeyBasedRoutingResolver dummy = new DummyKeyBasedRoutingResolver(
-              i);
-          LogWriterUtils.getLogWriter().info(" calling pr.get " + pr1.get(dummy));
-          assertEquals(null, pr1.get(dummy));
-        }
-        return null;
-      }
-    };
-
-    accessor.invoke(TxOps);
-
-    // verify data size on all replicas
-    SerializableCallable verifySize = new SerializableCallable("getOps") {
-      @Override
-      public Object call() throws CacheException {
-        PartitionedRegion pr1 = (PartitionedRegion) basicGetCache().getRegion("pregion1");
-        LogWriterUtils.getLogWriter().info(
-            " calling pr.getLocalSize " + pr1.getLocalSize());
-        assertEquals(0, pr1.getLocalSize());
-        return null;
-      }
-    };
-    dataStore1.invoke(verifySize);
-    dataStore2.invoke(verifySize);
-    dataStore3.invoke(verifySize);
-    
-//    accessor.invoke(TxOps);
-  }
-
-  
-  public void performTXRRtestOps(boolean makeDatNodeAsCoordinator) {
-    createCacheInAllVms();
-    Object[] prAttrs = new Object[] { "pregion1", 1, null, 3, null,
-        Boolean.FALSE, Boolean.FALSE };
-    createPartitionedRegion(prAttrs);
-
-    Object[] rrAttrs = new Object[] { "rregion1", Boolean.FALSE };
-    createReplicatedRegion(rrAttrs);
-
-    SerializableCallable TxOps = new SerializableCallable("TxOps") {
-      @Override
-      public Object call() throws CacheException {
-        Region rr1 = basicGetCache().getRegion("rregion1");
-        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
-        ctx.setDistributed(true);
-        ctx.begin();
-        for (int i = 1; i <= 3; i++) {
-          LogWriterUtils.getLogWriter().info(" calling rr.put " + "2_entry__" + i);
-          rr1.put(new Integer(i), "2_entry__" + i);
-        }
-        ctx.commit();
-
-        // verify the data
-        for (int i = 1; i <= 3; i++) {
-          LogWriterUtils.getLogWriter().info(
-              " calling rr.get " + rr1.get(new Integer(i)));
-          assertEquals("2_entry__" + i, rr1.get(new Integer(i)));
-        }
-        return null;
-      }
-    };
-    
-    if (makeDatNodeAsCoordinator) {
-      dataStore1.invoke(TxOps);
-    } else {
-      accessor.invoke(TxOps);  
-    }
-
-    // verify data size on all replicas
-    SerializableCallable verifySize = new SerializableCallable("getOps") {
-      @Override
-      public Object call() throws CacheException {
-        Region rr1 = basicGetCache().getRegion("rregion1");
-        LogWriterUtils.getLogWriter()
-            .info(" calling rr.getLocalSize " + rr1.size());
-        assertEquals(3, rr1.size());
-        return null;
-      }
-    };
-    dataStore1.invoke(verifySize);
-    dataStore2.invoke(verifySize);
-    dataStore3.invoke(verifySize);
-
-    SerializableCallable TxRollbackOps = new SerializableCallable("TxOps") {
-      @Override
-      public Object call() throws CacheException {
-        Region rr1 = basicGetCache().getRegion("rregion1");
-        CacheTransactionManager ctx = basicGetCache().getCacheTransactionManager();
-        ctx.setDistributed(true);
-        ctx.begin();
-        for (int i = 1; i <= 3; i++) {
-          LogWriterUtils.getLogWriter().info(
-              " calling rr.put for rollback no_entry__" + i);
-          rr1.put(new Integer(i), "no_entry__" + i);
-        }
-        ctx.rollback();
-        ;
-
-        // verify the data
-        for (int i = 1; i <= 3; i++) {
-          LogWriterUtils.getLogWriter().info(
-              " calling rr.get after rollback "
-                  + rr1.get(new Integer(i)));
-          assertEquals("2_entry__" + i, rr1.get(new Integer(i)));
-        }
-        return null;
-      }
-    };
-
-    if (makeDatNodeAsCoordinator) {
-      dataStore1.invoke(TxRollbackOps);
-    } else {
-      accessor.invoke(TxRollbackOps);  
-    }
-  }
-    
-
-  public void testTXRR2() throws Exception {
-    performTXRRtestOps(false); // actual test
-  }
-
-  public void testTXRR2_dataNodeAsCoordinator() throws Exception {
-    performTXRRtestOps(true);
-  }
-}
-
-class DummyKeyBasedRoutingResolver implements PartitionResolver,
-    DataSerializable {
-  Integer dummyID;
-
-  public DummyKeyBasedRoutingResolver() {
-  }
-
-  public DummyKeyBasedRoutingResolver(int id) {
-    this.dummyID = new Integer(id);
-  }
-
-  public String getName() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  public Serializable getRoutingObject(EntryOperation opDetails) {
-    return (Serializable) opDetails.getKey();
-  }
-
-  public void close() {
-    // nothing to release
-  }
-
-  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    this.dummyID = DataSerializer.readInteger(in);
-  }
-
-  public void toData(DataOutput out) throws IOException {
-    DataSerializer.writeInteger(this.dummyID, out);
-  }
-
-  @Override
-  public int hashCode() {
-    return this.dummyID.intValue();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o)
-      return true;
-
-    if (!(o instanceof DummyKeyBasedRoutingResolver))
-      return false;
-
-    DummyKeyBasedRoutingResolver other = (DummyKeyBasedRoutingResolver) o;
-    return other.dummyID.equals(this.dummyID);
-  }
-}
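
For reference, the pattern exercised by the removed test above is Geode's
distributed-transaction API: mark the CacheTransactionManager as distributed,
then wrap the region operations in begin/commit or begin/rollback. Below is a
minimal single-member sketch of that flow (the DistTxSketch class and local
cache setup are illustrative only; the real test runs the replicated region
across several VMs):

import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.CacheTransactionManager;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionShortcut;

public class DistTxSketch {
  public static void main(String[] args) {
    Cache cache = new CacheFactory().set("mcast-port", "0").create();
    Region<Integer, String> rr1 = cache
        .<Integer, String>createRegionFactory(RegionShortcut.REPLICATE)
        .create("rregion1");

    CacheTransactionManager txMgr = cache.getCacheTransactionManager();
    txMgr.setDistributed(true); // opt in to distributed transactions

    // Committed puts become visible on every replica.
    txMgr.begin();
    for (int i = 1; i <= 3; i++) {
      rr1.put(i, "2_entry__" + i);
    }
    txMgr.commit();

    // Rolled-back puts leave the previously committed values intact.
    txMgr.begin();
    for (int i = 1; i <= 3; i++) {
      rr1.put(i, "no_entry__" + i);
    }
    txMgr.rollback();

    System.out.println("after rollback: " + rr1.get(1)); // prints 2_entry__1
    cache.close();
  }
}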

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/893dc86b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDistributedTestSuite.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDistributedTestSuite.java b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDistributedTestSuite.java
deleted file mode 100644
index 3b829c1..0000000
--- a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXDistributedTestSuite.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.disttx;
-
-import org.junit.runner.RunWith;
-import org.junit.runners.Suite;
-
-@RunWith(Suite.class)
-@Suite.SuiteClasses({
-  CacheMapDistTXDUnitTest.class,
-  DistributedTransactionDUnitTest.class,
-  DistTXDebugDUnitTest.class,
-  DistTXOrderDUnitTest.class,
-  DistTXPersistentDebugDUnitTest.class,
-  DistTXRestrictionsDUnitTest.class,
-  DistTXWithDeltaDUnitTest.class,
-  PersistentPartitionedRegionWithDistTXDUnitTest.class,
-  PRDistTXDUnitTest.class,
-  PRDistTXWithVersionsDUnitTest.class
-})
-
-/**
- * Suite of dunit tests for distributed transactions.
- * @author shirishd
- */
-public class DistTXDistributedTestSuite {
-
-}
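
Before this commit removed it, the suite above could be executed like any other
JUnit 4 test class, since @RunWith(Suite.class) delegates to the listed member
classes. As an illustration, a hypothetical RunSuite wrapper invoking it
programmatically:

import org.junit.runner.JUnitCore;
import org.junit.runner.Result;

public class RunSuite {
  public static void main(String[] args) {
    // Runs every class listed in @Suite.SuiteClasses, as a build runner would.
    Result result = JUnitCore.runClasses(DistTXDistributedTestSuite.class);
    System.out.println("ran " + result.getRunCount() + " tests, "
        + result.getFailureCount() + " failure(s)");
  }
}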

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/893dc86b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXPersistentDebugDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXPersistentDebugDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXPersistentDebugDUnitTest.java
index 69d6149..097c37c 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXPersistentDebugDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/disttx/DistTXPersistentDebugDUnitTest.java
@@ -31,7 +31,7 @@ import com.gemstone.gemfire.test.dunit.Invoke;
 import com.gemstone.gemfire.test.dunit.LogWriterUtils;
 import com.gemstone.gemfire.test.dunit.SerializableCallable;
 
-public class DistTXPersistentDebugDUnitTest extends DistTXDebugDUnitTest {
+public class DistTXPersistentDebugDUnitTest extends DistTXDebugDUnitDisabledTest {
 
   public DistTXPersistentDebugDUnitTest(String name) {
     super(name);


[3/5] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-1050

Posted by kl...@apache.org.
Merge remote-tracking branch 'origin/develop' into feature/GEODE-1050


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7941b839
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7941b839
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7941b839

Branch: refs/heads/feature/GEODE-1050
Commit: 7941b83939b921b7fc5bb31dfbe0f457b9fba781
Parents: f45f0f5 4ed2fd3
Author: Kirk Lund <kl...@apache.org>
Authored: Thu Mar 17 15:58:33 2016 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Thu Mar 17 15:58:33 2016 -0700

----------------------------------------------------------------------
 .../cache/ConnectionPoolAutoDUnitTest.java      |   45 +
 .../gemfire/cache/ConnectionPoolDUnitTest.java  | 5871 ++++++++++++++++++
 2 files changed, 5916 insertions(+)
----------------------------------------------------------------------



[4/5] incubator-geode git commit: Fix headers and change setUp to postSetUp chain

Posted by kl...@apache.org.
Fix headers and change setUp to postSetUp chain


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/ad390c99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/ad390c99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/ad390c99

Branch: refs/heads/feature/GEODE-1050
Commit: ad390c9981141a5f22aa2ba5e3a86f992dfacc48
Parents: 7941b83
Author: Kirk Lund <kl...@apache.org>
Authored: Thu Mar 17 16:01:04 2016 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Thu Mar 17 16:01:04 2016 -0700

----------------------------------------------------------------------
 .../cache/ConnectionPoolAutoDUnitTest.java      | 28 +++++++++++-------
 .../gemfire/cache/ConnectionPoolDUnitTest.java  | 31 ++++++++++++++------
 2 files changed, 40 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ad390c99/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
index ad110d7..3b43ab8 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolAutoDUnitTest.java
@@ -1,9 +1,18 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package com.gemstone.gemfire.cache;
 
@@ -20,10 +29,9 @@ public class ConnectionPoolAutoDUnitTest extends ConnectionPoolDUnitTest {
   public ConnectionPoolAutoDUnitTest(String name) {
     super(name);
   }
-  
-  public void setUp() throws Exception {
-    super.setUp();
-    // TODO Auto-generated method stub
+
+  @Override
+  protected final void postSetUpConnectionPoolDUnitTest() throws Exception {
     ClientServerTestCase.AUTO_LOAD_BALANCE = true;
     Invoke.invokeInEveryVM(new SerializableRunnable("setupAutoMode") {
       public void run() {
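
The postSetUp chain introduced in this diff (and completed in the base-class
diff that follows) is a template-method refactoring: each level of the test
hierarchy makes its own setup method final and exposes a fresh protected hook
for the next level, so no subclass can break the chain by forgetting to call
super.setUp(). A condensed, self-contained sketch of the pattern, with class
names shortened for illustration:

// Sketch only: names are illustrative, not the actual DUnit framework API.
abstract class BaseTestCase {
  public final void setUp() throws Exception {
    connectAllVmsToSystem(); // framework setup always runs first
    postSetUp();             // then the subclass hook
  }

  protected void postSetUp() throws Exception {
    // default hook: nothing extra to do
  }

  private void connectAllVmsToSystem() { /* framework detail */ }
}

class PoolTestCase extends BaseTestCase {
  @Override
  public final void postSetUp() throws Exception {
    // this level's setup would go here ...
    postSetUpPoolTestCase(); // ... then hand off to the next level
  }

  protected void postSetUpPoolTestCase() throws Exception {
  }
}

class AutoPoolTestCase extends PoolTestCase {
  @Override
  protected final void postSetUpPoolTestCase() throws Exception {
    // leaf-level setup; there is no super.setUp() call to forget
  }
}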

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ad390c99/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
index 41d48aa..2acab3a 100755
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/ConnectionPoolDUnitTest.java
@@ -1,9 +1,18 @@
-/*=========================================================================
- * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
- * This product is protected by U.S. and international copyright
- * and intellectual property laws. Pivotal products are covered by
- * one or more patents listed at http://www.pivotal.io/patents.
- *=========================================================================
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package com.gemstone.gemfire.cache;
 
@@ -97,8 +106,8 @@ public class ConnectionPoolDUnitTest extends CacheTestCase {
     super(name);
   }
 
-  public void setUp() throws Exception {
-    super.setUp();
+  @Override
+  public final void postSetUp() throws Exception {
     // avoid IllegalStateException from HandShake by connecting all vms to
     // system before creating pool
     getSystem();
@@ -107,10 +116,14 @@ public class ConnectionPoolDUnitTest extends CacheTestCase {
         getSystem();
       }
     });
+    postSetUpConnectionPoolDUnitTest();
+  }
+
+  protected void postSetUpConnectionPoolDUnitTest() throws Exception {
   }
   
   @Override
-  protected final void postTearDownCacheTestCase() throws Exception {
+  public final void postTearDownCacheTestCase() throws Exception {
     Invoke.invokeInEveryVM(new SerializableRunnable() {
       public void run() {
         Map pools = PoolManager.getAll();