You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by go...@apache.org on 2017/09/19 17:14:08 UTC
[geode] branch develop updated: GEODE-3609 Small AcceptorImpl
refactor (#772)
This is an automated email from the ASF dual-hosted git repository.
gosullivan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 8211f2f GEODE-3609 Small AcceptorImpl refactor (#772)
8211f2f is described below
commit 8211f2fd26fa57ad2a49634d7e3d343ec6106c6c
Author: galen-pivotal <go...@pivotal.io>
AuthorDate: Tue Sep 19 10:14:06 2017 -0700
GEODE-3609 Small AcceptorImpl refactor (#772)
Use one CommunicationMode variable, not 3.
Also, refactor the selector and not-selector cases into separate
methods.
I'm not changing the `ServerConnectionFactory` because it more or less
passes straight through to `ServerConnection`, and there's too much
legacy code there to change easily.
---
.../geode/cache/client/internal/PoolImpl.java | 1 -
.../internal/cache/tier/sockets/AcceptorImpl.java | 114 +++++++++++----------
2 files changed, 60 insertions(+), 55 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
index 332046d..d2e1f4b 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
@@ -64,7 +64,6 @@ import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PoolFactoryImpl;
import org.apache.geode.internal.cache.PoolManagerImpl;
import org.apache.geode.internal.cache.PoolStats;
-import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.InternalLogWriter;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 68377d8..c660b68 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -281,7 +281,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
/**
* The client health monitor tracking connections for this acceptor
*/
- private ClientHealthMonitor healthMonitor;
+ private final ClientHealthMonitor healthMonitor;
/**
* bridge's setting of notifyBySubscription
@@ -1411,70 +1411,34 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
// communication, create a ServerConnection. If this socket is being used
// for 'server to client' communication, send it to the CacheClientNotifier
// for processing.
- byte communicationMode;
- if (isSelector()) {
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1);
- final SocketChannel socketChannel = socket.getChannel();
- socketChannel.configureBlocking(false);
- // try to read the byte first in non-blocking mode
- int res = socketChannel.read(byteBuffer);
- socketChannel.configureBlocking(true);
- if (res < 0) {
- throw new EOFException();
- } else if (res == 0) {
- // now do a blocking read so setup a timer to close the socket if the
- // the read takes too long
- SystemTimer.SystemTimerTask timerTask = new SystemTimer.SystemTimerTask() {
- @Override
- public void run2() {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.AcceptorImpl_CACHE_SERVER_TIMED_OUT_WAITING_FOR_HANDSHAKE_FROM__0,
- socket.getRemoteSocketAddress()));
- closeSocket(socket);
- }
- };
- this.hsTimer.schedule(timerTask, this.acceptTimeout);
- res = socketChannel.read(byteBuffer);
- if ((!timerTask.cancel()) || res <= 0) {
- throw new EOFException();
- }
- }
- communicationMode = byteBuffer.get(0);
- } else {
- socket.setSoTimeout(this.acceptTimeout);
- this.socketCreator.configureServerSSLSocket(socket);
- communicationMode = (byte) socket.getInputStream().read();
- if (communicationMode == -1) {
- throw new EOFException();
+ final CommunicationMode communicationMode;
+ try {
+ if (isSelector()) {
+ communicationMode = getCommunicationModeForSelector(socket);
+ } else {
+ communicationMode = getCommunicationModeForNonSelector(socket);
}
- socket.setSoTimeout(0);
- }
-
- socket.setTcpNoDelay(this.tcpNoDelay);
+ socket.setTcpNoDelay(this.tcpNoDelay);
- final CommunicationMode mode;
- try {
- mode = CommunicationMode.fromModeNumber(communicationMode);
} catch (IllegalArgumentException e) {
- // possible if a client uses SSL & the server isn't configured to use SSL
+ // possible if a client uses SSL & the server isn't configured to use SSL,
+ // or if an invalid communication communication mode byte is sent.
logger.warn("Error processing client connection", e);
throw new EOFException();
}
- String communicationModeStr;
- if (mode.isSubscriptionFeed()) {
- boolean primary = mode == CommunicationMode.PrimaryServerToClient;
+ if (communicationMode.isSubscriptionFeed()) {
+ boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
primary ? "primary" : "secondary", socket);
AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
this.notifyBySubscription);
return;
}
- communicationModeStr = mode.toString();
- logger.debug("Bridge server: Initializing {} communication socket: {}", communicationModeStr,
+ logger.debug("Bridge server: Initializing {} communication socket: {}", communicationMode,
socket);
- boolean notForQueue = (mode != ClientToServerForQueue);
+ boolean notForQueue = (communicationMode != ClientToServerForQueue);
if (notForQueue) {
int curCnt = this.getClientServerCnxCount();
if (curCnt >= this.maxConnections) {
@@ -1482,7 +1446,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2,
new Object[] {socket.getInetAddress(), Integer.valueOf(curCnt),
Integer.valueOf(this.maxConnections)}));
- if (mode.expectsConnectionRefusalMessage()) {
+ if (communicationMode.expectsConnectionRefusalMessage()) {
try {
ServerHandShakeProcessor.refuse(socket.getOutputStream(),
LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
@@ -1496,9 +1460,10 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
}
}
- ServerConnection serverConn = serverConnectionFactory.makeServerConnection(socket, this.cache,
- this.crHelper, this.stats, AcceptorImpl.handShakeTimeout, this.socketBufferSize,
- communicationModeStr, communicationMode, this, this.securityService, this.getBindAddress());
+ ServerConnection serverConn =
+ serverConnectionFactory.makeServerConnection(socket, this.cache, this.crHelper, this.stats,
+ AcceptorImpl.handShakeTimeout, this.socketBufferSize, communicationMode.toString(),
+ communicationMode.getModeNumber(), this, this.securityService, this.getBindAddress());
synchronized (this.allSCsLock) {
this.allSCs.add(serverConn);
@@ -1533,6 +1498,47 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
}
}
+ private CommunicationMode getCommunicationModeForNonSelector(Socket socket) throws IOException {
+ socket.setSoTimeout(this.acceptTimeout);
+ this.socketCreator.configureServerSSLSocket(socket);
+ byte communicationModeByte = (byte) socket.getInputStream().read();
+ if (communicationModeByte == -1) {
+ throw new EOFException();
+ }
+ socket.setSoTimeout(0);
+ return CommunicationMode.fromModeNumber(communicationModeByte);
+ }
+
+ private CommunicationMode getCommunicationModeForSelector(Socket socket) throws IOException {
+ ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1);
+ final SocketChannel socketChannel = socket.getChannel();
+ socketChannel.configureBlocking(false);
+ // try to read the byte first in non-blocking mode
+ int res = socketChannel.read(byteBuffer);
+ socketChannel.configureBlocking(true);
+ if (res < 0) {
+ throw new EOFException();
+ } else if (res == 0) {
+ // now do a blocking read so setup a timer to close the socket if the
+ // the read takes too long
+ SystemTimer.SystemTimerTask timerTask = new SystemTimer.SystemTimerTask() {
+ @Override
+ public void run2() {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.AcceptorImpl_CACHE_SERVER_TIMED_OUT_WAITING_FOR_HANDSHAKE_FROM__0,
+ socket.getRemoteSocketAddress()));
+ closeSocket(socket);
+ }
+ };
+ this.hsTimer.schedule(timerTask, this.acceptTimeout);
+ res = socketChannel.read(byteBuffer);
+ if ((!timerTask.cancel()) || res <= 0) {
+ throw new EOFException();
+ }
+ }
+ return CommunicationMode.fromModeNumber(byteBuffer.get(0));
+ }
+
@Override
public boolean isRunning() {
return !this.shutdownStarted;
--
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].