You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2018/09/06 06:56:40 UTC
[2/4] hbase git commit: HBASE-21061 Fix inconsistent synchronization
in RpcServer
HBASE-21061 Fix inconsistent synchronization in RpcServer
move variables that we don't need synchronized access to out of the critical block.
Signed-off-by: Mike Drob <md...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e23c036a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e23c036a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e23c036a
Branch: refs/heads/branch-1.4
Commit: e23c036a5a808de0db83ad8ca68df7c79f107b0b
Parents: d0b0df0
Author: Sean Busbey <bu...@apache.org>
Authored: Sat Aug 25 21:28:03 2018 -0500
Committer: Sean Busbey <bu...@apache.org>
Committed: Thu Sep 6 00:31:17 2018 -0500
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 152 ++++++++++---------
1 file changed, 81 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e23c036a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 7d7dd9d..e318f3f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -1640,7 +1640,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* @throws IOException
* @throws InterruptedException
*/
- public synchronized int readAndProcess() throws IOException, InterruptedException {
+ public int readAndProcess() throws IOException, InterruptedException {
// If we have not read the connection setup preamble, look to see if that is on the wire.
if (!connectionPreambleRead) {
int count = readPreamble();
@@ -1668,85 +1668,95 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
- // We have read a length and we have read the preamble. It is either the connection header
- // or it is a request.
- if (data == null) {
- dataLengthBuffer.flip();
- int dataLength = dataLengthBuffer.getInt();
- if (dataLength == RpcClient.PING_CALL_ID) {
- if (!useWrap) { //covers the !useSasl too
- dataLengthBuffer.clear();
- return 0; //ping message
+ final boolean useWrap = this.useWrap;
+ final BlockingService service = this.service;
+ final boolean headerAndPreambleRead = connectionHeaderRead && connectionPreambleRead;
+ final boolean canUseRequestTooBig = headerAndPreambleRead &&
+ VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
+ RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION);
+
+ // we're guarding against data being modified concurrently
+ // while trying to keep other instance members out of the block
+ synchronized(this) {
+ // We have read a length and we have read the preamble. It is either the connection header
+ // or it is a request.
+ if (data == null) {
+ dataLengthBuffer.flip();
+ int dataLength = dataLengthBuffer.getInt();
+ if (dataLength == RpcClient.PING_CALL_ID) {
+ if (!useWrap) { //covers the !useSasl too
+ dataLengthBuffer.clear();
+ return 0; //ping message
+ }
+ }
+ if (dataLength < 0) { // A data length of zero is legal.
+ throw new DoNotRetryIOException("Unexpected data length "
+ + dataLength + "!! from " + getHostAddress());
}
- }
- if (dataLength < 0) { // A data length of zero is legal.
- throw new DoNotRetryIOException("Unexpected data length "
- + dataLength + "!! from " + getHostAddress());
- }
-
- if (dataLength > maxRequestSize) {
- String msg = "RPC data length of " + dataLength + " received from "
- + getHostAddress() + " is greater than max allowed "
- + maxRequestSize + ". Set \"" + MAX_REQUEST_SIZE
- + "\" on server to override this limit (not recommended)";
- LOG.warn(msg);
- if (connectionHeaderRead && connectionPreambleRead) {
- incRpcCount();
- // Construct InputStream for the non-blocking SocketChannel
- // We need the InputStream because we want to read only the request header
- // instead of the whole rpc.
- final ByteBuffer buf = ByteBuffer.allocate(1);
- InputStream is = new InputStream() {
- @Override
- public int read() throws IOException {
- channelRead(channel, buf);
- buf.flip();
- int x = buf.get();
- buf.flip();
- return x;
+ if (dataLength > maxRequestSize) {
+ String msg = "RPC data length of " + dataLength + " received from "
+ + getHostAddress() + " is greater than max allowed "
+ + maxRequestSize + ". Set \"" + MAX_REQUEST_SIZE
+ + "\" on server to override this limit (not recommended)";
+ LOG.warn(msg);
+
+ if (headerAndPreambleRead) {
+ incRpcCount();
+ // Construct InputStream for the non-blocking SocketChannel
+ // We need the InputStream because we want to read only the request header
+ // instead of the whole rpc.
+ final ByteBuffer buf = ByteBuffer.allocate(1);
+ InputStream is = new InputStream() {
+ @Override
+ public int read() throws IOException {
+ channelRead(channel, buf);
+ buf.flip();
+ int x = buf.get();
+ buf.flip();
+ return x;
+ }
+ };
+ CodedInputStream cis = CodedInputStream.newInstance(is);
+ int headerSize = cis.readRawVarint32();
+ Message.Builder builder = RequestHeader.newBuilder();
+ ProtobufUtil.mergeFrom(builder, cis, headerSize);
+ RequestHeader header = (RequestHeader) builder.build();
+
+ // Notify the client about the offending request
+ Call reqTooBig = new Call(header.getCallId(), service, null, null, null,
+ null, this, responder, 0, null, this.addr,0);
+ metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
+ // Make sure the client recognizes the underlying exception
+ // Otherwise, throw a DoNotRetryIOException.
+ if (canUseRequestTooBig) {
+ setupResponse(null, reqTooBig, REQUEST_TOO_BIG_EXCEPTION, msg);
+ } else {
+ setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg);
}
- };
- CodedInputStream cis = CodedInputStream.newInstance(is);
- int headerSize = cis.readRawVarint32();
- Message.Builder builder = RequestHeader.newBuilder();
- ProtobufUtil.mergeFrom(builder, cis, headerSize);
- RequestHeader header = (RequestHeader) builder.build();
-
- // Notify the client about the offending request
- Call reqTooBig = new Call(header.getCallId(), this.service, null, null, null,
- null, this, responder, 0, null, this.addr,0);
- metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
- // Make sure the client recognizes the underlying exception
- // Otherwise, throw a DoNotRetryIOException.
- if (VersionInfoUtil.hasMinimumVersion(connectionHeader.getVersionInfo(),
- RequestTooBigException.MAJOR_VERSION, RequestTooBigException.MINOR_VERSION)) {
- setupResponse(null, reqTooBig, REQUEST_TOO_BIG_EXCEPTION, msg);
- } else {
- setupResponse(null, reqTooBig, new DoNotRetryIOException(), msg);
+ // We are going to close the connection, make sure we process the response
+ // before that. In rare case when this fails, we still close the connection.
+ responseWriteLock.lock();
+ responder.processResponse(reqTooBig);
+ responseWriteLock.unlock();
}
- // We are going to close the connection, make sure we process the response
- // before that. In rare case when this fails, we still close the connection.
- responseWriteLock.lock();
- responder.processResponse(reqTooBig);
- responseWriteLock.unlock();
+ // Close the connection
+ return -1;
}
- // Close the connection
- return -1;
- }
- data = ByteBuffer.allocate(dataLength);
+ data = ByteBuffer.allocate(dataLength);
- // Increment the rpc count. This counter will be decreased when we write
- // the response. If we want the connection to be detected as idle properly, we
- // need to keep the inc / dec correct.
- incRpcCount();
- }
+ // Increment the rpc count. This counter will be decreased when we write
+ // the response. If we want the connection to be detected as idle properly, we
+ // need to keep the inc / dec correct.
+ incRpcCount();
+ }
- count = channelRead(channel, data);
+ count = channelRead(channel, data);
- if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
- process();
+ if (count >= 0 && data.remaining() == 0) { // count==0 if dataLength == 0
+ process();
+ }
}
return count;