You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/10/25 19:37:57 UTC

[geode] 01/01: GEODE-3637: Moved client queue initialization into the ServerConnection.java

This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/GEODE-3637
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 5a53b68b3870c78fe6d8d9829b1d4ea4cdf439d0
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Wed Oct 25 12:37:50 2017 -0700

    GEODE-3637: Moved client queue initialization into the ServerConnection.java
---
 .../internal/cache/tier/sockets/AcceptorImpl.java  | 21 +++---
 .../cache/tier/sockets/ServerConnection.java       | 77 +++++++++++++++-------
 2 files changed, 65 insertions(+), 33 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 59ef466..ad910bd 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -1405,6 +1405,10 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
     return this.clientServerCnxCount.get();
   }
 
+  public boolean isNotifyBySubscription() {
+    return notifyBySubscription;
+  }
+
   protected void handleNewClientConnection(final Socket socket,
       final ServerConnectionFactory serverConnectionFactory) throws IOException {
     // Read the first byte. If this socket is being used for 'client to server'
@@ -1427,14 +1431,15 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
       throw new EOFException();
     }
 
-    if (communicationMode.isSubscriptionFeed()) {
-      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
-      logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
-          primary ? "primary" : "secondary", socket);
-      AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
-          this.notifyBySubscription);
-      return;
-    }
+    // GEODE-3637
+    // if (communicationMode.isSubscriptionFeed()) {
+    // boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
+    // logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
+    // primary ? "primary" : "secondary", socket);
+    // AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
+    // this.notifyBySubscription);
+    // return;
+    // }
 
     logger.debug("Bridge server: Initializing {} communication socket: {}", communicationMode,
         socket);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 7fc688c..5c69769 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -72,7 +72,7 @@ import org.apache.geode.security.GemFireSecurityException;
  * Provides an implementation for the server socket end of the hierarchical cache connection. Each
  * server connection runs in its own thread to maximize concurrency and improve response times to
  * edge requests
- *
+ * 
  * @since GemFire 2.0.2
  */
 public abstract class ServerConnection implements Runnable {
@@ -189,16 +189,18 @@ public abstract class ServerConnection implements Runnable {
    */
   private volatile int requestSpecificTimeout = -1;
 
-  /** Tracks the id of the most recent batch to which a reply has been sent */
+  /**
+   * Tracks the id of the most recent batch to which a reply has been sent
+   */
   private int latestBatchIdReplied = -1;
 
   /*
    * Uniquely identifying the client's Distributed System
    *
-   * 
+   *
    * private String membershipId;
-   * 
-   * 
+   *
+   *
    * Uniquely identifying the client's ConnectionProxy object
    *
    *
@@ -711,8 +713,9 @@ public abstract class ServerConnection implements Runnable {
       // can be used.
       initializeCommands();
       // its initialized in verifyClientConnection call
-      if (!getCommunicationMode().isWAN())
+      if (!getCommunicationMode().isWAN()) {
         initializeClientUserAuths();
+      }
     }
     if (TEST_VERSION_AFTER_HANDSHAKE_FLAG) {
       Assert.assertTrue((this.handshake.getVersion().ordinal() == testVersionAfterHandshake),
@@ -892,7 +895,9 @@ public abstract class ServerConnection implements Runnable {
         }
       }
       if (unregisterClient)// last serverconnection call all close on auth objects
+      {
         cleanClientAuths();
+      }
       this.clientUserAuths = null;
       if (needsUnregister) {
         this.acceptor.getClientHealthMonitor().removeConnection(this.proxyId, this);
@@ -917,8 +922,9 @@ public abstract class ServerConnection implements Runnable {
     ClientUserAuths cua = new ClientUserAuths(proxyId.hashCode());
     ClientUserAuths retCua = proxyIdVsClientUserAuths.putIfAbsent(proxyId, cua);
 
-    if (retCua == null)
+    if (retCua == null) {
       return cua;
+    }
     return retCua;
   }
 
@@ -954,8 +960,9 @@ public abstract class ServerConnection implements Runnable {
         boolean removed = this.clientUserAuths.removeSubject(aIds.getUniqueId());
 
         // if not successfull, try the old way
-        if (!removed)
+        if (!removed) {
           removed = this.clientUserAuths.removeUserId(aIds.getUniqueId(), keepalive);
+        }
         return removed;
 
       } catch (NullPointerException npe) {
@@ -984,7 +991,7 @@ public abstract class ServerConnection implements Runnable {
         /*
          * This means that client and server VMs have different security settings. The server does
          * not have any security settings specified while client has.
-         * 
+         *
          * Here, should we just ignore this and send the dummy security part (connectionId, userId)
          * in the response (in this case, client needs to know that it is not expected to read any
          * security part in any of the server response messages) or just throw an exception
@@ -1010,7 +1017,6 @@ public abstract class ServerConnection implements Runnable {
         throw new AuthenticationFailedException("Authentication failed");
       }
 
-
       byte[] credBytes = msg.getPart(0).getSerializedForm();
 
       credBytes = ((HandShake) this.handshake).decryptBytes(credBytes);
@@ -1066,7 +1072,7 @@ public abstract class ServerConnection implements Runnable {
   /**
    * MessageType of the messages (typically internal commands) which do not need to participate in
    * security should be added in the following if block.
-   *
+   * 
    * @return Part
    * @see AbstractOp#processSecureBytes(Connection, Message)
    * @see AbstractOp#needsUserId()
@@ -1124,9 +1130,12 @@ public abstract class ServerConnection implements Runnable {
 
   public void run() {
     setOwner();
+
     if (getAcceptor().isSelector()) {
       boolean finishedMsg = false;
       try {
+        initializeClientNofication();
+
         this.stats.decThreadQueueSize();
         if (!isTerminated()) {
           getAcceptor().setTLCommBuffer();
@@ -1136,9 +1145,7 @@ public abstract class ServerConnection implements Runnable {
             finishedMsg = true;
           }
         }
-      } catch (java.nio.channels.ClosedChannelException ignore) {
-        // ok shutting down
-      } catch (CancelException e) {
+      } catch (java.nio.channels.ClosedChannelException | CancelException ignore) {
         // ok shutting down
       } catch (IOException ex) {
         logger.warn(
@@ -1183,10 +1190,21 @@ public abstract class ServerConnection implements Runnable {
     }
   }
 
+  private void initializeClientNofication() throws IOException {
+    if (communicationMode.isSubscriptionFeed()) {
+      boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
+      logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
+          primary ? "primary" : "secondary", theSocket);
+      getAcceptor().getCacheClientNotifier().registerClient(theSocket, primary,
+          getAcceptor().getAcceptorId(), getAcceptor().isNotifyBySubscription());
+    }
+  }
+
   /**
    * If registered with a selector then this will be the key we are registered with.
    */
   // private SelectionKey sKey = null;
+
   /**
    * Register this connection with the given selector for read events. Note that switch the channel
    * to non-blocking so it can be in a selector.
@@ -1202,7 +1220,8 @@ public abstract class ServerConnection implements Runnable {
   }
 
   public void registerWithSelector2(Selector s) throws IOException {
-    /* this.sKey = */getSelectableChannel().register(s, SelectionKey.OP_READ, this);
+    /* this.sKey = */
+    getSelectableChannel().register(s, SelectionKey.OP_READ, this);
   }
 
   /**
@@ -1225,7 +1244,6 @@ public abstract class ServerConnection implements Runnable {
   }
 
   /**
-   *
    * @return String representing the DistributedSystemMembership of the Client VM
    */
   public String getMembershipID() {
@@ -1265,10 +1283,11 @@ public abstract class ServerConnection implements Runnable {
   }
 
   protected int getClientReadTimeout() {
-    if (this.requestSpecificTimeout == -1)
+    if (this.requestSpecificTimeout == -1) {
       return this.handshake.getClientReadTimeout();
-    else
+    } else {
       return this.requestSpecificTimeout;
+    }
   }
 
   protected boolean isProcessingMessage() {
@@ -1492,7 +1511,7 @@ public abstract class ServerConnection implements Runnable {
 
   /**
    * Just ensure that this class gets loaded.
-   *
+   * 
    * @see SystemFailure#loadEmergencyClasses()
    */
   public static void loadEmergencyClasses() {
@@ -1519,7 +1538,9 @@ public abstract class ServerConnection implements Runnable {
     return this.name;
   }
 
-  /** returns the name of this connection */
+  /**
+   * returns the name of this connection
+   */
   public String getName() {
     return this.name;
   }
@@ -1736,11 +1757,13 @@ public abstract class ServerConnection implements Runnable {
     // for backward client it will be store in member variable userAuthId
     // for other look "requestMsg" here and get unique-id from this to get the authzrequest
 
-    if (!AcceptorImpl.isAuthenticationRequired())
+    if (!AcceptorImpl.isAuthenticationRequired()) {
       return null;
+    }
 
-    if (AcceptorImpl.isIntegratedSecurity())
+    if (AcceptorImpl.isIntegratedSecurity()) {
       return null;
+    }
 
     long uniqueId = getUniqueId();
 
@@ -1768,11 +1791,13 @@ public abstract class ServerConnection implements Runnable {
 
   public AuthorizeRequestPP getPostAuthzRequest()
       throws AuthenticationRequiredException, IOException {
-    if (!AcceptorImpl.isAuthenticationRequired())
+    if (!AcceptorImpl.isAuthenticationRequired()) {
       return null;
+    }
 
-    if (AcceptorImpl.isIntegratedSecurity())
+    if (AcceptorImpl.isIntegratedSecurity()) {
       return null;
+    }
 
     // look client version and return authzrequest
     // for backward client it will be store in member variable userAuthId
@@ -1799,7 +1824,9 @@ public abstract class ServerConnection implements Runnable {
     return postAuthReq;
   }
 
-  /** returns the member ID byte array to be used for creating EventID objects */
+  /**
+   * returns the member ID byte array to be used for creating EventID objects
+   */
   public byte[] getEventMemberIDByteArray() {
     return this.memberIdByteArray;
   }

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.