You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2019/06/22 01:01:59 UTC

[hbase] branch branch-1 updated: HBASE-22492 Wrap RPC responses with SASL after queueing for response (Sébastien Barnoud)

This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 36f4ab0  HBASE-22492 Wrap RPC responses with SASL after queueing for response (Sébastien Barnoud)
36f4ab0 is described below

commit 36f4ab07161874b31362854d974c077661e518f1
Author: Josh Elser <el...@apache.org>
AuthorDate: Fri Jun 21 16:29:08 2019 -0400

    HBASE-22492 Wrap RPC responses with SASL after queueing for response (Sébastien Barnoud)
    
    Amending-Author: Josh Elser <el...@apache.org>
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    Signed-off-by: Josh Elser <el...@apache.org>
---
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     | 38 ++++++++++++++++------
 1 file changed, 28 insertions(+), 10 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index a32040c..efc7f89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -342,6 +342,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
     private User user;
     private InetAddress remoteAddress;
+    private boolean saslWrapDone;
 
     private long responseCellSize = 0;
     private long responseBlockSize = 0;
@@ -369,6 +370,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       this.tinfo = tinfo;
       this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
       this.remoteAddress = remoteAddress;
+      this.saslWrapDone = false;
       this.retryImmediatelySupported =
           connection == null? null: connection.retryImmediatelySupported;
       this.timeout = timeout;
@@ -478,10 +480,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         byte[] b = createHeaderAndMessageBytes(result, header);
 
         bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock);
-
-        if (connection.useWrap) {
-          bc = wrapWithSasl(bc);
-        }
       } catch (IOException e) {
         LOG.warn("Exception while creating response " + e);
       }
@@ -526,6 +524,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return b;
     }
 
+    private synchronized void wrapWithSasl() throws IOException {
+      // do it only once per call
+      if (saslWrapDone) {
+        return;
+      }
+      response = wrapWithSasl(response);
+      saslWrapDone = true;
+    }
+
+    /**
+     * Do not call directly, invoke via {@link #wrapWithSasl()}.
+     */
     private BufferChain wrapWithSasl(BufferChain bc)
         throws IOException {
       if (!this.connection.useSasl) return bc;
@@ -533,11 +543,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       // THIS IS A BIG UGLY COPY.
       byte [] responseBytes = bc.getBytes();
       byte [] token;
-      // synchronization may be needed since there can be multiple Handler
-      // threads using saslServer to wrap responses.
-      synchronized (connection.saslServer) {
-        token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
-      }
+
+      // Previously, synchronization was needed since there could be multiple Handler
+      // threads using saslServer to wrap responses. However, now we wrap the response
+      // inside of the Responder thread to avoid sending back mis-ordered SASL messages.
+      token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
       if (LOG.isTraceEnabled()) {
         LOG.trace("Adding saslServer wrapped token of size " + token.length
             + " as call response.");
@@ -1188,6 +1198,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     private boolean processResponse(final Call call) throws IOException {
       boolean error = true;
       try {
+        // Wrap the message "late" in SASL to ensure that the sequence number matches the order of
+        // responses we write out.
+        if (call.connection.useWrap) {
+          call.wrapWithSasl();
+        }
         // Send as much data as we can in the non-blocking fashion
         long numBytes = channelWrite(call.connection.channel, call.response);
         if (numBytes < 0) {
@@ -1220,6 +1235,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
      */
     private boolean processAllResponses(final Connection connection) throws IOException {
       // We want only one writer on the channel for a connection at a time.
+      boolean isEmpty = false;
       connection.responseWriteLock.lock();
       try {
         for (int i = 0; i < 20; i++) {
@@ -1233,11 +1249,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
             return false;
           }
         }
+        // Check the queue state while still holding the lock so the result is consistent
+        isEmpty = connection.responseQueue.isEmpty();
       } finally {
         connection.responseWriteLock.unlock();
       }
 
-      return connection.responseQueue.isEmpty();
+      return isEmpty;
     }
 
     //