You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by go...@apache.org on 2017/09/19 17:14:08 UTC

[geode] branch develop updated: GEODE-3609 Small AcceptorImpl refactor (#772)

This is an automated email from the ASF dual-hosted git repository.

gosullivan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8211f2f  GEODE-3609 Small AcceptorImpl refactor (#772)
8211f2f is described below

commit 8211f2fd26fa57ad2a49634d7e3d343ec6106c6c
Author: galen-pivotal <go...@pivotal.io>
AuthorDate: Tue Sep 19 10:14:06 2017 -0700

    GEODE-3609 Small AcceptorImpl refactor (#772)
    
    Use one CommunicationMode variable, not 3.
    
    Also, refactor the selector and not-selector cases into separate
    methods.
    
    I'm not changing the `ServerConnectionFactory` because it more or less
    passes straight through to `ServerConnection`, and there's too much
    legacy code there to change easily.
---
 .../geode/cache/client/internal/PoolImpl.java      |   1 -
 .../internal/cache/tier/sockets/AcceptorImpl.java  | 114 +++++++++++----------
 2 files changed, 60 insertions(+), 55 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
index 332046d..d2e1f4b 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
@@ -64,7 +64,6 @@ import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.PoolFactoryImpl;
 import org.apache.geode.internal.cache.PoolManagerImpl;
 import org.apache.geode.internal.cache.PoolStats;
-import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.InternalLogWriter;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 68377d8..c660b68 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -281,7 +281,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
   /**
    * The client health monitor tracking connections for this acceptor
    */
-  private ClientHealthMonitor healthMonitor;
+  private final ClientHealthMonitor healthMonitor;
 
   /**
    * bridge's setting of notifyBySubscription
@@ -1411,70 +1411,34 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
     // communication, create a ServerConnection. If this socket is being used
     // for 'server to client' communication, send it to the CacheClientNotifier
     // for processing.
-    byte communicationMode;
-    if (isSelector()) {
-      ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1);
-      final SocketChannel socketChannel = socket.getChannel();
-      socketChannel.configureBlocking(false);
-      // try to read the byte first in non-blocking mode
-      int res = socketChannel.read(byteBuffer);
-      socketChannel.configureBlocking(true);
-      if (res < 0) {
-        throw new EOFException();
-      } else if (res == 0) {
-        // now do a blocking read so setup a timer to close the socket if the
-        // the read takes too long
-        SystemTimer.SystemTimerTask timerTask = new SystemTimer.SystemTimerTask() {
-          @Override
-          public void run2() {
-            logger.warn(LocalizedMessage.create(
-                LocalizedStrings.AcceptorImpl_CACHE_SERVER_TIMED_OUT_WAITING_FOR_HANDSHAKE_FROM__0,
-                socket.getRemoteSocketAddress()));
-            closeSocket(socket);
-          }
-        };
-        this.hsTimer.schedule(timerTask, this.acceptTimeout);
-        res = socketChannel.read(byteBuffer);
-        if ((!timerTask.cancel()) || res <= 0) {
-          throw new EOFException();
-        }
-      }
-      communicationMode = byteBuffer.get(0);
-    } else {
-      socket.setSoTimeout(this.acceptTimeout);
-      this.socketCreator.configureServerSSLSocket(socket);
-      communicationMode = (byte) socket.getInputStream().read();
-      if (communicationMode == -1) {
-        throw new EOFException();
+    final CommunicationMode communicationMode;
+    try {
+      if (isSelector()) {
+        communicationMode = getCommunicationModeForSelector(socket);
+      } else {
+        communicationMode = getCommunicationModeForNonSelector(socket);
       }
-      socket.setSoTimeout(0);
-    }
-
-    socket.setTcpNoDelay(this.tcpNoDelay);
+      socket.setTcpNoDelay(this.tcpNoDelay);
 
-    final CommunicationMode mode;
-    try {
-      mode = CommunicationMode.fromModeNumber(communicationMode);
     } catch (IllegalArgumentException e) {
-      // possible if a client uses SSL & the server isn't configured to use SSL
+      // possible if a client uses SSL & the server isn't configured to use SSL,
+      // or if an invalid communication mode byte is sent.
       logger.warn("Error processing client connection", e);
       throw new EOFException();
     }
 
-    String communicationModeStr;
-    if (mode.isSubscriptionFeed()) {
-      boolean primary = mode == CommunicationMode.PrimaryServerToClient;
+    if (communicationMode.isSubscriptionFeed()) {
+      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
       logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
           primary ? "primary" : "secondary", socket);
       AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
           this.notifyBySubscription);
       return;
     }
-    communicationModeStr = mode.toString();
 
-    logger.debug("Bridge server: Initializing {} communication socket: {}", communicationModeStr,
+    logger.debug("Bridge server: Initializing {} communication socket: {}", communicationMode,
         socket);
-    boolean notForQueue = (mode != ClientToServerForQueue);
+    boolean notForQueue = (communicationMode != ClientToServerForQueue);
     if (notForQueue) {
       int curCnt = this.getClientServerCnxCount();
       if (curCnt >= this.maxConnections) {
@@ -1482,7 +1446,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
             LocalizedStrings.AcceptorImpl_REJECTED_CONNECTION_FROM_0_BECAUSE_CURRENT_CONNECTION_COUNT_OF_1_IS_GREATER_THAN_OR_EQUAL_TO_THE_CONFIGURED_MAX_OF_2,
             new Object[] {socket.getInetAddress(), Integer.valueOf(curCnt),
                 Integer.valueOf(this.maxConnections)}));
-        if (mode.expectsConnectionRefusalMessage()) {
+        if (communicationMode.expectsConnectionRefusalMessage()) {
           try {
             ServerHandShakeProcessor.refuse(socket.getOutputStream(),
                 LocalizedStrings.AcceptorImpl_EXCEEDED_MAX_CONNECTIONS_0
@@ -1496,9 +1460,10 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
       }
     }
 
-    ServerConnection serverConn = serverConnectionFactory.makeServerConnection(socket, this.cache,
-        this.crHelper, this.stats, AcceptorImpl.handShakeTimeout, this.socketBufferSize,
-        communicationModeStr, communicationMode, this, this.securityService, this.getBindAddress());
+    ServerConnection serverConn =
+        serverConnectionFactory.makeServerConnection(socket, this.cache, this.crHelper, this.stats,
+            AcceptorImpl.handShakeTimeout, this.socketBufferSize, communicationMode.toString(),
+            communicationMode.getModeNumber(), this, this.securityService, this.getBindAddress());
 
     synchronized (this.allSCsLock) {
       this.allSCs.add(serverConn);
@@ -1533,6 +1498,47 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
     }
   }
 
+  private CommunicationMode getCommunicationModeForNonSelector(Socket socket) throws IOException {
+    socket.setSoTimeout(this.acceptTimeout);
+    this.socketCreator.configureServerSSLSocket(socket);
+    byte communicationModeByte = (byte) socket.getInputStream().read();
+    if (communicationModeByte == -1) {
+      throw new EOFException();
+    }
+    socket.setSoTimeout(0);
+    return CommunicationMode.fromModeNumber(communicationModeByte);
+  }
+
+  private CommunicationMode getCommunicationModeForSelector(Socket socket) throws IOException {
+    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1);
+    final SocketChannel socketChannel = socket.getChannel();
+    socketChannel.configureBlocking(false);
+    // try to read the byte first in non-blocking mode
+    int res = socketChannel.read(byteBuffer);
+    socketChannel.configureBlocking(true);
+    if (res < 0) {
+      throw new EOFException();
+    } else if (res == 0) {
+      // now do a blocking read, so set up a timer to close the socket if
+      // the read takes too long
+      SystemTimer.SystemTimerTask timerTask = new SystemTimer.SystemTimerTask() {
+        @Override
+        public void run2() {
+          logger.warn(LocalizedMessage.create(
+              LocalizedStrings.AcceptorImpl_CACHE_SERVER_TIMED_OUT_WAITING_FOR_HANDSHAKE_FROM__0,
+              socket.getRemoteSocketAddress()));
+          closeSocket(socket);
+        }
+      };
+      this.hsTimer.schedule(timerTask, this.acceptTimeout);
+      res = socketChannel.read(byteBuffer);
+      if ((!timerTask.cancel()) || res <= 0) {
+        throw new EOFException();
+      }
+    }
+    return CommunicationMode.fromModeNumber(byteBuffer.get(0));
+  }
+
   @Override
   public boolean isRunning() {
     return !this.shutdownStarted;

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].