You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/10/31 20:55:51 UTC
[geode] 01/02: GEODE-3637: Committing tests in order not to loose
progress
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 55fad5e725b73630d5ccb88ca66917c554c31334
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Fri Oct 27 14:27:30 2017 -0700
GEODE-3637: Committing tests in order not to loose progress
---
.../internal/cache/tier/sockets/AcceptorImpl.java | 364 ++++++++++++---------
.../cache/tier/sockets/ServerConnection.java | 14 +-
.../geode/internal/AcceptorImplDUnitTest.java | 267 +++++++++++++++
.../apache/geode/internal/SSLConfigJUnitTest.java | 5 +-
4 files changed, 478 insertions(+), 172 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index ad910bd..721874f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -95,7 +95,6 @@ import org.apache.geode.internal.util.ArrayUtils;
/**
* Implements the acceptor thread on the bridge server. Accepts connections from the edge and starts
* up threads to process requests from these.
- *
* @since GemFire 2.0.2
*/
@SuppressWarnings("deprecation")
@@ -115,6 +114,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
private final ThreadPoolExecutor hsPool;
/**
+ * A pool used to process client-queue-initializations.
+ */
+ private final ThreadPoolExecutor clientQueueInitPool;
+
+ /**
* The port on which this acceptor listens for client connections
*/
private final int localPort;
@@ -268,7 +272,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* The ip address or host name this acceptor is to bind to; <code>null</code> or "" indicates it
* will listen on all local addresses.
- *
* @since GemFire 5.7
*/
private final String bindHostName;
@@ -308,14 +311,13 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* Initializes this acceptor thread to listen for connections on the given port.
- *
* @param port The port on which this acceptor listens for connections. If <code>0</code>, a
- * random port will be chosen.
+ * random port will be chosen.
* @param bindHostName The ip address or host name this acceptor listens on for connections. If
- * <code>null</code> or "" then all local addresses are used
+ * <code>null</code> or "" then all local addresses are used
* @param socketBufferSize The buffer size for server-side sockets
* @param maximumTimeBetweenPings The maximum time between client pings. This value is used by the
- * <code>ClientHealthMonitor</code> to monitor the health of this server's clients.
+ * <code>ClientHealthMonitor</code> to monitor the health of this server's clients.
* @param internalCache The GemFire cache whose contents is served to clients
* @param maxConnections the maximum number of connections allowed in the server pool
* @param maxThreads the maximum number of threads allowed in the server pool
@@ -324,11 +326,14 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
* @since GemFire 5.7
*/
public AcceptorImpl(int port, String bindHostName, boolean notifyBySubscription,
- int socketBufferSize, int maximumTimeBetweenPings, InternalCache internalCache,
- int maxConnections, int maxThreads, int maximumMessageCount, int messageTimeToLive,
- ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver,
- List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
- ServerConnectionFactory serverConnectionFactory) throws IOException {
+ int socketBufferSize, int maximumTimeBetweenPings,
+ InternalCache internalCache,
+ int maxConnections, int maxThreads, int maximumMessageCount,
+ int messageTimeToLive,
+ ConnectionListener listener, List overflowAttributesList,
+ boolean isGatewayReceiver,
+ List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
+ ServerConnectionFactory serverConnectionFactory) throws IOException {
this.securityService = internalCache.getSecurityService();
this.bindHostName = calcBindHostName(internalCache, bindHostName);
this.connectionListener = listener == null ? new ConnectionListenerAdapter() : listener;
@@ -440,7 +445,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
// fix for bug 36617. If BindException is thrown, retry after
// sleeping. The server may have been stopped and then
// immediately restarted, which sometimes results in a bind exception
- for (;;) {
+ for (; ; ) {
try {
this.serverSock.bind(new InetSocketAddress(getBindAddress(), port), backLog);
break;
@@ -469,7 +474,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
// fix for bug 36617. If BindException is thrown, retry after
// sleeping. The server may have been stopped and then
// immediately restarted, which sometimes results in a bind exception
- for (;;) {
+ for (; ; ) {
try {
this.serverSock = this.socketCreator.createServerSocket(port, backLog, getBindAddress(),
this.gatewayTransportFilters, socketBufferSize);
@@ -514,7 +519,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
String sockName = getServerName();
logger.info(LocalizedMessage.create(
LocalizedStrings.AcceptorImpl_CACHE_SERVER_CONNECTION_LISTENER_BOUND_TO_ADDRESS_0_WITH_BACKLOG_1,
- new Object[] {sockName, Integer.valueOf(backLog)}));
+ new Object[]{sockName, Integer.valueOf(backLog)}));
if (isGatewayReceiver) {
this.stats = GatewayReceiverStats.createGatewayReceiverStats(sockName);
} else {
@@ -534,103 +539,130 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
this.clientNotifier.getStats());
- {
- ThreadPoolExecutor tmp_pool = null;
- String gName = "ServerConnection "
- // + serverSock.getInetAddress()
- + "on port " + this.localPort;
- final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
-
- ThreadFactory socketThreadFactory = new ThreadFactory() {
- int connNum = -1;
-
- public Thread newThread(final Runnable command) {
- int tnum;
- synchronized (this) {
- tnum = ++connNum;
+ pool = initializeServerConnectionThreadPool();
+ hsPool = initializeHandshakerThreadPool();
+ clientQueueInitPool = initializeClientQueueInitializerThreadPool();
+
+ isAuthenticationRequired = this.securityService.isClientSecurityRequired();
+
+ isIntegratedSecurity = this.securityService.isIntegratedSecurity();
+
+ String postAuthzFactoryName =
+ this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
+
+ isPostAuthzCallbackPresent =
+ (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false;
+ }
+
+ private ThreadPoolExecutor initializeHandshakerThreadPool() throws IOException {
+ String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
+ final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
+ ThreadFactory socketThreadFactory = new ThreadFactory() {
+ AtomicInteger connNum = new AtomicInteger(-1);
+
+ @Override
+ public Thread newThread(Runnable command) {
+ String threadName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+ getStats().incAcceptThreadsCreated();
+ return new Thread(socketThreadGroup, command, threadName);
+ }
+ };
+ try {
+ final BlockingQueue blockingQueue = new SynchronousQueue();
+ final RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
+ try {
+ blockingQueue.put(r);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt(); // preserve the state
+ throw new RejectedExecutionException(
+ LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
}
- String tName = socketThreadGroup.getName() + " Thread " + tnum;
- getStats().incConnectionThreadsCreated();
- Runnable r = new Runnable() {
- public void run() {
- try {
- command.run();
- } catch (CancelException e) { // bug 39463
- // ignore
- } finally {
- ConnectionTable.releaseThreadsSockets();
- }
- }
- };
- return new Thread(socketThreadGroup, r, tName);
}
};
- try {
- if (isSelector()) {
- tmp_pool = new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
- getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
- } else {
- tmp_pool = new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L,
- TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
- }
- } catch (IllegalArgumentException poolInitException) {
- this.stats.close();
- this.serverSock.close();
- throw poolInitException;
- }
- this.pool = tmp_pool;
+ return new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, blockingQueue,
+ socketThreadFactory, rejectedExecutionHandler);
+ } catch (IllegalArgumentException poolInitException) {
+ this.stats.close();
+ this.serverSock.close();
+ this.pool.shutdownNow();
+ throw poolInitException;
}
- {
- ThreadPoolExecutor tmp_hsPool = null;
- String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
- final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+ }
- ThreadFactory socketThreadFactory = new ThreadFactory() {
- int connNum = -1;
+ private ThreadPoolExecutor initializeClientQueueInitializerThreadPool() throws IOException {
+ final ThreadGroup clientQueueThreadGroup = LoggingThreadGroup.createThreadGroup(
+ "Client Queue Initialization ", logger);
- public Thread newThread(Runnable command) {
- int tnum;
- synchronized (this) {
- tnum = ++connNum;
- }
- String tName = socketThreadGroup.getName() + " Thread " + tnum;
- getStats().incAcceptThreadsCreated();
- return new Thread(socketThreadGroup, command, tName);
- }
- };
- try {
- final BlockingQueue bq = new SynchronousQueue();
- final RejectedExecutionHandler reh = new RejectedExecutionHandler() {
- public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
+ ThreadFactory clientQueueThreadFactory = new ThreadFactory() {
+ AtomicInteger connNum = new AtomicInteger(-1);
+
+ @Override
+ public Thread newThread(final Runnable command) {
+ String
+ threadName =
+ clientQueueThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+ Runnable runnable = new Runnable() {
+ public void run() {
try {
- bq.put(r);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt(); // preserve the state
- throw new RejectedExecutionException(
- LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
+ command.run();
+ } catch (CancelException e) {
+ logger.debug("Client Queue Initialization was canceled.", e);
}
}
};
- tmp_hsPool = new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, bq,
- socketThreadFactory, reh);
- } catch (IllegalArgumentException poolInitException) {
- this.stats.close();
- this.serverSock.close();
- this.pool.shutdownNow();
- throw poolInitException;
+ return new Thread(clientQueueThreadGroup, runnable, threadName);
}
- this.hsPool = tmp_hsPool;
+ };
+ try {
+ return new ThreadPoolExecutor(1, 16, 60,
+ TimeUnit.SECONDS, new SynchronousQueue(), clientQueueThreadFactory);
+ } catch (IllegalArgumentException poolInitException) {
+ throw poolInitException;
}
+ }
- isAuthenticationRequired = this.securityService.isClientSecurityRequired();
-
- isIntegratedSecurity = this.securityService.isIntegratedSecurity();
-
- String postAuthzFactoryName =
- this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
-
- isPostAuthzCallbackPresent =
- (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false;
+ private ThreadPoolExecutor initializeServerConnectionThreadPool() throws IOException {
+ String gName = "ServerConnection "
+ // + serverSock.getInetAddress()
+ + "on port " + this.localPort;
+ final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
+ ThreadFactory socketThreadFactory = new ThreadFactory() {
+ AtomicInteger connNum = new AtomicInteger(-1);
+
+ @Override
+ public Thread newThread(final Runnable command) {
+ String tName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
+ getStats().incConnectionThreadsCreated();
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ command.run();
+ } catch (CancelException e) { // bug 39463
+ // ignore
+ } finally {
+ ConnectionTable.releaseThreadsSockets();
+ }
+ }
+ };
+ return new Thread(socketThreadGroup, r, tName);
+ }
+ };
+ try {
+ if (isSelector()) {
+ return new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
+ getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
+ } else {
+ return new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L,
+ TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
+ }
+ } catch (IllegalArgumentException poolInitException) {
+ this.stats.close();
+ this.serverSock.close();
+ throw poolInitException;
+ }
}
public long getAcceptorId() {
@@ -651,7 +683,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* This system property is only used if max-threads == 0. This is for 5.0.2 backwards
* compatibility.
- *
* @deprecated since 5.1 use cache-server max-threads instead
*/
@Deprecated
@@ -660,7 +691,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* This system property is only used if max-threads == 0. This is for 5.0.2 backwards
* compatibility.
- *
* @deprecated since 5.1 use cache-server max-threads instead
*/
@Deprecated
@@ -798,12 +828,12 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* Ensure that the CachedRegionHelper and ServerConnection classes get loaded.
- *
* @see SystemFailure#loadEmergencyClasses()
*/
public static void loadEmergencyClasses() {
- if (emergencyClassesLoaded)
+ if (emergencyClassesLoaded) {
return;
+ }
emergencyClassesLoaded = true;
CachedRegionHelper.loadEmergencyClasses();
ServerConnection.loadEmergencyClasses();
@@ -870,8 +900,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
private Selector tmpSel;
private void checkForStuckKeys() {
- if (!WORKAROUND_SELECTOR_BUG)
+ if (!WORKAROUND_SELECTOR_BUG) {
return;
+ }
if (tmpSel == null) {
try {
tmpSel = Selector.open();
@@ -887,8 +918,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
while (it.hasNext()) {
SelectionKey sk = (SelectionKey) it.next();
ServerConnection sc = (ServerConnection) sk.attachment();
- if (sc == null)
+ if (sc == null) {
continue;
+ }
try {
sk.cancel();
this.selector.selectNow(); // clear the cancelled key
@@ -1040,40 +1072,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
break;
}
if (events == 0) {
- // zeroEventsCount++;
- // if (zeroEventsCount > 0) {
- // zeroEventsCount = 0;
checkForStuckKeys();
-
- // try {
- // this.selector.close(); // this selector is sick!
- // } catch (IOException ignore) {
- // }
- // this.selector = Selector.open();
- // {
- // Iterator it = selectorRegistrations.iterator();
- // while (it.hasNext()) {
- // ServerConnection sc = (ServerConnection)it.next();
- // sc.registerWithSelector2(this.selector);
- // }
- // }
- // }
- // ArrayList al = new ArrayList();
- // Iterator keysIt = this.selector.keys().iterator();
- // while (keysIt.hasNext()) {
- // SelectionKey sk = (SelectionKey)keysIt.next();
- // al.add(sk.attachment());
- // sk.cancel();
- // }
- // events = this.selector.selectNow();
- // Iterator alIt = al.iterator();
- // while (alIt.hasNext()) {
- // ServerConnection sc = (ServerConnection)alIt.next();
- // sc.registerWithSelector2(this.selector);
- // }
- // events = this.selector.select();
- // } else {
- // zeroEventsCount = 0;
}
while (events > 0) {
int cancelCount = 0;
@@ -1130,16 +1129,11 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
logger.warn(
LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED, rejected));
}
- // } else if (key.isValid() && key.isConnectable()) {
- // logger.info("DEBUG isConnectable and isValid key=" + key);
- // finishCon(sc);
} else {
finishCon(sc);
if (key.isValid()) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.AcceptorImpl_IGNORING_EVENT_ON_SELECTOR_KEY__0, key));
- // } else {
- // logger.info("DEBUG !isValid key=" + key);
}
}
} catch (CancelledKeyException ex) { // fix for bug 37739
@@ -1195,7 +1189,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* The work loop of this acceptor
- *
* @see #accept
*/
public void run() {
@@ -1236,8 +1229,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
}
/**
- * {@linkplain ServerSocket#accept Listens}for a client to connect and then creates a new
- * {@link ServerConnection}to handle messages from that client.
+ * {@linkplain ServerSocket#accept Listens}for a client to connect and then creates a new {@link
+ * ServerConnection}to handle messages from that client.
*/
@Override
public void accept() {
@@ -1325,7 +1318,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
* blocking is good because it will throttle the rate at which we create new connections.
*/
private void handOffNewClientConnection(final Socket socket,
- final ServerConnectionFactory serverConnectionFactory) {
+ final ServerConnectionFactory serverConnectionFactory) {
try {
this.stats.incAcceptsInProgress();
this.hsPool.execute(new Runnable() {
@@ -1410,7 +1403,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
}
protected void handleNewClientConnection(final Socket socket,
- final ServerConnectionFactory serverConnectionFactory) throws IOException {
+ final ServerConnectionFactory serverConnectionFactory)
+ throws IOException {
// Read the first byte. If this socket is being used for 'client to server'
// communication, create a ServerConnection. If this socket is being used
// for 'server to client' communication, send it to the CacheClientNotifier
@@ -1432,14 +1426,23 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
}
// GEODE-3637
- // if (communicationMode.isSubscriptionFeed()) {
- // boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
- // logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
- // primary ? "primary" : "secondary", socket);
- // AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
- // this.notifyBySubscription);
- // return;
- // }
+ if (communicationMode.isSubscriptionFeed()) {
+ boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
+ logger.info(
+ ":Bridge server: Initializing {} server-to-client communication socket: {} using selector={}",
+ primary ? "primary" : "secondary", socket, isSelector());
+ AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
+ this.notifyBySubscription);
+ return;
+ }
+// if (communicationMode.isSubscriptionFeed()) {
+// boolean
+// isPrimaryServerToClient =
+// communicationMode == CommunicationMode.PrimaryServerToClient;
+// clientQueueInitPool
+// .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this));
+// return;
+// }
logger.debug("Bridge server: Initializing {} communication socket: {}", communicationMode,
socket);
@@ -1449,7 +1452,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
if (curCnt >= this.maxConnections) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2,
- new Object[] {socket.getInetAddress(), Integer.valueOf(curCnt),
+ new Object[]{socket.getInetAddress(), Integer.valueOf(curCnt),
Integer.valueOf(this.maxConnections)}));
if (communicationMode.expectsConnectionRefusalMessage()) {
try {
@@ -1489,7 +1492,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
}
logger.warn(LocalizedMessage.create(
LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_REQUEST_REJECTED_BY_POOL,
- new Object[] {serverConn}));
+ new Object[]{serverConn}));
try {
ServerHandShakeProcessor.refuse(socket.getOutputStream(),
LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
@@ -1664,10 +1667,9 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* @param bindName the ip address or host name that this acceptor should bind to. If null or ""
- * then calculate it.
+ * then calculate it.
* @return the ip address or host name this acceptor will listen on. An "" if all local addresses
- * will be listened to.
- *
+ * will be listened to.
* @since GemFire 5.7
*/
private static String calcBindHostName(Cache cache, String bindName) {
@@ -1704,7 +1706,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* Gets the address that this bridge server can be contacted on from external processes.
- *
* @since GemFire 5.7
*/
public String getExternalAddress() {
@@ -1738,7 +1739,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
/**
* This method finds a client notifier and returns it. It is used to propagate interest
* registrations to other servers
- *
* @return the instance that provides client notification
*/
@Override
@@ -1796,7 +1796,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
* This method returns a thread safe structure which can be iterated over without worrying about
* ConcurrentModificationException. JMX MBeans/Commands need to iterate over this list to get
* client info.
- *
*/
public ServerConnection[] getAllServerConnectionList() {
return this.allSCList;
@@ -1820,4 +1819,43 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
releaseCommBuffer(Message.setTLCommBuffer(null));
}
+
+ private class ClientQueueInitializerTask implements Runnable {
+ private final Socket socket;
+ private final boolean isPrimaryServerToClient;
+ private final AcceptorImpl acceptor;
+
+ public ClientQueueInitializerTask(Socket socket, boolean isPrimaryServerToClient,
+ AcceptorImpl acceptor) {
+ this.socket = socket;
+ this.acceptor = acceptor;
+ this.isPrimaryServerToClient = isPrimaryServerToClient;
+ }
+
+ @Override
+ public void run() {
+ logger.info(":Bridge server: Initializing {} server-to-client communication socket: {}",
+ isPrimaryServerToClient ? "primary" : "secondary", socket);
+ try {
+ acceptor.getCacheClientNotifier()
+ .registerClient(socket, isPrimaryServerToClient, acceptor.getAcceptorId(),
+ acceptor.isNotifyBySubscription());
+ } catch (IOException ex) {
+ closeSocket(socket);
+ if (isRunning()) {
+ if (!acceptor.loggedAcceptError) {
+ acceptor.loggedAcceptError = true;
+ if (ex instanceof SocketTimeoutException) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION_DUE_TO_SOCKET_TIMEOUT));
+ } else {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION__0,
+ ex), ex);
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 5c69769..1cac128 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -1191,13 +1191,13 @@ public abstract class ServerConnection implements Runnable {
}
private void initializeClientNofication() throws IOException {
- if (communicationMode.isSubscriptionFeed()) {
- boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
- logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
- primary ? "primary" : "secondary", theSocket);
- getAcceptor().getCacheClientNotifier().registerClient(theSocket, primary,
- getAcceptor().getAcceptorId(), getAcceptor().isNotifyBySubscription());
- }
+// if (communicationMode.isSubscriptionFeed()) {
+// boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
+// logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
+// primary ? "primary" : "secondary", theSocket);
+// getAcceptor().getCacheClientNotifier().registerClient(theSocket, primary,
+// getAcceptor().getAcceptorId(), getAcceptor().isNotifyBySubscription());
+// }
}
/**
diff --git a/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java
new file mode 100644
index 0000000..9da6ca9
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/AcceptorImplDUnitTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.InterestResultPolicy;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.server.ClientSubscriptionConfig;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.internal.cache.DiskStoreAttributes;
+import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.dunit.rules.SharedCountersRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category(DistributedTest.class)
+public class AcceptorImplDUnitTest implements Serializable {
+ private final Host host = Host.getHost(0);
+ private final static int numberOfEntries = 1000;
+ private final static AtomicInteger eventCount = new AtomicInteger(0);
+
+ @ClassRule
+ public static DistributedTestRule distributedTestRule = new DistributedTestRule();
+
+ @Rule
+ public CacheRule cacheRule = CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(
+ host.getVM(1)).build();
+
+ @Rule
+ public SerializableTestName name = new SerializableTestName();
+
+ @Rule
+ public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder();
+
+ @Rule
+ public DistributedRestoreSystemProperties restoreSystemProperties = new DistributedRestoreSystemProperties();
+
+ @Rule
+ public SharedCountersRule sharedCountersRule = SharedCountersRule.builder().build();
+
+ @Before
+ public void setup() {
+ Host host = this.host;
+ sharedCountersRule.initialize("startNow");
+ host.getAllVMs().forEach((vm) ->
+ vm.invoke(() -> {
+ System.setProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1");
+ })
+ );
+ }
+
+ @After
+ public void tearDown() {
+
+ host.getAllVMs().forEach((vm) ->
+ vm.invoke(() -> {
+ InitialImageOperation.slowImageProcessing = 0;
+ })
+ );
+ }
+
+ @Test
+ public void testClientSubscriptionQueueBlockingConnectionInitialization() throws Exception {
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+ VM vm3 = host.getVM(3);
+ int vm0_port = vm0.invoke("Start server with subscription turned on", () -> {
+ try {
+ return createSubscriptionServer(cacheRule.getCache());
+ } catch (IOException e) {
+ return 0;
+ }
+ });
+
+ vm2.invoke("Start Client1 with durable interest registration turned on", () -> {
+ ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+ clientCacheFactory.setPoolSubscriptionEnabled(true);
+ clientCacheFactory.setPoolSubscriptionRedundancy(1);
+ clientCacheFactory.setPoolReadTimeout(200);
+ clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+ ClientCache
+ cache =
+ clientCacheFactory.set("durable-client-id", "1").set("durable-client-timeout", "300")
+ .set("mcast-port", "0").create();
+ ClientRegionFactory<Object, Object> clientRegionFactory =
+ cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+ Region region = clientRegionFactory.create("subscriptionRegion");
+
+ region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+ cache.readyForEvents();
+ cache.close(true);
+ });
+ vm3.invoke("Start Client2 to add entries to region", () -> {
+ ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+ clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+ ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
+ ClientRegionFactory<Object, Object> clientRegionFactory =
+ cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+ Region region = clientRegionFactory.create("subscriptionRegion");
+
+ for (int i = 0; i < numberOfEntries; i++) {
+ region.put(i, i);
+ }
+ cache.close();
+ });
+
+ int vm1_port = vm1.invoke("Start server2 in with subscriptions turned on", () -> {
+ try {
+ int serverPort = createSubscriptionServer(cacheRule.getCache());
+ InitialImageOperation.slowImageProcessing = 30;
+ return serverPort;
+ } catch (IOException e) {
+ return 0;
+ }
+ });
+
+ vm0.invoke("Turn on slow image processsing", () -> {
+ InitialImageOperation.slowImageProcessing = 30;
+ });
+
+ vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> {
+
+ ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+ clientCacheFactory.setPoolSubscriptionEnabled(true);
+ clientCacheFactory.setPoolSubscriptionRedundancy(1);
+ clientCacheFactory.setPoolMinConnections(1);
+ clientCacheFactory.setPoolMaxConnections(1);
+ clientCacheFactory.setPoolReadTimeout(200);
+ clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+ clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+ sharedCountersRule.increment("startNow");
+ ClientCache
+ cache =
+ clientCacheFactory.set("durable-client-id", "1").set("durable-client-timeout", "300")
+ .set("mcast-port", "0").create();
+ ClientRegionFactory<Object, Object> clientRegionFactory =
+ cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+ Region region = clientRegionFactory
+ .addCacheListener(new CacheListenerAdapter() {
+ @Override
+ public void afterCreate(EntryEvent event) {
+ eventCount.incrementAndGet();
+ }
+
+ @Override
+ public void afterUpdate(EntryEvent event) {
+ eventCount.incrementAndGet();
+ }
+ })
+ .create("subscriptionRegion");
+
+ region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
+ cache.readyForEvents();
+ });
+
+ AsyncInvocation<Boolean>
+ completed =
+ vm3.invokeAsync("Start Client2 to add entries to region", () -> {
+ while (true) {
+ Thread.sleep(100);
+ if (sharedCountersRule.getTotal("startNow") == 1) {
+ break;
+ }
+ }
+ ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+ clientCacheFactory.setPoolRetryAttempts(0);
+ clientCacheFactory.setPoolMinConnections(1);
+ clientCacheFactory.setPoolMaxConnections(1);
+ clientCacheFactory.setPoolReadTimeout(200);
+ clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
+ clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
+ ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
+ ClientRegionFactory<Object, Object> clientRegionFactory =
+ cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+ Region region = clientRegionFactory.create("subscriptionRegion");
+
+ for (int i = 0; i < 100; i++) {
+ System.err.println("i = " + region.get(i));
+ }
+ cache.close();
+ return true;
+ });
+ Awaitility.await().atMost(40, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
+ .until(() -> vm2.invoke(() -> {
+ return eventCount.get();
+ }) == numberOfEntries);
+ assertTrue(completed.get());
+ }
+
+ private int createSubscriptionServer(InternalCache cache) throws IOException {
+ initializeDiskStore(cache);
+ initializeReplicateRegion(cache);
+ return initializeCacheServerWithSubscription(host, cache);
+ }
+
+ private void initializeDiskStore(InternalCache cache) throws IOException {
+ DiskStoreAttributes diskStoreAttributes = new DiskStoreAttributes();
+ diskStoreAttributes.name = "clientQueueDS";
+ diskStoreAttributes.diskDirs = new File[]{tempDir.newFolder(name + "_dir")};
+ cache.createDiskStoreFactory(diskStoreAttributes).create("clientQueueDS");
+ }
+
+ private void initializeReplicateRegion(InternalCache cache) {
+ cache.createRegionFactory(RegionShortcut.REPLICATE).setStatisticsEnabled(true)
+ .setSubscriptionAttributes(new SubscriptionAttributes(
+ InterestPolicy.ALL)).create("subscriptionRegion");
+ }
+
+ private int initializeCacheServerWithSubscription(Host host, InternalCache cache)
+ throws IOException {
+ CacheServer cacheServer1 = cache.addCacheServer(false);
+ ClientSubscriptionConfig clientSubscriptionConfig =
+ cacheServer1.getClientSubscriptionConfig();
+ clientSubscriptionConfig.setEvictionPolicy("entry");
+ clientSubscriptionConfig.setCapacity(5);
+ clientSubscriptionConfig.setDiskStoreName("clientQueueDS");
+ cacheServer1.setPort(0);
+ cacheServer1.setHostnameForClients(host.getHostName());
+ cacheServer1.start();
+ return cacheServer1.getPort();
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java
index 357736b..8904380 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/SSLConfigJUnitTest.java
@@ -21,6 +21,7 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -44,7 +45,8 @@ public class SSLConfigJUnitTest {
private static final Properties GATEWAY_SSL_PROPS_MAP = new Properties();
private static final Properties GATEWAY_PROPS_SUBSET_MAP = new Properties();
- static {
+ @BeforeClass
+ public static void initializeSSLMaps() {
SSL_PROPS_MAP.put("javax.net.ssl.keyStoreType", "jks");
SSL_PROPS_MAP.put("javax.net.ssl.keyStore", "/export/gemfire-configs/gemfire.keystore");
@@ -383,7 +385,6 @@ public class SSLConfigJUnitTest {
gemFireProps.put(CLUSTER_SSL_CIPHERS, sslciphers);
gemFireProps.put(CLUSTER_SSL_REQUIRE_AUTHENTICATION, String.valueOf(requireAuth));
-
gemFireProps.put(SERVER_SSL_ENABLED, String.valueOf(cacheServerSslenabled));
gemFireProps.put(SERVER_SSL_PROTOCOLS, cacheServerSslprotocols);
gemFireProps.put(SERVER_SSL_CIPHERS, cacheServerSslciphers);
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.