You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2016/01/28 19:13:31 UTC
[29/70] [partial] incubator-geode git commit: WAN and CQ code drop
under the Pivotal SGA
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientTestCase.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientTestCase.java
new file mode 100755
index 0000000..583ef81
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/DurableClientTestCase.java
@@ -0,0 +1,2010 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.tier.sockets;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Ignore;
+
+import junit.framework.Assert;
+import util.TestException;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.client.PoolFactory;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.query.CqAttributes;
+import com.gemstone.gemfire.cache.query.CqAttributesFactory;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.CqExistsException;
+import com.gemstone.gemfire.cache.query.CqListener;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.RegionNotFoundException;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+import com.gemstone.gemfire.cache.query.internal.cq.CqQueryImpl;
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.ClientServerObserver;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverAdapter;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.InternalCache;
+import com.gemstone.gemfire.internal.cache.PoolFactoryImpl;
+import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
+
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.VM;
+
+/**
+ * Class <code>DurableClientTestCase</code> tests durable client
+ * functionality.
+ *
+ * @author Barry Oglesby
+ *
+ * @since 5.2
+ */
+public class DurableClientTestCase extends DistributedTestCase {
+
+ protected VM server1VM;
+ protected VM server2VM;
+ protected VM durableClientVM;
+ protected VM publisherClientVM;
+ protected String regionName;
+
+ protected static volatile boolean isPrimaryRecovered = false;
+
+ public DurableClientTestCase(String name) {
+ super(name);
+ }
+
+ public void setUp() throws Exception {
+ Host host = Host.getHost(0);
+ this.server1VM = host.getVM(0);
+ this.server2VM = host.getVM(1);
+ this.durableClientVM = host.getVM(2);
+ this.publisherClientVM = host.getVM(3);
+ this.regionName = getName() + "_region";
+ //Clients see this when the servers disconnect
+ addExpectedException("Could not find any server");
+ }
+
+ public void tearDown2() throws Exception {
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+ this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+ this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+ this.server2VM.invoke(CacheServerTestUtil.class, "closeCache");
+ }
+
+ /**
+ * Test that starting a durable client is correctly processed by the server.
+ */
+ public void testSimpleDurableClient() {
+ // Start a server
+ int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+ "createCacheServer", new Object[] {regionName, new Boolean(true)}))
+ .intValue();
+
+ // Start a durable client that is not kept alive on the server when it
+ // stops normally
+ final String durableClientId = getName() + "_client";
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId)});
+
+ // Send clientReady message
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+ public void run2() throws CacheException {
+ CacheServerTestUtil.getCache().readyForEvents();
+ }
+ });
+
+ // Verify durable client on server
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(1);
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+
+ // Verify that it is durable and its properties are correct
+ assertTrue(proxy.isDurable());
+ assertEquals(durableClientId, proxy.getDurableId());
+ assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, proxy.getDurableTimeout());
+ //assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_KEEP_ALIVE, proxy.getDurableKeepAlive());
+ }
+ });
+
+ // Stop the durable client
+ this.disconnectDurableClient();
+
+ // Verify the durable client is present on the server for closeCache=false case.
+ this.verifySimpleDurableClient();
+
+ // Stop the server
+ this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ this.closeDurableClient();
+
+ }
+
+ /**
+ * Test that starting a durable client is correctly processed by the server.
+ * In this test we will set gemfire.SPECIAL_DURABLE property to true and will
+ * see durableID appended by poolname or not
+ */
+ public void testSimpleDurableClient2() {
+ // Start a server
+ int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+ "createCacheServer", new Object[] {regionName, new Boolean(true)}))
+ .intValue();
+
+ // Start a durable client that is not kept alive on the server when it
+ // stops normally
+ final String durableClientId = getName() + "_client";
+ final Properties jp = new Properties();
+ jp.setProperty("gemfire.SPECIAL_DURABLE", "true");
+
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true)
+ , regionName
+ , getClientDistributedSystemProperties(durableClientId)
+ , new Boolean(false)
+ , jp});
+
+ // Send clientReady message
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+ public void run2() throws CacheException {
+ CacheServerTestUtil.getCache().readyForEvents();
+ }
+ });
+
+ // Verify durable client on server
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(1);
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+
+ // Verify that it is durable and its properties are correct
+ assertTrue(proxy.isDurable());
+ assertNotSame(durableClientId, proxy.getDurableId());
+
+ /* new durable id will be like this
+ * durableClientId
+ * _gem_ //separartor
+ * client pool name
+ */
+ String dId = durableClientId + "_gem_" + "CacheServerTestUtil";
+
+ assertEquals(dId, proxy.getDurableId());
+ assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, proxy.getDurableTimeout());
+ //assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_KEEP_ALIVE, proxy.getDurableKeepAlive());
+ }
+ });
+
+ // Stop the durable client
+ this.disconnectDurableClient();
+
+ // Verify the durable client is present on the server for closeCache=false case.
+ this.verifySimpleDurableClient();
+
+ // Stop the server
+ this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ this.closeDurableClient();
+
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "unsetJavaSystemProperties",
+ new Object[] {jp});
+
+ }
+
+ public void closeDurableClient()
+ {
+ }
+
+ public void disconnectDurableClient()
+ {
+ this.disconnectDurableClient(false);
+ }
+
+ public void disconnectDurableClient(boolean keepAlive)
+ {
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache",new Object[] {new Boolean(keepAlive)});
+ }
+
+ public void verifySimpleDurableClient()
+ {
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(0);
+ CacheClientProxy proxy = getClientProxy();
+ assertNull(proxy);
+ }
+ });
+ }
+
+ /**
+ * Test that starting, stopping then restarting a durable client is correctly
+ * processed by the server.
+ */
+ public void testStartStopStartDurableClient() {
+ // Start a server
+ int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+ "createCacheServer", new Object[] {regionName, new Boolean(true)}))
+ .intValue();
+
+ // Start a durable client that is kept alive on the server when it stops
+ // normally
+ final String durableClientId = getName() + "_client";
+ final int durableClientTimeout = 60; // keep the client alive for 60 seconds
+ //final boolean durableClientKeepAlive = true; // keep the client alive when it stops normally
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout)});
+
+ // Send clientReady message
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+ public void run2() throws CacheException {
+ CacheServerTestUtil.getCache().readyForEvents();
+ }
+ });
+
+ // Verify durable client on server
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(1);
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+
+ // Verify that it is durable and its properties are correct
+ assertTrue(proxy.isDurable());
+ assertEquals(durableClientId, proxy.getDurableId());
+ assertEquals(durableClientTimeout, proxy.getDurableTimeout());
+ //assertEquals(durableClientKeepAlive, proxy.getDurableKeepAlive());
+ }
+ });
+
+ // Stop the durable client
+ this.disconnectDurableClient(true);
+
+ // Verify the durable client still exists on the server
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+ }
+ });
+
+ // Re-start the durable client
+ this.restartDurableClient(new Object[] {
+ getClientPool(getServerHostName(durableClientVM.getHost()),serverPort, true),
+ regionName,
+ getClientDistributedSystemProperties(durableClientId,
+ durableClientTimeout) });
+
+ // Verify durable client on server
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(1);
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+
+ // Verify that it is durable and its properties are correct
+ assertTrue(proxy.isDurable());
+ assertEquals(durableClientId, proxy.getDurableId());
+ assertEquals(durableClientTimeout, proxy.getDurableTimeout());
+ }
+ });
+
+ // Stop the durable client
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Stop the server
+ this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+ }
+
+ /**
+ * Test that starting, stopping then restarting a durable client is correctly
+ * processed by the server.
+ * This is a test of bug 39630
+ */
+ public void test39630() {
+ // Start a server
+ int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+ "createCacheServer", new Object[] {regionName, new Boolean(true)}))
+ .intValue();
+
+ // Start a durable client that is kept alive on the server when it stops
+ // normally
+ final String durableClientId = getName() + "_client";
+ final int durableClientTimeout = 60; // keep the client alive for 60 seconds
+ //final boolean durableClientKeepAlive = true; // keep the client alive when it stops normally
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout)});
+
+// // Send clientReady message
+// this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+// public void run2() throws CacheException {
+// CacheServerTestUtil.getCache().readyForEvents();
+// }
+// });
+
+ // Verify durable client on server
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(1);
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+
+ // Verify that it is durable and its properties are correct
+ assertTrue(proxy.isDurable());
+ assertEquals(durableClientId, proxy.getDurableId());
+ assertEquals(durableClientTimeout, proxy.getDurableTimeout());
+ //assertEquals(durableClientKeepAlive, proxy.getDurableKeepAlive());
+ }
+ });
+
+ // Stop the durable client
+ this.disconnectDurableClient(true);
+
+ // Verify the durable client still exists on the server, and the socket is closed
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+ Assert.assertNotNull(proxy._socket);
+ long end = System.currentTimeMillis() + 60000;
+
+ while(!proxy._socket.isClosed()) {
+ if(System.currentTimeMillis() > end) {
+ break;
+ }
+ }
+ Assert.assertTrue(proxy._socket.isClosed());
+ }
+ });
+
+ // Re-start the durable client (this is necessary so the
+ //netDown test will set the appropriate system properties.
+ this.restartDurableClient(new Object[] {
+ getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true),
+ regionName,
+ getClientDistributedSystemProperties(durableClientId,
+ durableClientTimeout) });
+
+ // Stop the durable client
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Stop the server
+ this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+ }
+
+ public void restartDurableClient(Object[] args)
+ {
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient", args);
+
+ // Send clientReady message
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+ public void run2() throws CacheException {
+ CacheServerTestUtil.getCache().readyForEvents();
+ }
+ });
+ }
+
+ /**
+ * Test that disconnecting a durable client for longer than the timeout
+ * period is correctly processed by the server.
+ */
+ public void testStartStopTimeoutDurableClient() {
+ // Start a server
+ int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+ "createCacheServer", new Object[] {regionName, new Boolean(true)}))
+ .intValue();
+
+ // Start a durable client that is kept alive on the server when it stops
+ // normally
+ final String durableClientId = getName() + "_client";
+ final int durableClientTimeout = 5; // keep the client alive for 5 seconds
+ //final boolean durableClientKeepAlive = true; // keep the client alive when it stops normally
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout)});
+
+ // Send clientReady message
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+ public void run2() throws CacheException {
+ CacheServerTestUtil.getCache().readyForEvents();
+ }
+ });
+
+ // Verify durable client on server
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(1);
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+
+ // Verify that it is durable and its properties are correct
+ assertTrue(proxy.isDurable());
+ assertEquals(durableClientId, proxy.getDurableId());
+ assertEquals(durableClientTimeout, proxy.getDurableTimeout());
+ }
+ });
+
+ // Stop the durable client
+ this.disconnectDurableClient(true);
+
+ // Verify the durable client still exists on the server
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(1);
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+ }
+ });
+
+ // Pause to let the client timeout. This time is 2.5 seconds longer than
+ // the durableClientTimeout set above. It should be long enough for the
+ // client to timeout and get cleaned up. There could be a race here,
+ // though.
+ // no need for the explicit pause since checkNumberOfClientProxies
+ // will wait up to 15 seconds
+ //pause(7500);
+
+ // Verify it no longer exists on the server
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(0);
+ CacheClientProxy proxy = getClientProxy();
+ assertNull(proxy);
+ }
+ });
+
+ this.restartDurableClient(new Object[] {
+ getClientPool(getServerHostName(Host.getHost(0)), serverPort, true),
+ regionName,
+ getClientDistributedSystemProperties(durableClientId,
+ durableClientTimeout) });
+
+ // Stop the durable client
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Stop the server
+ this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+ }
+
+ /**
+ * Test that a durable client correctly receives updates after it reconnects.
+ */
+ public void testDurableClientPrimaryUpdate() {
+ // Start a server
+ int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+ "createCacheServer", new Object[] {regionName, new Boolean(true)}))
+ .intValue();
+
+ // Start a durable client that is kept alive on the server when it stops
+ // normally
+ final String durableClientId = getName() + "_client";
+ final int durableClientTimeout = 120; // keep the client alive for 60 seconds
+ //final boolean durableClientKeepAlive = true; // keep the client alive when it stops normally
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout), Boolean.TRUE});
+
+ // Send clientReady message
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+ public void run2() throws CacheException {
+ CacheServerTestUtil.getCache().readyForEvents();
+ }
+ });
+
+ // Have the durable client register interest in all keys
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Register interest in all keys
+ region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+ }
+ });
+
+ // Start normal publisher client
+ this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false), regionName});
+
+ // Publish some entries
+ final int numberOfEntries = 1;
+ this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Publish some entries
+ for (int i=0; i<numberOfEntries; i++) {
+ String keyAndValue = String.valueOf(i);
+ region.put(keyAndValue, keyAndValue);
+ }
+ }
+ });
+
+ // Verify the durable client received the updates
+ this.verifyListenerUpdates(numberOfEntries);
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ fail("interrupted");
+ }
+
+ // Stop the durable client
+ this.disconnectDurableClient(true);
+
+ //Make sure the proxy is actually paused, not dispatching
+ this.server1VM.invoke(new CacheSerializableRunnable("Wait for paused") {
+ public void run2() throws CacheException {
+ WaitCriterion wc = new WaitCriterion() {
+ public boolean done() {
+ CacheClientProxy proxy = getClientProxy();
+ return proxy != null && proxy.isPaused();
+ }
+ public String description() {
+ return "Proxy was not paused: " + getClientProxy();
+ }
+ };
+ //If we wait too long, the durable queue will be gone, because
+ //the timeout is 120 seconds.
+ DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true);
+ }
+ });
+
+ // Publish some more entries
+ this.publisherClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Publish some entries
+ for (int i=0; i<numberOfEntries; i++) {
+ String keyAndValue = String.valueOf(i);
+ region.put(keyAndValue, keyAndValue);
+ }
+ }
+ });
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ fail("interrupted");
+ }
+
+ // Verify the durable client's queue contains the entries
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ WaitCriterion wc = new WaitCriterion() {
+ String excuse;
+ public boolean done() {
+ CacheClientProxy proxy = getClientProxy();
+ if (proxy == null) {
+ excuse = "No CacheClientProxy";
+ return false;
+ }
+ // Verify the queue size
+ int sz = proxy.getQueueSize();
+ if (numberOfEntries != sz) {
+ excuse = "expected = " + numberOfEntries + ", actual = " + sz;
+ return false;
+ }
+ return true;
+ }
+ public String description() {
+ return excuse;
+ }
+ };
+ //If we wait too long, the durable queue will be gone, because
+ //the timeout is 120 seconds.
+ DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true);
+ }
+ });
+
+ // Verify that disconnected client does not receive any events.
+ this.verifyListenerUpdatesDisconnected(numberOfEntries);
+
+ // Re-start the durable client
+ this.restartDurableClient(new Object[] {
+ getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName,
+ getClientDistributedSystemProperties(durableClientId), Boolean.TRUE });
+
+ // Verify durable client on server
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(1);
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+
+ // Verify that it is durable and its properties are correct
+ assertTrue(proxy.isDurable());
+ assertEquals(durableClientId, proxy.getDurableId());
+ }
+ });
+
+ // Verify the durable client received the updates held for it on the server
+ this.verifyListenerUpdates(numberOfEntries, numberOfEntries);
+
+ // Stop the publisher client
+ this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Stop the durable client VM
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Stop the server
+ this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+ }
+
+ /**
+ * Test that a durable client correctly receives updates after it reconnects.
+ */
+ public void testStartStopStartDurableClientUpdate() {
+ // Start a server
+ int serverPort = ((Integer) this.server1VM.invoke(CacheServerTestUtil.class,
+ "createCacheServer", new Object[] {regionName, new Boolean(true)}))
+ .intValue();
+
+ // Start a durable client that is kept alive on the server when it stops
+ // normally
+ final String durableClientId = getName() + "_client";
+ final int durableClientTimeout = 60; // keep the client alive for 60 seconds
+ //final boolean durableClientKeepAlive = true; // keep the client alive when it stops normally
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId, durableClientTimeout), Boolean.TRUE});
+
+ // Send clientReady message
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+ public void run2() throws CacheException {
+ CacheServerTestUtil.getCache().readyForEvents();
+ }
+ });
+
+ // Have the durable client register interest in all keys
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Register interest in all keys
+ region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+ }
+ });
+
+ // Start normal publisher client
+ this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false), regionName});
+
+ // Publish some entries
+ final int numberOfEntries = 1;
+ this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Publish some entries
+ for (int i=0; i<numberOfEntries; i++) {
+ String keyAndValue = String.valueOf(i);
+ region.put(keyAndValue, keyAndValue);
+ }
+ }
+ });
+
+ // Verify the durable client received the updates
+ this.verifyListenerUpdates(numberOfEntries);
+
+ // ARB: Wait for queue ack to arrive at server.
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ fail("interrupted");
+ }
+
+ // Stop the durable client
+ this.disconnectDurableClient(true);
+
+ // Verify the durable client still exists on the server
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ final CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return proxy.isPaused();
+ }
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 1000, 200, true);
+ assertTrue(proxy.isPaused());
+ }
+ });
+
+ // Publish some more entries
+ this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish more entries") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Publish some entries
+ for (int i=0; i<numberOfEntries; i++) {
+ String keyAndValue = String.valueOf(i);
+ region.put(keyAndValue, keyAndValue);
+ }
+ }
+ });
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ fail("interrupted");
+ }
+
+ // Verify the durable client's queue contains the entries
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+
+ // Verify the queue size
+ assertEquals(numberOfEntries, proxy.getQueueSize());
+ }
+ });
+
+ // Verify that disconnected client does not receive any events.
+ this.verifyListenerUpdatesDisconnected(numberOfEntries);
+
+ // Re-start the durable client
+ this.restartDurableClient(new Object[] {
+ getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName,
+ getClientDistributedSystemProperties(durableClientId), Boolean.TRUE });
+
+ // Verify durable client on server
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(1);
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+
+ // Verify that it is durable and its properties are correct
+ assertTrue(proxy.isDurable());
+ assertEquals(durableClientId, proxy.getDurableId());
+ }
+ });
+
+ // Verify the durable client received the updates held for it on the server
+ this.verifyListenerUpdates(numberOfEntries, numberOfEntries);
+
+ // Stop the publisher client
+ this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Stop the durable client VM
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Stop the server
+ this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+ }
+
+ public void verifyListenerUpdates(int numEntries)
+ {
+ this.verifyListenerUpdatesEntries(numEntries, 0);
+ }
+
+ public void verifyListenerUpdates(int numEntries, int numEntriesBeforeDisconnect)
+ {
+ this.verifyListenerUpdatesEntries(numEntries, 0);
+ }
+
+ public void verifyListenerUpdatesEntries(final int numEntries, final int numEntriesBeforeDisconnect)
+ {
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Get the listener and wait for the appropriate number of events
+ CacheServerTestUtil.ControlListener listener = (CacheServerTestUtil.ControlListener) region
+ .getAttributes().getCacheListeners()[0];
+ listener.waitWhileNotEnoughEvents(60 * 1000, numEntries+numEntriesBeforeDisconnect);
+ assertEquals(numEntries+numEntriesBeforeDisconnect, listener.events.size());
+ //CacheServerTestUtil.getCache().getLogger().info("ARB: verify updates(): numEntries = " + numEntries);
+ //CacheServerTestUtil.getCache().getLogger().info("ARB: verify updates(): listener.events.size() = " + listener.events.size());
+ }
+ });
+ }
+
+ public void verifyListenerUpdatesDisconnected(int numberOfEntries)
+ {
+ // ARB: do nothing.
+ }
+
+ public void verifySimpleDurableClientMultipleServers()
+ {
+ // Verify the durable client is no longer on server1
+ this.server1VM
+ .invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(0);
+ CacheClientProxy proxy = getClientProxy();
+ assertNull(proxy);
+ }
+ });
+
+ // Verify the durable client is no longer on server2
+ this.server2VM
+ .invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(0);
+ CacheClientProxy proxy = getClientProxy();
+ assertNull(proxy);
+ }
+ });
+ }
+
+ /**
+ * Test whether a durable client reconnects properly to a server that is
+ * stopped and restarted.
+ */
+ public void testDurableClientConnectServerStopStart() {
+ // Start a server
+ // Start server 1
+ Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class,
+ "createCacheServerReturnPorts", new Object[] {regionName, new Boolean(true)}));
+ final int serverPort = ports[0].intValue();
+
+ // Start a durable client that is not kept alive on the server when it
+ // stops normally
+ final String durableClientId = getName() + "_client";
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {getClientPool(getServerHostName(durableClientVM.getHost()), serverPort, true), regionName, getClientDistributedSystemProperties(durableClientId), Boolean.TRUE});
+
+ // Send clientReady message
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+ public void run2() throws CacheException {
+ CacheServerTestUtil.getCache().readyForEvents();
+ }
+ });
+
+ // Have the durable client register interest in all keys
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Register interest in all keys
+ region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+ }
+ });
+
+ // Verify durable client on server
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(1);
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+
+ // Verify that it is durable and its properties are correct
+ assertTrue(proxy.isDurable());
+ assertEquals(durableClientId, proxy.getDurableId());
+ assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, proxy.getDurableTimeout());
+ }
+ });
+
+ // Stop the server
+ this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Re-start the server
+ this.server1VM.invoke(CacheServerTestUtil.class, "createCacheServer",
+ new Object[] {regionName, new Boolean(true),
+ new Integer(serverPort)});
+
+ // Pause 10 seconds to allow client to reconnect to server
+ // no need for the explicit pause since checkNumberOfClientProxies
+ // will wait up to 15 seconds
+ //pause(10000);
+
+ // Verify durable client on server
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(1);
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+
+ // Verify that it is durable and its properties are correct
+ assertTrue(proxy.isDurable());
+ assertEquals(durableClientId, proxy.getDurableId());
+ assertEquals(DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT, proxy.getDurableTimeout());
+ }
+ });
+
+ // Start a publisher
+ this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), serverPort, false), regionName});
+
+ // Publish some messages
+ // Publish some entries
+ final int numberOfEntries = 10;
+ this.publisherClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Publish some entries
+ for (int i=0; i<numberOfEntries; i++) {
+ String keyAndValue = String.valueOf(i);
+ region.put(keyAndValue, keyAndValue);
+ }
+ }
+ });
+
+ // Verify the durable client received the updates
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Verify updates") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Get the listener and wait for the appropriate number of events
+ CacheServerTestUtil.ControlListener listener = (CacheServerTestUtil.ControlListener) region
+ .getAttributes().getCacheListeners()[0];
+ listener.waitWhileNotEnoughEvents(30000, numberOfEntries);
+ assertEquals(numberOfEntries, listener.events.size());
+ }
+ });
+
+ // Stop the durable client
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Stop the publisher client
+ this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Stop the server
+ this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+ }
+
+ @Ignore("This test is failing inconsistently, see bug 51258")
+ public void DISABLED_testDurableNonHAFailover() throws InterruptedException
+ {
+ durableFailover(0);
+ durableFailoverAfterReconnect(0);
+ }
+
+ @Ignore("This test is failing inconsistently, see bug 51258")
+ public void DISABLED_testDurableHAFailover() throws InterruptedException
+ {
+ //Clients see this when the servers disconnect
+ addExpectedException("Could not find any server");
+ durableFailover(1);
+ durableFailoverAfterReconnect(1);
+ }
+
+ /**
+ * Test a durable client with 2 servers where the client fails over from one to another server
+ * with a publisher/feeder performing operations and the client verifying updates received.
+ * Redundancy level is set to 1 for this test case.
+ */
+ public void durableFailover(int redundancyLevel) throws InterruptedException {
+
+ // Start server 1
+ Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class,
+ "createCacheServerReturnPorts", new Object[] {regionName, new Boolean(true)}));
+ final int server1Port = ports[0].intValue();
+
+ // Start server 2 using the same mcast port as server 1
+ final int server2Port = ((Integer) this.server2VM.invoke(CacheServerTestUtil.class,
+ "createCacheServer", new Object[] {regionName, new Boolean(true)}))
+ .intValue();
+
+ // Stop server 2
+ this.server2VM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Start a durable client
+ final String durableClientId = getName() + "_client";
+ final int durableClientTimeout = 60; // keep the client alive for 60 seconds
+ Pool clientPool;
+ if (redundancyLevel == 1) {
+ clientPool = getClientPool(getServerHostName(Host.getHost(0)), server1Port, server2Port, true);
+ }
+ else {
+ clientPool = getClientPool(getServerHostName(Host.getHost(0)), server1Port, server2Port, true, 0);
+ }
+
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "disableShufflingOfEndpoints");
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {clientPool, regionName,
+ getClientDistributedSystemProperties(durableClientId, durableClientTimeout), Boolean.TRUE});
+
+ // Send clientReady message
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+ public void run2() throws CacheException {
+ CacheServerTestUtil.getCache().readyForEvents();
+ }
+ });
+
+ // Have the durable client register interest in all keys
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Register interest in all keys
+ region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+ }
+ });
+
+ // Re-start server2
+ this.server2VM.invoke(CacheServerTestUtil.class, "createCacheServer",
+ new Object[] { regionName, new Boolean(true),
+ new Integer(server2Port)});
+
+ // Start normal publisher client
+ this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), server1Port, server2Port, false), regionName});
+
+ // Publish some entries
+ final int numberOfEntries = 1;
+ this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Publish some entries
+ for (int i=0; i<numberOfEntries; i++) {
+ String keyAndValue = String.valueOf(i);
+ region.put(keyAndValue, keyAndValue);
+ }
+ }
+ });
+
+ // Verify the durable client received the updates
+ this.verifyListenerUpdates(numberOfEntries);
+
+ try {
+ java.lang.Thread.sleep(5000);
+ }
+ catch (java.lang.InterruptedException ex) {
+ fail("interrupted");
+ }
+
+
+ // Stop the durable client
+ this.disconnectDurableClient(true);
+
+ // Publish updates during client downtime
+ this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Publish some entries
+ for (int i=0; i<numberOfEntries; i++) {
+ String keyAndValue = String.valueOf(i);
+ region.put(keyAndValue, keyAndValue);
+ }
+ }
+ });
+
+
+ // Re-start the durable client that is kept alive on the server
+ this.restartDurableClient(new Object[] {
+ clientPool,
+ regionName,
+ getClientDistributedSystemProperties(durableClientId,
+ durableClientTimeout), Boolean.TRUE });
+
+ // Have the durable client register interest in all keys
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Register interest in all keys
+ region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+ }
+ });
+
+ // Publish second round of updates
+ this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries before failover") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Publish some entries
+ for (int i=1; i<numberOfEntries+1; i++) {
+ String keyAndValue = String.valueOf(i);
+ region.put(keyAndValue, keyAndValue);
+ }
+ }
+ });
+
+ // Verify the durable client received the updates before failover
+ this.verifyListenerUpdates(numberOfEntries+1, numberOfEntries);
+
+ try {
+ java.lang.Thread.sleep(1000);
+ }
+ catch (java.lang.InterruptedException ex) {
+ fail("interrupted");
+ }
+
+ setPrimaryRecoveryCheck();
+
+ // Stop server 1 - publisher will put 10 entries during shutdown/primary identification
+ this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Get") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ assertNull(region.getEntry("0"));
+ }
+ });
+
+ checkPrimaryRecovery();
+ // Publish second round of updates after failover
+ this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries after failover") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Publish some entries
+ for (int i=2; i<numberOfEntries+2; i++) {
+ String keyAndValue = String.valueOf(i);
+ region.put(keyAndValue, keyAndValue);
+ }
+ }
+ });
+
+ // Verify the durable client received the updates after failover
+ this.verifyListenerUpdates(numberOfEntries+2, numberOfEntries);
+
+ // Stop the durable client
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Stop the publisher client
+ this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Stop server 2
+ this.server2VM.invoke(CacheServerTestUtil.class, "closeCache");
+ }
+
+
+ public void durableFailoverAfterReconnect(int redundancyLevel)
+ {
+
+
+ // Start server 1
+ Integer[] ports = ((Integer[]) this.server1VM.invoke(CacheServerTestUtil.class,
+ "createCacheServerReturnPorts", new Object[] {regionName, new Boolean(true)}));
+ final int server1Port = ports[0].intValue();
+ final int mcastPort = ports[1].intValue();
+
+ // Start server 2 using the same mcast port as server 1
+ final int server2Port = ((Integer) this.server2VM.invoke(CacheServerTestUtil.class,
+ "createCacheServer", new Object[] {regionName, new Boolean(true), new Integer(mcastPort)}))
+ .intValue();
+
+ // Start a durable client
+ final String durableClientId = getName() + "_client";
+ final int durableClientTimeout = 60; // keep the client alive for 60 seconds
+ Pool clientPool;
+ if (redundancyLevel == 1) {
+ clientPool = getClientPool(getServerHostName(Host.getHost(0)), server1Port, server2Port, true);
+ }
+ else {
+ clientPool = getClientPool(getServerHostName(Host.getHost(0)), server1Port, server2Port, true, 0);
+ }
+
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "disableShufflingOfEndpoints");
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {clientPool, regionName,
+ getClientDistributedSystemProperties(durableClientId, durableClientTimeout), Boolean.TRUE});
+
+ // Send clientReady message
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Send clientReady") {
+ public void run2() throws CacheException {
+ CacheServerTestUtil.getCache().readyForEvents();
+ }
+ });
+
+ // Have the durable client register interest in all keys
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Register interest in all keys
+ region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+ }
+ });
+
+ // Start normal publisher client
+ this.publisherClientVM.invoke(CacheServerTestUtil.class, "createCacheClient",
+ new Object[] {getClientPool(getServerHostName(publisherClientVM.getHost()), server1Port, server2Port, false), regionName});
+
+ // Publish some entries
+ final int numberOfEntries = 1;
+ this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Publish some entries
+ for (int i=0; i<numberOfEntries; i++) {
+ String keyAndValue = String.valueOf(i);
+ region.put(keyAndValue, keyAndValue);
+ }
+ }
+ });
+
+ try {
+ java.lang.Thread.sleep(10000);
+ }
+ catch (java.lang.InterruptedException ex) {
+ fail("interrupted");
+ }
+
+ // Verify the durable client received the updates
+ this.verifyListenerUpdates(numberOfEntries);
+
+ this.server1VM.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(1);
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+
+ // Verify that it is durable and its properties are correct
+ assertTrue(proxy.isDurable());
+ }
+ });
+
+ // Stop the durable client
+ this.disconnectDurableClient(true);
+
+ // Stop server 1 - publisher will put 10 entries during shutdown/primary identification
+ this.server1VM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Publish updates during client downtime
+ this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Publish some entries
+ for (int i=0; i<numberOfEntries; i++) {
+ String keyAndValue = String.valueOf(i);
+ region.put(keyAndValue, keyAndValue);
+ }
+ }
+ });
+
+ // Re-start the durable client that is kept alive on the server
+ this.restartDurableClient(new Object[] {
+ clientPool,
+ regionName,
+ getClientDistributedSystemProperties(durableClientId,
+ durableClientTimeout), Boolean.TRUE });
+
+ // Have the durable client register interest in all keys
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Register interest") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Register interest in all keys
+ region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+ }
+ });
+
+ // Publish second round of updates
+ this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries before failover") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Publish some entries
+ for (int i=1; i<numberOfEntries+1; i++) {
+ String keyAndValue = String.valueOf(i);
+ region.put(keyAndValue, keyAndValue);
+ }
+ }
+ });
+
+ // Verify the durable client received the updates before failover
+ if (redundancyLevel == 1) {
+ this.verifyListenerUpdates(numberOfEntries+1, numberOfEntries);
+ }
+ else {
+ this.verifyListenerUpdates(numberOfEntries, numberOfEntries);
+ }
+
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Get") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Register interest in all keys
+ assertNull(region.getEntry("0"));
+ }
+ });
+
+ // Publish second round of updates after failover
+ this.publisherClientVM.invoke(new CacheSerializableRunnable("Publish entries after failover") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Publish some entries
+ for (int i=2; i<numberOfEntries+2; i++) {
+ String keyAndValue = String.valueOf(i);
+ region.put(keyAndValue, keyAndValue);
+ }
+ }
+ });
+
+ // Verify the durable client received the updates after failover
+ if (redundancyLevel == 1) {
+ this.verifyListenerUpdates(numberOfEntries+2, numberOfEntries);
+ }
+ else {
+ this.verifyListenerUpdates(numberOfEntries+1, numberOfEntries);
+ }
+
+ // Stop the durable client
+ this.durableClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Stop the publisher client
+ this.publisherClientVM.invoke(CacheServerTestUtil.class, "closeCache");
+
+ // Stop server 2
+ this.server2VM.invoke(CacheServerTestUtil.class, "closeCache");
+ }
+
+
+ //First we will have the client wait before trying to reconnect
+ //Then the drain will lock and begins to drain
+ //The client will then be able to continue, and get rejected
+ //Then we proceed to drain and release all locks
+ //The client will then reconnect
+ public class RejectClientReconnectTestHook implements CacheClientProxy.TestHook {
+ CountDownLatch reconnectLatch = new CountDownLatch(1);
+ CountDownLatch continueDrain = new CountDownLatch(1);
+ boolean clientWasRejected = false;
+ CountDownLatch clientConnected = new CountDownLatch(1);
+
+ public void doTestHook(String spot) {
+ System.out.println("JASON " + spot);
+ try {
+ if (spot.equals("CLIENT_PRE_RECONNECT")) {
+ if (!reconnectLatch.await(60, TimeUnit.SECONDS)) {
+ throw new TestException("reonnect latch was never released.");
+ }
+ }
+ else if (spot.equals("DRAIN_IN_PROGRESS_BEFORE_DRAIN_LOCK_CHECK")) {
+ //let client try to reconnect
+ reconnectLatch.countDown();
+ //we wait until the client is rejected
+ if (!continueDrain.await(120, TimeUnit.SECONDS)) {
+ throw new TestException("Latch was never released.");
+ }
+ }
+ else if (spot.equals("CLIENT_REJECTED_DUE_TO_CQ_BEING_DRAINED")) {
+ clientWasRejected = true;
+ continueDrain.countDown();
+ }
+ else if (spot.equals("DRAIN_COMPLETE")) {
+
+ }
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public boolean wasClientRejected() {
+ return clientWasRejected;
+ }
+ }
+
+ /*
+ * This hook will cause the close cq to throw an exception due to a client in the middle of activating
+ * sequence -
+ * server will pause before draining
+ * client will begin to reconnect and then wait to continue
+ * server will be unblocked, and rejected
+ * client will the be unlocked after server is rejected and continue
+ */
+ public class CqExceptionDueToActivatingClientTestHook implements CacheClientProxy.TestHook {
+ CountDownLatch unblockDrain = new CountDownLatch(1);
+ CountDownLatch unblockClient = new CountDownLatch(1);
+ CountDownLatch finish = new CountDownLatch(1);
+ public void doTestHook(String spot) {
+ if (spot.equals("PRE_DRAIN_IN_PROGRESS")) {
+ try {
+ //Unblock any client waiting to reconnect
+ unblockClient.countDown();
+ //Wait until client is reconnecting
+ if (!unblockDrain.await(120, TimeUnit.SECONDS)) {
+ throw new TestException("client never got far enough reconnected to unlatch lock.");
+ }
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ Thread.currentThread().interrupt();
+ }
+ }
+ if (spot.equals("PRE_RELEASE_DRAIN_LOCK")) {
+ //Client is reconnecting but still holds the drain lock
+ //let the test continue to try to close a cq
+ unblockDrain.countDown();
+ //wait until the server has finished attempting to close the cq
+ try {
+ if (!finish.await(30, TimeUnit.SECONDS) ) {
+ throw new TestException("Test did not complete, server never finished attempting to close cq");
+ }
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ Thread.currentThread().interrupt();
+ }
+ }
+ if (spot.equals("DRAIN_COMPLETE")) {
+ finish.countDown();
+ }
+ }
+ }
+
+
+ protected CqQuery createCq(String cqName, String cqQuery, boolean durable) throws CqException, CqExistsException {
+ QueryService qs = CacheServerTestUtil.getCache().getQueryService();
+ CqAttributesFactory cqf = new CqAttributesFactory();
+ CqListener[] cqListeners = { new CacheServerTestUtil.ControlCqListener() };
+ cqf.initCqListeners(cqListeners);
+ CqAttributes cqa = cqf.create();
+ return qs.newCq(cqName, cqQuery, cqa, durable);
+
+ }
+
+ protected Pool getClientPool(String host, int serverPort, boolean establishCallbackConnection){
+ PoolFactory pf = PoolManager.createFactory();
+ pf.addServer(host, serverPort)
+ .setSubscriptionEnabled(establishCallbackConnection)
+ .setSubscriptionAckInterval(1);
+ return ((PoolFactoryImpl)pf).getPoolAttributes();
+ }
+
+
+ protected Pool getClientPool(String host, int server1Port, int server2Port, boolean establishCallbackConnection){
+ return getClientPool(host, server1Port, server2Port, establishCallbackConnection, 1);
+ }
+
+
+ protected Properties getClientDistributedSystemProperties(String durableClientId) {
+ return getClientDistributedSystemProperties(durableClientId,
+ DistributionConfig.DEFAULT_DURABLE_CLIENT_TIMEOUT);
+ }
+
+ protected Properties getClientDistributedSystemPropertiesNonDurable(String durableClientId) {
+ Properties properties = new Properties();
+ properties.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+ properties.setProperty(DistributionConfig.LOCATORS_NAME, "");
+ return properties;
+ }
+
+
+ protected Properties getClientDistributedSystemProperties(
+ String durableClientId, int durableClientTimeout) {
+ Properties properties = new Properties();
+ properties.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+ properties.setProperty(DistributionConfig.LOCATORS_NAME, "");
+ properties.setProperty(DistributionConfig.DURABLE_CLIENT_ID_NAME, durableClientId);
+ properties.setProperty(DistributionConfig.DURABLE_CLIENT_TIMEOUT_NAME, String.valueOf(durableClientTimeout));
+ return properties;
+ }
+
+ protected static CacheClientProxy getClientProxy() {
+ // Get the CacheClientNotifier
+ CacheClientNotifier notifier = getBridgeServer().getAcceptor()
+ .getCacheClientNotifier();
+
+ // Get the CacheClientProxy or not (if proxy set is empty)
+ CacheClientProxy proxy = null;
+ Iterator i = notifier.getClientProxies().iterator();
+ if (i.hasNext()) {
+ proxy = (CacheClientProxy) i.next();
+ }
+ return proxy;
+ }
+
+ protected static void checkNumberOfClientProxies(final int expected) {
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return expected == getNumberOfClientProxies();
+ }
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
+ }
+
+ protected static void checkProxyIsAlive(final CacheClientProxy proxy) {
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return proxy.isAlive();
+ }
+ public String description() {
+ return null;
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 15 * 1000, 200, true);
+ }
+
+ protected static int getNumberOfClientProxies() {
+ return getBridgeServer().getAcceptor().getCacheClientNotifier()
+ .getClientProxies().size();
+ }
+
+ protected static CacheServerImpl getBridgeServer() {
+ CacheServerImpl bridgeServer = (CacheServerImpl) CacheServerTestUtil
+ .getCache().getCacheServers().iterator().next();
+ assertNotNull(bridgeServer);
+ return bridgeServer;
+ }
+
+
+ protected Pool getClientPool(String host, int server1Port, int server2Port, boolean establishCallbackConnection, int redundancyLevel){
+ PoolFactory pf = PoolManager.createFactory();
+ pf.addServer(host, server1Port)
+ .addServer(host, server2Port)
+ .setSubscriptionEnabled(establishCallbackConnection)
+ .setSubscriptionRedundancy(redundancyLevel)
+ .setSubscriptionAckInterval(1);
+ return ((PoolFactoryImpl)pf).getPoolAttributes();
+ }
+
+ /**
+ * Returns the durable client proxy's HARegionQueue region name. This method
+ * is accessed via reflection on a server VM.
+ * @return the durable client proxy's HARegionQueue region name
+ */
+ protected static String getHARegionQueueName() {
+ checkNumberOfClientProxies(1);
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+ return proxy.getHARegionName();
+ }
+
+ public static void verifyReceivedMarkerAck(final CacheClientProxy proxy) {
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ GemFireCacheImpl.getInstance().getLoggerI18n().fine(
+ "DurableClientDUnitTest->WaitCriterion :: done called");
+ return checkForAck(proxy);
+ }
+ public String description() {
+ return "never received marker ack";
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 3 * 60 * 1000, 200/*0*/, true);
+ }
+
+ /**
+ * This is an incredibly ugly test to see if we got an ack from the client.
+ * the dispatched messages map is <b> static<b> map, which has
+ * region queue names for keys and MapWrappers (an protected class)
+ * as values. All this is testing is to see that this queue has an entry in the dispatchedMessages
+ * map, which means it got at least one periodic ack.
+ * @return true if there was an ack
+ */
+ protected static boolean checkForAck(CacheClientProxy proxy) {
+ //pause(5000);
+ return HARegionQueue.isTestMarkerMessageRecieved();
+ }
+
+ protected static void setTestFlagToVerifyActForMarker(Boolean flag){
+ HARegionQueue.setUsedByTest(flag.booleanValue());
+ }
+
+ public static void setBridgeObeserverForAfterPrimaryRecovered() {
+ DurableClientTestCase.isPrimaryRecovered = false;
+ PoolImpl.AFTER_PRIMARY_RECOVERED_CALLBACK_FLAG = true;
+ ClientServerObserver bo = ClientServerObserverHolder
+ .setInstance(new ClientServerObserverAdapter() {
+ public void afterPrimaryRecovered(ServerLocation location) {
+ DurableClientTestCase.isPrimaryRecovered = true;
+ PoolImpl.AFTER_PRIMARY_RECOVERED_CALLBACK_FLAG = false;
+ }
+ });
+ }
+
+ public void setPrimaryRecoveryCheck() {
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Set observer") {
+ public void run2() {
+ setBridgeObeserverForAfterPrimaryRecovered();
+ }
+ });
+ }
+
+ public void checkPrimaryRecovery() {
+ this.durableClientVM.invoke(new CacheSerializableRunnable("Check observer") {
+ public void run2() {
+ WaitCriterion waitForPrimaryRecovery = new WaitCriterion() {
+ public boolean done() {
+ return DurableClientTestCase.isPrimaryRecovered;
+ }
+
+ public String description() {
+ return "Did not detect primary recovery event during wait period";
+ }
+ };
+
+ // wait for primary (and interest) recovery
+ // recovery satisfier task currently uses ping interval value
+ DistributedTestCase.waitForCriterion(waitForPrimaryRecovery, 30000, 1000, true);
+ }
+ });
+ }
+
+
+
+ protected void sendClientReady(VM vm) {
+ // Send clientReady message
+ vm.invoke(new CacheSerializableRunnable(
+ "Send clientReady") {
+ public void run2() throws CacheException {
+ CacheServerTestUtil.getCache().readyForEvents();
+ }
+ });
+ }
+
+ protected void registerInterest(VM vm, final String regionName, final boolean durable) {
+ vm.invoke(new CacheSerializableRunnable("Register interest on region : " + regionName) {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Register interest in all keys
+ region.registerInterestRegex(".*", InterestResultPolicy.NONE, durable);
+ }
+ });
+ }
+
+ protected void createCq(VM vm, final String cqName, final String cqQuery, final boolean durable) {
+ vm.invoke(new CacheSerializableRunnable(
+ "Register cq " + cqName) {
+ public void run2() throws CacheException {
+
+ try {
+ createCq(cqName, cqQuery, durable).execute();
+ }
+ catch (CqExistsException e) {
+ throw new CacheException(e) {};
+ }
+ catch (CqException e) {
+ throw new CacheException(e) {};
+ }
+ catch (RegionNotFoundException e) {
+ throw new CacheException(e) {};
+ }
+
+ }
+ });
+ }
+
+ protected void publishEntries(VM vm, final String regionName, final int numEntries) {
+ vm.invoke(new CacheSerializableRunnable(
+ "publish " + numEntries + " entries") {
+ public void run2() throws CacheException {
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+
+ // Publish some entries
+ for (int i = 0; i < numEntries; i++) {
+ String keyAndValue = String.valueOf(i);
+ region.put(keyAndValue, new Portfolio(i));
+ }
+ }
+ });
+ }
+
+ /*
+ * Due to the way removal from ha region queue is implemented
+ * a dummy cq or interest needs to be created and a dummy value used so that
+ * none of the actual cqs will be triggered and yet an event will
+ * flush the queue
+ */
+ protected void flushEntries(VM server, VM client, final String regionName) {
+ //This wait is to make sure that all acks have been responded to...
+ //We can add a stat later on the cache client proxy stats that checks
+ //ack counts
+ try {
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ registerInterest(client, regionName, false);
+ server.invoke(new CacheSerializableRunnable("flush entries") {
+ public void run2() throws CacheException {
+ CqService service = ((InternalCache) CacheServerTestUtil.getCache()).getCqService();
+ // Get the region
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+ assertNotNull(region);
+ region.put("LAST", "ENTRY");
+ }
+ });
+ }
+
+
+ protected void checkCqStatOnServer(VM server, final String durableClientId, final String cqName, final int expectedNumber) {
+ server.invoke(new CacheSerializableRunnable(
+ "Check ha queued cq stats for durable client " + durableClientId + " cq: " + cqName) {
+ public void run2() throws CacheException {
+
+ final CacheClientNotifier ccnInstance = CacheClientNotifier
+ .getInstance();
+ final CacheClientProxy clientProxy = ccnInstance
+ .getClientProxy(durableClientId);
+ ClientProxyMembershipID proxyId = clientProxy.getProxyID();
+ CqService cqService = ((InternalCache)CacheServerTestUtil.getCache()).getCqService();
+ cqService.start();
+ final CqQueryImpl cqQuery = (CqQueryImpl)cqService.getClientCqFromServer(proxyId, cqName);
+
+ //Wait until we get the expected number of events or until 10 seconds are up
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return cqQuery.getVsdStats().getNumHAQueuedEvents() == expectedNumber;
+ }
+ public String description() {
+ return "cq numHAQueuedEvents stat was expected to be " + expectedNumber + " but was instead " + cqQuery.getVsdStats().getNumHAQueuedEvents();
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 10 * 1000, 200, true);
+
+ assertEquals(expectedNumber, cqQuery.getVsdStats().getNumHAQueuedEvents());
+ }
+ });
+ }
+
+ /*
+ * Remaining is the number of events that could still be in the queue due to timing issues with acks
+ * and receiving them after remove from ha queue region has been called.
+ */
+ protected void checkHAQueueSize(VM server, final String durableClientId, final int expectedNumber, final int remaining) {
+ server.invoke(new CacheSerializableRunnable(
+ "Check ha queued size for durable client " + durableClientId) {
+ public void run2() throws CacheException {
+
+ final CacheClientNotifier ccnInstance = CacheClientNotifier
+ .getInstance();
+ final CacheClientProxy clientProxy = ccnInstance
+ .getClientProxy(durableClientId);
+ ClientProxyMembershipID proxyId = clientProxy.getProxyID();
+
+ //Wait until we get the expected number of events or until 10 seconds are up
+ WaitCriterion ev = new WaitCriterion() {
+ public boolean done() {
+ return clientProxy.getQueueSizeStat() == expectedNumber || clientProxy.getQueueSizeStat() == remaining;
+ }
+ public String description() {
+ return "queue size stat was expected to be " + expectedNumber + " but was instead " + clientProxy.getQueueSizeStat();
+ }
+ };
+ DistributedTestCase.waitForCriterion(ev, 10 * 1000, 200, true);
+
+ assertTrue(clientProxy.getQueueSizeStat() == expectedNumber || clientProxy.getQueueSizeStat() == remaining);
+ }
+ });
+ }
+
+ protected void checkNumDurableCqs(VM server, final String durableClientId, final int expectedNumber) {
+ server.invoke(new CacheSerializableRunnable(
+ "check number of durable cqs on server for durable client: " + durableClientId){
+ public void run2() throws CacheException {
+ try {
+ final CacheClientNotifier ccnInstance = CacheClientNotifier
+ .getInstance();
+ final CacheClientProxy clientProxy = ccnInstance
+ .getClientProxy(durableClientId);
+ ClientProxyMembershipID proxyId = clientProxy.getProxyID();
+ CqService cqService = ((InternalCache)CacheServerTestUtil.getCache()).getCqService();
+ cqService.start();
+ List<String> cqNames = cqService.getAllDurableClientCqs(proxyId);
+ assertEquals(expectedNumber, cqNames.size());
+ }
+ catch (Exception e) {
+ throw new CacheException(e){};
+ }
+ }
+ });
+ }
+
+ /*
+ * @param vm
+ * @param cqName
+ * @param numEvents
+ * @param numEventsToWaitFor most times will be the same as numEvents,
+ but there are times where we want to wait for an event we know is
+ not coming just to be sure an event actually isnt received
+ * @param secondsToWait
+ */
+ protected void checkCqListenerEvents(VM vm, final String cqName, final int numEvents, final int numEventsToWaitFor, final int secondsToWait) {
+ vm.invoke(new CacheSerializableRunnable("Verify events for cq: " + cqName) {
+ public void run2() throws CacheException {
+ QueryService qs = CacheServerTestUtil.getCache().getQueryService();
+ CqQuery cq = qs.getCq(cqName);
+ // Get the listener and wait for the appropriate number of events
+ CacheServerTestUtil.ControlCqListener listener = (CacheServerTestUtil.ControlCqListener) cq.getCqAttributes().getCqListener();
+ listener.waitWhileNotEnoughEvents(secondsToWait * 1000, numEventsToWaitFor);
+ assertEquals(numEvents, listener.events.size());
+ }
+ });
+ }
+
+ protected void checkInterestEvents(VM vm, final String regionName, final int numEvents) {
+ vm.invoke(new CacheSerializableRunnable("Verify interest events") {
+ public void run2() throws CacheException {
+ Region region = CacheServerTestUtil.getCache().getRegion(regionName);
+
+ CacheServerTestUtil.ControlListener clistener = (CacheServerTestUtil.ControlListener) region
+ .getAttributes().getCacheListeners()[0];
+ clistener.waitWhileNotEnoughEvents(30000, numEvents);
+ assertEquals(numEvents, clistener.events.size());
+ }
+ });
+ }
+
+ protected void startDurableClient(VM vm, String durableClientId, int serverPort1, String regionName, int durableTimeoutInSeconds) {
+ vm.invoke(
+ CacheServerTestUtil.class,
+ "createCacheClient",
+ new Object[] {
+ getClientPool(getServerHostName(durableClientVM.getHost()),
+ serverPort1, true), regionName,
+ getClientDistributedSystemProperties(durableClientId, durableTimeoutInSeconds),
+ Boolean.TRUE });
+ }
+
+ protected void startDurableClient(VM vm, String durableClientId, int serverPort1, String regionName) {
+ vm.invoke(
+ CacheServerTestUtil.class,
+ "createCacheClient",
+ new Object[] {
+ getClientPool(getServerHostName(durableClientVM.getHost()),
+ serverPort1, true), regionName,
+ getClientDistributedSystemProperties(durableClientId),
+ Boolean.TRUE });
+ }
+
+ protected void startDurableClient(VM vm, String durableClientId, int serverPort1, int serverPort2, String regionName) {
+ vm.invoke(
+ CacheServerTestUtil.class,
+ "createCacheClient",
+ new Object[] {
+ getClientPool(getServerHostName(vm.getHost()),
+ serverPort1, serverPort2, true), regionName,
+ getClientDistributedSystemProperties(durableClientId),
+ Boolean.TRUE });
+ }
+
+ protected void startClient(VM vm, int serverPort1, String regionName) {
+ vm.invoke(
+ CacheServerTestUtil.class,
+ "createCacheClient",
+ new Object[] {
+ getClientPool(getServerHostName(vm.getHost()),
+ serverPort1, false), regionName });
+ }
+
+ protected void startClient(VM vm, int serverPort1, int serverPort2, String regionName) {
+ vm.invoke(
+ CacheServerTestUtil.class,
+ "createCacheClient",
+ new Object[] {
+ getClientPool(getServerHostName(vm.getHost()),
+ serverPort1, serverPort2, false), regionName });
+ }
+
+ protected void verifyDurableClientOnServer(VM server, final String durableClientId) {
+ server.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ // Find the proxy
+ checkNumberOfClientProxies(1);
+ CacheClientProxy proxy = getClientProxy();
+ assertNotNull(proxy);
+
+ // Verify that it is durable and its properties are correct
+ assertTrue(proxy.isDurable());
+ assertEquals(durableClientId, proxy.getDurableId());
+ }
+ });
+ }
+
+ protected void checkPrimaryUpdater(VM vm) {
+ vm.invoke(new CacheSerializableRunnable("Verify durable client") {
+ public void run2() throws CacheException {
+ WaitCriterion wc = new WaitCriterion() {
+ public boolean done() {
+ return CacheServerTestUtil.getPool().isPrimaryUpdaterAlive();
+ }
+ public String description() {
+ return "No primary updater";
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true);
+ assertTrue(CacheServerTestUtil.getPool().isPrimaryUpdaterAlive());
+ }
+ });
+ }
+
+ protected void closeCache(VM vm) {
+ vm.invoke(CacheServerTestUtil.class, "closeCache");
+ }
+}