You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2018/09/06 06:56:40 UTC

[2/4] hbase git commit: HBASE-21061 Fix inconsistent synchronization in RpcServer

HBASE-21061 Fix inconsistent synchronization in RpcServer

Move variables that don't need synchronized access out of the synchronized block.

Signed-off-by: Mike Drob <md...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e23c036a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e23c036a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e23c036a

Branch: refs/heads/branch-1.4
Commit: e23c036a5a808de0db83ad8ca68df7c79f107b0b
Parents: d0b0df0
Author: Sean Busbey <bu...@apache.org>
Authored: Sat Aug 25 21:28:03 2018 -0500
Committer: Sean Busbey <bu...@apache.org>
Committed: Thu Sep 6 00:31:17 2018 -0500

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 152 ++++++++++---------
 1 file changed, 81 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e23c036a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 7d7dd9d..e318f3f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -1640,7 +1640,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
      * @throws IOException
      * @throws InterruptedException
      */
-    public synchronized int readAndProcess() throws IOException, InterruptedException {
+    public int readAndProcess() throws IOException, InterruptedException {
       // If we have not read the connection setup preamble, look to see if that is on the wire.
       if (!connectionPreambleRead) {
         int count = readPreamble();
@@ -1668,85 +1668,95 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
         }
       }
 
-      // We have read a length and we have read the preamble.  It is either the connection header
-      // or it is a request.
-      if (data == null) {
-        dataLengthBuffer.flip();
-        int dataLength = dataLengthBuffer.getInt();
-        if (dataLength == RpcClient.PING_CALL_ID) {
-          if (!useWrap) { //covers the !useSasl too
-            dataLengthBuffer.clear();
-            return 0;  //ping message
+      final boolean useWrap = this.useWrap;
+      final BlockingService service = this.service;
+      final boolean headerAndPreambleRead = connectionHeaderRead && connectionPreambleRead;
+      final boolean canUseRequestTooBig = headerAndPreambleRead &&
+          VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
+              RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION);
+
+      // we're guarding against data being modified concurrently
+      // while trying to keep other instance members out of the block
+      synchronized(this) {
+        // We have read a length and we have read the preamble.  It is either the connection header
+        // or it is a request.
+        if (data == null) {
+          dataLengthBuffer.flip();
+          int dataLength = dataLengthBuffer.getInt();
+          if (dataLength == RpcClient.PING_CALL_ID) {
+            if (!useWrap) { //covers the !useSasl too
+              dataLengthBuffer.clear();
+              return 0;  //ping message
+            }
+          }
+          if (dataLength < 0) { // A data length of zero is legal.
+            throw new DoNotRetryIOException("Unexpected data length "
+                + dataLength + "!! from " + getHostAddress());
           }
-        }
-        if (dataLength < 0) { // A data length of zero is legal.
-          throw new DoNotRetryIOException("Unexpected data length "
-              + dataLength + "!! from " + getHostAddress());
-        }
-
-        if (dataLength > maxRequestSize) {
-          String msg = "RPC data length of " + dataLength + " received from "
-              + getHostAddress() + " is greater than max allowed "
-              + maxRequestSize + ". Set \"" + MAX_REQUEST_SIZE
-              + "\" on server to override this limit (not recommended)";
-          LOG.warn(msg);
 
-          if (connectionHeaderRead && connectionPreambleRead) {
-            incRpcCount();
-            // Construct InputStream for the non-blocking SocketChannel
-            // We need the InputStream because we want to read only the request header
-            // instead of the whole rpc.
-            final ByteBuffer buf = ByteBuffer.allocate(1);
-            InputStream is = new InputStream() {
-              @Override
-              public int read() throws IOException {
-                channelRead(channel, buf);
-                buf.flip();
-                int x = buf.get();
-                buf.flip();
-                return x;
+          if (dataLength > maxRequestSize) {
+            String msg = "RPC data length of " + dataLength + " received from "
+                + getHostAddress() + " is greater than max allowed "
+                + maxRequestSize + ". Set \"" + MAX_REQUEST_SIZE
+                + "\" on server to override this limit (not recommended)";
+            LOG.warn(msg);
+
+            if (headerAndPreambleRead) {
+              incRpcCount();
+              // Construct InputStream for the non-blocking SocketChannel
+              // We need the InputStream because we want to read only the request header
+              // instead of the whole rpc.
+              final ByteBuffer buf = ByteBuffer.allocate(1);
+              InputStream is = new InputStream() {
+                @Override
+                public int read() throws IOException {
+                  channelRead(channel, buf);
+                  buf.flip();
+                  int x = buf.get();
+                  buf.flip();
+                  return x;
+                }
+              };
+              CodedInputStream cis = CodedInputStream.newInstance(is);
+              int headerSize = cis.readRawVarint32();
+              Message.Builder builder = RequestHeader.newBuilder();
+              ProtobufUtil.mergeFrom(builder, cis, headerSize);
+              RequestHeader header = (RequestHeader) builder.build();
+
+              // Notify the client about the offending request
+              Call reqTooBig = new Call(header.getCallId(), service, null, null, null,
+                  null, this, responder, 0, null, this.addr,0);
+              metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
+              // Make sure the client recognizes the underlying exception
+              // Otherwise, throw a DoNotRetryIOException.
+              if (canUseRequestTooBig) {
+                setupResponse(null, reqTooBig, REQUEST_TOO_BIG_EXCEPTION, msg);
+              } else {
+                setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg);
               }
-            };
-            CodedInputStream cis = CodedInputStream.newInstance(is);
-            int headerSize = cis.readRawVarint32();
-            Message.Builder builder = RequestHeader.newBuilder();
-            ProtobufUtil.mergeFrom(builder, cis, headerSize);
-            RequestHeader header = (RequestHeader) builder.build();
-
-            // Notify the client about the offending request
-            Call reqTooBig = new Call(header.getCallId(), this.service, null, null, null,
-                null, this, responder, 0, null, this.addr,0);
-            metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
-            // Make sure the client recognizes the underlying exception
-            // Otherwise, throw a DoNotRetryIOException.
-            if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
-                RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) {
-              setupResponse(null, reqTooBig, REQUEST_TOO_BIG_EXCEPTION, msg);
-            } else {
-              setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg);
+              // We are going to close the connection, make sure we process the response
+              // before that. In rare case when this fails, we still close the connection.
+              responseWriteLock.lock();
+              responder.processResponse(reqTooBig);
+              responseWriteLock.unlock();
             }
-            // We are going to close the connection, make sure we process the response
-            // before that. In rare case when this fails, we still close the connection.
-            responseWriteLock.lock();
-            responder.processResponse(reqTooBig);
-            responseWriteLock.unlock();
+            // Close the connection
+            return -1;
           }
-          // Close the connection
-          return -1;
-        }
 
-        data = ByteBuffer.allocate(dataLength);
+          data = ByteBuffer.allocate(dataLength);
 
-        // Increment the rpc count. This counter will be decreased when we write
-        //  the response.  If we want the connection to be detected as idle properly, we
-        //  need to keep the inc / dec correct.
-        incRpcCount();
-      }
+          // Increment the rpc count. This counter will be decreased when we write
+          //  the response.  If we want the connection to be detected as idle properly, we
+          //  need to keep the inc / dec correct.
+          incRpcCount();
+        }
 
-      count = channelRead(channel, data);
+        count = channelRead(channel, data);
 
-      if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
-        process();
+        if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
+          process();
+        }
       }
 
       return count;