You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/10/31 20:55:52 UTC
[geode] 02/02: GEODE-3637: Final commit with test to confirm
asynchronous client queue creation
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 6e4e70282a34ce5299d6968994d1be34ded4955a
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Tue Oct 31 13:55:42 2017 -0700
GEODE-3637: Final commit with test to confirm asynchronous client queue creation
---
.../internal/cache/tier/sockets/AcceptorImpl.java | 105 ++++++-------
.../cache/tier/sockets/ServerConnection.java | 12 --
.../sockets/AcceptorImplClientQueueDUnitTest.java} | 168 ++++++++++-----------
.../apache/geode/test/dunit/rules/CacheRule.java | 22 ++-
4 files changed, 154 insertions(+), 153 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 721874f..193b300 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -95,6 +95,7 @@ import org.apache.geode.internal.util.ArrayUtils;
/**
* Implements the acceptor thread on the bridge server. Accepts connections from the edge and starts
* up threads to process requests from these.
+ *
* @since GemFire 2.0.2
*/
@SuppressWarnings("deprecation")
@@ -102,6 +103,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
private static final Logger logger = LogService.getLogger();
private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit");
+ private static final int HANDSHAKER_DEFAULT_POOL_SIZE = 4;
protected final CacheServerStats stats;
private final int maxConnections;
@@ -272,6 +274,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* The ip address or host name this acceptor is to bind to; <code>null</code> or "" indicates it
* will listen on all local addresses.
+ *
* @since GemFire 5.7
*/
private final String bindHostName;
@@ -311,13 +314,14 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* Initializes this acceptor thread to listen for connections on the given port.
+ *
* @param port The port on which this acceptor listens for connections. If <code>0</code>, a
- * random port will be chosen.
+ * random port will be chosen.
* @param bindHostName The ip address or host name this acceptor listens on for connections. If
- * <code>null</code> or "" then all local addresses are used
+ * <code>null</code> or "" then all local addresses are used
* @param socketBufferSize The buffer size for server-side sockets
* @param maximumTimeBetweenPings The maximum time between client pings. This value is used by the
- * <code>ClientHealthMonitor</code> to monitor the health of this server's clients.
+ * <code>ClientHealthMonitor</code> to monitor the health of this server's clients.
* @param internalCache The GemFire cache whose contents is served to clients
* @param maxConnections the maximum number of connections allowed in the server pool
* @param maxThreads the maximum number of threads allowed in the server pool
@@ -326,14 +330,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
* @since GemFire 5.7
*/
public AcceptorImpl(int port, String bindHostName, boolean notifyBySubscription,
- int socketBufferSize, int maximumTimeBetweenPings,
- InternalCache internalCache,
- int maxConnections, int maxThreads, int maximumMessageCount,
- int messageTimeToLive,
- ConnectionListener listener, List overflowAttributesList,
- boolean isGatewayReceiver,
- List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
- ServerConnectionFactory serverConnectionFactory) throws IOException {
+ int socketBufferSize, int maximumTimeBetweenPings, InternalCache internalCache,
+ int maxConnections, int maxThreads, int maximumMessageCount, int messageTimeToLive,
+ ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver,
+ List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
+ ServerConnectionFactory serverConnectionFactory) throws IOException {
this.securityService = internalCache.getSecurityService();
this.bindHostName = calcBindHostName(internalCache, bindHostName);
this.connectionListener = listener == null ? new ConnectionListenerAdapter() : listener;
@@ -445,7 +446,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
// fix for bug 36617. If BindException is thrown, retry after
// sleeping. The server may have been stopped and then
// immediately restarted, which sometimes results in a bind exception
- for (; ; ) {
+ for (;;) {
try {
this.serverSock.bind(new InetSocketAddress(getBindAddress(), port), backLog);
break;
@@ -474,7 +475,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
// fix for bug 36617. If BindException is thrown, retry after
// sleeping. The server may have been stopped and then
// immediately restarted, which sometimes results in a bind exception
- for (; ; ) {
+ for (;;) {
try {
this.serverSock = this.socketCreator.createServerSocket(port, backLog, getBindAddress(),
this.gatewayTransportFilters, socketBufferSize);
@@ -519,7 +520,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
String sockName = getServerName();
logger.info(LocalizedMessage.create(
LocalizedStrings.AcceptorImpl_CACHE_SERVER_CONNECTION_LISTENER_BOUND_TO_ADDRESS_0_WITH_BACKLOG_1,
- new Object[]{sockName, Integer.valueOf(backLog)}));
+ new Object[] {sockName, Integer.valueOf(backLog)}));
if (isGatewayReceiver) {
this.stats = GatewayReceiverStats.createGatewayReceiverStats(sockName);
} else {
@@ -581,6 +582,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
}
}
};
+ logger.warn("Handshaker max Pool size: " + HANDSHAKE_POOL_SIZE);
return new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, blockingQueue,
socketThreadFactory, rejectedExecutionHandler);
} catch (IllegalArgumentException poolInitException) {
@@ -592,16 +594,15 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
}
private ThreadPoolExecutor initializeClientQueueInitializerThreadPool() throws IOException {
- final ThreadGroup clientQueueThreadGroup = LoggingThreadGroup.createThreadGroup(
- "Client Queue Initialization ", logger);
+ final ThreadGroup clientQueueThreadGroup =
+ LoggingThreadGroup.createThreadGroup("Client Queue Initialization ", logger);
ThreadFactory clientQueueThreadFactory = new ThreadFactory() {
AtomicInteger connNum = new AtomicInteger(-1);
@Override
public Thread newThread(final Runnable command) {
- String
- threadName =
+ String threadName =
clientQueueThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
Runnable runnable = new Runnable() {
public void run() {
@@ -616,8 +617,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
}
};
try {
- return new ThreadPoolExecutor(1, 16, 60,
- TimeUnit.SECONDS, new SynchronousQueue(), clientQueueThreadFactory);
+ return new ThreadPoolExecutor(1, 16, 60, TimeUnit.SECONDS, new SynchronousQueue(),
+ clientQueueThreadFactory);
} catch (IllegalArgumentException poolInitException) {
throw poolInitException;
}
@@ -683,6 +684,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* This system property is only used if max-threads == 0. This is for 5.0.2 backwards
* compatibility.
+ *
* @deprecated since 5.1 use cache-server max-threads instead
*/
@Deprecated
@@ -691,13 +693,14 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* This system property is only used if max-threads == 0. This is for 5.0.2 backwards
* compatibility.
+ *
* @deprecated since 5.1 use cache-server max-threads instead
*/
@Deprecated
private final static int DEPRECATED_SELECTOR_POOL_SIZE =
Integer.getInteger("BridgeServer.SELECTOR_POOL_SIZE", 16).intValue();
- private final static int HANDSHAKE_POOL_SIZE =
- Integer.getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", 4).intValue();
+ private final static int HANDSHAKE_POOL_SIZE = Integer
+ .getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", HANDSHAKER_DEFAULT_POOL_SIZE).intValue();
@Override
public void start() throws IOException {
@@ -828,6 +831,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* Ensure that the CachedRegionHelper and ServerConnection classes get loaded.
+ *
* @see SystemFailure#loadEmergencyClasses()
*/
public static void loadEmergencyClasses() {
@@ -1189,6 +1193,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* The work loop of this acceptor
+ *
* @see #accept
*/
public void run() {
@@ -1229,8 +1234,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
}
/**
- * {@linkplain ServerSocket#accept Listens}for a client to connect and then creates a new {@link
- * ServerConnection}to handle messages from that client.
+ * {@linkplain ServerSocket#accept Listens}for a client to connect and then creates a new
+ * {@link ServerConnection}to handle messages from that client.
*/
@Override
public void accept() {
@@ -1318,7 +1323,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
* blocking is good because it will throttle the rate at which we create new connections.
*/
private void handOffNewClientConnection(final Socket socket,
- final ServerConnectionFactory serverConnectionFactory) {
+ final ServerConnectionFactory serverConnectionFactory) {
try {
this.stats.incAcceptsInProgress();
this.hsPool.execute(new Runnable() {
@@ -1403,8 +1408,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
}
protected void handleNewClientConnection(final Socket socket,
- final ServerConnectionFactory serverConnectionFactory)
- throws IOException {
+ final ServerConnectionFactory serverConnectionFactory) throws IOException {
// Read the first byte. If this socket is being used for 'client to server'
// communication, create a ServerConnection. If this socket is being used
// for 'server to client' communication, send it to the CacheClientNotifier
@@ -1425,24 +1429,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
throw new EOFException();
}
- // GEODE-3637
- if (communicationMode.isSubscriptionFeed()) {
- boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
- logger.info(
- ":Bridge server: Initializing {} server-to-client communication socket: {} using selector={}",
- primary ? "primary" : "secondary", socket, isSelector());
- AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
- this.notifyBySubscription);
+ // GEODE-3637 - If the communicationMode is client Subscriptions, hand-off the client queue
+ // initialization to be done in another threadPool
+ if (initializeClientPools(socket, communicationMode)) {
return;
}
-// if (communicationMode.isSubscriptionFeed()) {
-// boolean
-// isPrimaryServerToClient =
-// communicationMode == CommunicationMode.PrimaryServerToClient;
-// clientQueueInitPool
-// .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this));
-// return;
-// }
logger.debug("Bridge server: Initializing {} communication socket: {}", communicationMode,
socket);
@@ -1452,7 +1443,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
if (curCnt >= this.maxConnections) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2,
- new Object[]{socket.getInetAddress(), Integer.valueOf(curCnt),
+ new Object[] {socket.getInetAddress(), Integer.valueOf(curCnt),
Integer.valueOf(this.maxConnections)}));
if (communicationMode.expectsConnectionRefusalMessage()) {
try {
@@ -1492,7 +1483,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
}
logger.warn(LocalizedMessage.create(
LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_REQUEST_REJECTED_BY_POOL,
- new Object[]{serverConn}));
+ new Object[] {serverConn}));
try {
ServerHandShakeProcessor.refuse(socket.getOutputStream(),
LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
@@ -1506,6 +1497,17 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
}
}
+ private boolean initializeClientPools(Socket socket, CommunicationMode communicationMode) {
+ if (communicationMode.isSubscriptionFeed()) {
+ boolean isPrimaryServerToClient =
+ communicationMode == CommunicationMode.PrimaryServerToClient;
+ clientQueueInitPool
+ .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this));
+ return true;
+ }
+ return false;
+ }
+
private CommunicationMode getCommunicationModeForNonSelector(Socket socket) throws IOException {
socket.setSoTimeout(this.acceptTimeout);
this.socketCreator.configureServerSSLSocket(socket);
@@ -1667,9 +1669,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* @param bindName the ip address or host name that this acceptor should bind to. If null or ""
- * then calculate it.
+ * then calculate it.
* @return the ip address or host name this acceptor will listen on. An "" if all local addresses
- * will be listened to.
+ * will be listened to.
* @since GemFire 5.7
*/
private static String calcBindHostName(Cache cache, String bindName) {
@@ -1706,6 +1708,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* Gets the address that this bridge server can be contacted on from external processes.
+ *
* @since GemFire 5.7
*/
public String getExternalAddress() {
@@ -1739,6 +1742,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* This method finds a client notifier and returns it. It is used to propagate interest
* registrations to other servers
+ *
* @return the instance that provides client notification
*/
@Override
@@ -1826,7 +1830,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
private final AcceptorImpl acceptor;
public ClientQueueInitializerTask(Socket socket, boolean isPrimaryServerToClient,
- AcceptorImpl acceptor) {
+ AcceptorImpl acceptor) {
this.socket = socket;
this.acceptor = acceptor;
this.isPrimaryServerToClient = isPrimaryServerToClient;
@@ -1837,9 +1841,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
logger.info(":Bridge server: Initializing {} server-to-client communication socket: {}",
isPrimaryServerToClient ? "primary" : "secondary", socket);
try {
- acceptor.getCacheClientNotifier()
- .registerClient(socket, isPrimaryServerToClient, acceptor.getAcceptorId(),
- acceptor.isNotifyBySubscription());
+ acceptor.getCacheClientNotifier().registerClient(socket, isPrimaryServerToClient,
+ acceptor.getAcceptorId(), acceptor.isNotifyBySubscription());
} catch (IOException ex) {
closeSocket(socket);
if (isRunning()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 1cac128..b141c6c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1134,8 +1134,6 @@ public abstract class ServerConnection implements Runnable {
if (getAcceptor().isSelector()) {
boolean finishedMsg = false;
try {
- initializeClientNofication();
-
this.stats.decThreadQueueSize();
if (!isTerminated()) {
getAcceptor().setTLCommBuffer();
@@ -1190,16 +1188,6 @@ public abstract class ServerConnection implements Runnable {
}
}
- private void initializeClientNofication() throws IOException {
-// if (communicationMode.isSubscriptionFeed()) {
-// boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
-// logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
-// primary ? "primary" : "secondary", theSocket);
-// getAcceptor().getCacheClientNotifier().registerClient(theSocket, primary,
-// getAcceptor().getAcceptorId(), getAcceptor().isNotifyBySubscription());
-// }
- }
-
/**
* If registered with a selector then this will be the key we are registered with.
*/
diff --git a/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
similarity index 69%
rename from geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java
rename to geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
index 9da6ca9..f5a2e63 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
@@ -12,19 +12,22 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-package org.apache.geode.internal;
+package org.apache.geode.internal.cache.tier.sockets;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.rmi.RemoteException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
@@ -43,6 +46,8 @@ import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.server.ClientSubscriptionConfig;
import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.distributed.DistributedLockBlackboard;
+import org.apache.geode.distributed.DistributedLockBlackboardImpl;
import org.apache.geode.internal.cache.DiskStoreAttributes;
import org.apache.geode.internal.cache.InitialImageOperation;
import org.apache.geode.internal.cache.InternalCache;
@@ -58,17 +63,19 @@ import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolde
import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
@Category(DistributedTest.class)
-public class AcceptorImplDUnitTest implements Serializable {
+public class AcceptorImplClientQueueDUnitTest implements Serializable {
private final Host host = Host.getHost(0);
- private final static int numberOfEntries = 1000;
+ private final static int numberOfEntries = 200;
private final static AtomicInteger eventCount = new AtomicInteger(0);
+ private final static AtomicBoolean completedClient2 = new AtomicBoolean(false);
@ClassRule
public static DistributedTestRule distributedTestRule = new DistributedTestRule();
@Rule
- public CacheRule cacheRule = CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(
- host.getVM(1)).build();
+ public CacheRule cacheRule =
+ CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(host.getVM(1))
+ .addSystemProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1").build();
@Rule
public SerializableTestName name = new SerializableTestName();
@@ -77,30 +84,23 @@ public class AcceptorImplDUnitTest implements Serializable {
public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder();
@Rule
- public DistributedRestoreSystemProperties restoreSystemProperties = new DistributedRestoreSystemProperties();
+ public DistributedRestoreSystemProperties restoreSystemProperties =
+ new DistributedRestoreSystemProperties();
- @Rule
- public SharedCountersRule sharedCountersRule = SharedCountersRule.builder().build();
+ private DistributedLockBlackboard blackboard = null;
@Before
- public void setup() {
- Host host = this.host;
- sharedCountersRule.initialize("startNow");
- host.getAllVMs().forEach((vm) ->
- vm.invoke(() -> {
- System.setProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1");
- })
- );
+ public void setup() throws Exception {
+ blackboard = DistributedLockBlackboardImpl.getInstance();
}
@After
- public void tearDown() {
-
- host.getAllVMs().forEach((vm) ->
- vm.invoke(() -> {
- InitialImageOperation.slowImageProcessing = 0;
- })
- );
+ public void tearDown() throws RemoteException {
+ blackboard.initCount();
+ host.getAllVMs().forEach((vm) -> vm.invoke(() -> {
+ InitialImageOperation.slowImageProcessing = 0;
+ System.getProperties().remove("BridgeServer.HANDSHAKE_POOL_SIZE");
+ }));
}
@Test
@@ -123,10 +123,8 @@ public class AcceptorImplDUnitTest implements Serializable {
clientCacheFactory.setPoolSubscriptionRedundancy(1);
clientCacheFactory.setPoolReadTimeout(200);
clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
- ClientCache
- cache =
- clientCacheFactory.set("durable-client-id", "1").set("durable-client-timeout", "300")
- .set("mcast-port", "0").create();
+ ClientCache cache = clientCacheFactory.set("durable-client-id", "1")
+ .set("durable-client-timeout", "300").set("mcast-port", "0").create();
ClientRegionFactory<Object, Object> clientRegionFactory =
cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
Region region = clientRegionFactory.create("subscriptionRegion");
@@ -163,25 +161,24 @@ public class AcceptorImplDUnitTest implements Serializable {
InitialImageOperation.slowImageProcessing = 30;
});
- vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> {
+ AsyncInvocation<Boolean> completedClient1 =
+ vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> {
- ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
- clientCacheFactory.setPoolSubscriptionEnabled(true);
- clientCacheFactory.setPoolSubscriptionRedundancy(1);
- clientCacheFactory.setPoolMinConnections(1);
- clientCacheFactory.setPoolMaxConnections(1);
- clientCacheFactory.setPoolReadTimeout(200);
- clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
- clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
- sharedCountersRule.increment("startNow");
- ClientCache
- cache =
- clientCacheFactory.set("durable-client-id", "1").set("durable-client-timeout", "300")
- .set("mcast-port", "0").create();
- ClientRegionFactory<Object, Object> clientRegionFactory =
- cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
- Region region = clientRegionFactory
- .addCacheListener(new CacheListenerAdapter() {
+ ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+ clientCacheFactory.setPoolSubscriptionEnabled(true);
+ clientCacheFactory.setPoolSubscriptionRedundancy(1);
+ clientCacheFactory.setPoolMinConnections(1);
+ clientCacheFactory.setPoolMaxConnections(1);
+ clientCacheFactory.setPoolReadTimeout(200);
+ clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+ ClientCacheFactory cacheFactory = clientCacheFactory.set("durable-client-id", "1")
+ .set("durable-client-timeout", "300").set("mcast-port", "0");
+ blackboard.incCount();
+ ClientCache cache = cacheFactory.create();
+
+ ClientRegionFactory<Object, Object> clientRegionFactory =
+ cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+ Region region = clientRegionFactory.addCacheListener(new CacheListenerAdapter() {
@Override
public void afterCreate(EntryEvent event) {
eventCount.incrementAndGet();
@@ -191,45 +188,45 @@ public class AcceptorImplDUnitTest implements Serializable {
public void afterUpdate(EntryEvent event) {
eventCount.incrementAndGet();
}
- })
- .create("subscriptionRegion");
-
- region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
- cache.readyForEvents();
- });
+ }).create("subscriptionRegion");
- AsyncInvocation<Boolean>
- completed =
- vm3.invokeAsync("Start Client2 to add entries to region", () -> {
- while (true) {
- Thread.sleep(100);
- if (sharedCountersRule.getTotal("startNow") == 1) {
- break;
- }
- }
- ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
- clientCacheFactory.setPoolRetryAttempts(0);
- clientCacheFactory.setPoolMinConnections(1);
- clientCacheFactory.setPoolMaxConnections(1);
- clientCacheFactory.setPoolReadTimeout(200);
- clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
- clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
- ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
- ClientRegionFactory<Object, Object> clientRegionFactory =
- cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
- Region region = clientRegionFactory.create("subscriptionRegion");
-
- for (int i = 0; i < 100; i++) {
- System.err.println("i = " + region.get(i));
- }
+ region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+ cache.readyForEvents();
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+ .until(() -> eventCount.get() == numberOfEntries);
cache.close();
- return true;
+ return eventCount.get() == numberOfEntries;
});
- Awaitility.await().atMost(40, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
- .until(() -> vm2.invoke(() -> {
- return eventCount.get();
- }) == numberOfEntries);
- assertTrue(completed.get());
+
+ vm3.invokeAsync("Start Client2 to add entries to region", () -> {
+ while (true) {
+ Thread.sleep(100);
+ if (blackboard.getCount() == 1) {
+ break;
+ }
+ }
+ ClientCache cache = null;
+ ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+ clientCacheFactory.setPoolRetryAttempts(0);
+ clientCacheFactory.setPoolMinConnections(1);
+ clientCacheFactory.setPoolMaxConnections(1);
+ clientCacheFactory.setPoolReadTimeout(200);
+ clientCacheFactory.setPoolSocketConnectTimeout(500);
+ clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+ cache = clientCacheFactory.set("mcast-port", "0").create();
+ ClientRegionFactory<Object, Object> clientRegionFactory =
+ cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+ Region region = clientRegionFactory.create("subscriptionRegion");
+
+ int returnValue = 0;
+ for (int i = 0; i < 100; i++) {
+ returnValue = (int) region.get(i);
+ }
+ cache.close();
+ completedClient2.set(returnValue == 99);
+ });
+ assertTrue(completedClient1.get());
+ assertTrue(vm3.invoke(() -> completedClient2.get()));
}
private int createSubscriptionServer(InternalCache cache) throws IOException {
@@ -241,21 +238,20 @@ public class AcceptorImplDUnitTest implements Serializable {
private void initializeDiskStore(InternalCache cache) throws IOException {
DiskStoreAttributes diskStoreAttributes = new DiskStoreAttributes();
diskStoreAttributes.name = "clientQueueDS";
- diskStoreAttributes.diskDirs = new File[]{tempDir.newFolder(name + "_dir")};
+ diskStoreAttributes.diskDirs = new File[] {tempDir.newFolder(name + "_dir")};
cache.createDiskStoreFactory(diskStoreAttributes).create("clientQueueDS");
}
private void initializeReplicateRegion(InternalCache cache) {
cache.createRegionFactory(RegionShortcut.REPLICATE).setStatisticsEnabled(true)
- .setSubscriptionAttributes(new SubscriptionAttributes(
- InterestPolicy.ALL)).create("subscriptionRegion");
+ .setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL))
+ .create("subscriptionRegion");
}
private int initializeCacheServerWithSubscription(Host host, InternalCache cache)
throws IOException {
CacheServer cacheServer1 = cache.addCacheServer(false);
- ClientSubscriptionConfig clientSubscriptionConfig =
- cacheServer1.getClientSubscriptionConfig();
+ ClientSubscriptionConfig clientSubscriptionConfig = cacheServer1.getClientSubscriptionConfig();
clientSubscriptionConfig.setEvictionPolicy("entry");
clientSubscriptionConfig.setCapacity(5);
clientSubscriptionConfig.setDiskStoreName("clientQueueDS");
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
index dc42da8..b65bf86 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
@@ -63,6 +63,7 @@ public class CacheRule extends DistributedExternalResource {
private final boolean disconnectAfter;
private final List<VM> createCacheInVMs;
private final Properties config;
+ private final Properties systemProperties;
public static Builder builder() {
return new Builder();
@@ -74,18 +75,19 @@ public class CacheRule extends DistributedExternalResource {
this.disconnectAfter = builder.disconnectAfter;
this.createCacheInVMs = builder.createCacheInVMs;
this.config = builder.config;
+ this.systemProperties = builder.systemProperties;
}
@Override
protected void before() {
if (createCacheInAll) {
- invoker().invokeInEveryVMAndController(() -> createCache(config));
+ invoker().invokeInEveryVMAndController(() -> createCache(config, systemProperties));
} else {
if (createCache) {
- createCache(config);
+ createCache(config, systemProperties);
}
for (VM vm : createCacheInVMs) {
- vm.invoke(() -> createCache(config));
+ vm.invoke(() -> createCache(config, systemProperties));
}
}
}
@@ -108,7 +110,8 @@ public class CacheRule extends DistributedExternalResource {
return cache.getInternalDistributedSystem();
}
- private static void createCache(final Properties config) {
+ private static void createCache(final Properties config, final Properties systemProperties) {
+ System.getProperties().putAll(systemProperties);
cache = (InternalCache) new CacheFactory(config).create();
}
@@ -141,6 +144,7 @@ public class CacheRule extends DistributedExternalResource {
private boolean disconnectAfter;
private List<VM> createCacheInVMs = new ArrayList<>();
private Properties config = new Properties();
+ private Properties systemProperties = new Properties();
public Builder() {
config.setProperty(LOCATORS, getLocators());
@@ -195,6 +199,16 @@ public class CacheRule extends DistributedExternalResource {
return this;
}
+ public Builder addSystemProperty(final String key, final String value) {
+ this.systemProperties.put(key, value);
+ return this;
+ }
+
+ public Builder addSystemProperties(final Properties config) {
+ this.systemProperties.putAll(config);
+ return this;
+ }
+
public CacheRule build() {
return new CacheRule(this);
}
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.