You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2019/06/22 01:02:01 UTC
[hbase] branch branch-1.4 updated: HBASE-22492 Wrap RPC responses with SASL after queueing for response (Sébastien Barnoud)
This is an automated email from the ASF dual-hosted git repository.
elserj pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1.4 by this push:
new fc09cdf HBASE-22492 Wrap RPC responses with SASL after queueing for response (Sébastien Barnoud)
fc09cdf is described below
commit fc09cdff30bf1b91e19360fcefd7659c2f5aef1c
Author: Josh Elser <el...@apache.org>
AuthorDate: Fri Jun 21 16:29:08 2019 -0400
HBASE-22492 Wrap RPC responses with SASL after queueing for response (Sébastien Barnoud)
Amending-Author: Josh Elser <el...@apache.org>
Signed-off-by: Andrew Purtell <ap...@apache.org>
Signed-off-by: Josh Elser <el...@apache.org>
---
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 38 ++++++++++++++++------
1 file changed, 28 insertions(+), 10 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index a32040c..efc7f89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -342,6 +342,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private User user;
private InetAddress remoteAddress;
+ private boolean saslWrapDone;
private long responseCellSize = 0;
private long responseBlockSize = 0;
@@ -369,6 +370,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.tinfo = tinfo;
this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
this.remoteAddress = remoteAddress;
+ this.saslWrapDone = false;
this.retryImmediatelySupported =
connection == null? null: connection.retryImmediatelySupported;
this.timeout = timeout;
@@ -478,10 +480,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
byte[] b = createHeaderAndMessageBytes(result, header);
bc = new BufferChain(ByteBuffer.wrap(b), this.cellBlock);
-
- if (connection.useWrap) {
- bc = wrapWithSasl(bc);
- }
} catch (IOException e) {
LOG.warn("Exception while creating response " + e);
}
@@ -526,6 +524,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return b;
}
+ private synchronized void wrapWithSasl() throws IOException {
+ // do it only once per call
+ if (saslWrapDone) {
+ return;
+ }
+ response = wrapWithSasl(response);
+ saslWrapDone = true;
+ }
+
+ /**
+ * Do not call directly, invoke via {@link #wrapWithSasl()}.
+ */
private BufferChain wrapWithSasl(BufferChain bc)
throws IOException {
if (!this.connection.useSasl) return bc;
@@ -533,11 +543,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// THIS IS A BIG UGLY COPY.
byte [] responseBytes = bc.getBytes();
byte [] token;
- // synchronization may be needed since there can be multiple Handler
- // threads using saslServer to wrap responses.
- synchronized (connection.saslServer) {
- token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
- }
+
+ // Previously, synchronization was needed since there could be multiple Handler
+ // threads using saslServer to wrap responses. However, now we wrap the response
+ // inside of the Responder thread to avoid sending back mis-ordered SASL messages.
+ token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
if (LOG.isTraceEnabled()) {
LOG.trace("Adding saslServer wrapped token of size " + token.length
+ " as call response.");
@@ -1188,6 +1198,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private boolean processResponse(final Call call) throws IOException {
boolean error = true;
try {
+ // Wrap the message "late" in SASL to ensure that the sequence number matches the order of
+ // responses we write out.
+ if (call.connection.useWrap) {
+ call.wrapWithSasl();
+ }
// Send as much data as we can in the non-blocking fashion
long numBytes = channelWrite(call.connection.channel, call.response);
if (numBytes < 0) {
@@ -1220,6 +1235,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
*/
private boolean processAllResponses(final Connection connection) throws IOException {
// We want only one writer on the channel for a connection at a time.
+ boolean isEmpty = false;
connection.responseWriteLock.lock();
try {
for (int i = 0; i < 20; i++) {
@@ -1233,11 +1249,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return false;
}
}
+ // Check that state within the lock to be consistent
+ isEmpty = connection.responseQueue.isEmpty();
} finally {
connection.responseWriteLock.unlock();
}
- return connection.responseQueue.isEmpty();
+ return isEmpty;
}
//