You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/09/21 14:38:18 UTC
hbase git commit: HBASE-16654 Better handle channelInactive and close for netty rpc client
Repository: hbase
Updated Branches:
refs/heads/master c67983ebf -> 5568929dd
HBASE-16654 Better handle channelInactive and close for netty rpc client
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5568929d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5568929d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5568929d
Branch: refs/heads/master
Commit: 5568929dd2bfc20c1aa15d37d318459888cbd32a
Parents: c67983e
Author: zhangduo <zh...@apache.org>
Authored: Mon Sep 19 20:52:46 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Sep 21 22:36:47 2016 +0800
----------------------------------------------------------------------
.../hbase/ipc/BufferCallBeforeInitHandler.java | 6 +-
.../hadoop/hbase/ipc/NettyRpcConnection.java | 6 +-
.../hadoop/hbase/ipc/NettyRpcDuplexHandler.java | 9 +-
.../hbase/security/AsyncHBaseSaslRpcClient.java | 58 --------
.../AsyncHBaseSaslRpcClientHandler.java | 135 ------------------
.../hbase/security/NettyHBaseSaslRpcClient.java | 58 ++++++++
.../NettyHBaseSaslRpcClientHandler.java | 142 +++++++++++++++++++
.../hbase/security/SaslUnwrapHandler.java | 1 +
.../hadoop/hbase/security/SaslWrapHandler.java | 46 +++---
9 files changed, 241 insertions(+), 220 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
index 573ddd5..c628c31 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BufferCallBeforeInitHandler.java
@@ -62,13 +62,16 @@ class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
private static final BufferCallEvent SUCCESS_EVENT = new BufferCallEvent(BufferCallAction.FLUSH,
null);
- private final Map<Integer, Call> id2Call = new HashMap<Integer, Call>();
+ private final Map<Integer, Call> id2Call = new HashMap<>();
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof Call) {
Call call = (Call) msg;
id2Call.put(call.id, call);
+ // The call is already in track so here we set the write operation as success.
+ // We will fail the call directly if we can not write it out.
+ promise.trySuccess();
} else {
ctx.write(msg, promise);
}
@@ -99,5 +102,4 @@ class BufferCallBeforeInitHandler extends ChannelDuplexHandler {
ctx.fireUserEventTriggered(evt);
}
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
index 559b7f9..8a85580 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.BufferCallBeforeInitHandler.BufferCallEvent;
import org.apache.hadoop.hbase.ipc.HBaseRpcController.CancellationCallback;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.security.AsyncHBaseSaslRpcClientHandler;
+import org.apache.hadoop.hbase.security.NettyHBaseSaslRpcClientHandler;
import org.apache.hadoop.hbase.security.SaslChallengeDecoder;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.util.Threads;
@@ -190,7 +190,7 @@ class NettyRpcConnection extends RpcConnection {
Promise<Boolean> saslPromise = ch.eventLoop().newPromise();
ChannelHandler saslHandler;
try {
- saslHandler = new AsyncHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token,
+ saslHandler = new NettyHBaseSaslRpcClientHandler(saslPromise, ticket, authMethod, token,
serverPrincipal, rpcClient.fallbackAllowed, this.rpcClient.conf.get(
"hbase.rpc.protection", QualityOfProtection.AUTHENTICATION.name().toLowerCase()));
} catch (IOException e) {
@@ -205,7 +205,7 @@ class NettyRpcConnection extends RpcConnection {
if (future.isSuccess()) {
ChannelPipeline p = ch.pipeline();
p.remove(SaslChallengeDecoder.class);
- p.remove(AsyncHBaseSaslRpcClientHandler.class);
+ p.remove(NettyHBaseSaslRpcClientHandler.class);
established(ch);
} else {
final Throwable error = future.cause();
http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
index 1cd89d8..5faaede 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
@@ -204,13 +204,18 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- cleanupCalls(ctx, new IOException("Connection closed"));
+ if (!id2Call.isEmpty()) {
+ cleanupCalls(ctx, new IOException("Connection closed"));
+ }
conn.shutdown();
+ ctx.fireChannelInactive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- cleanupCalls(ctx, IPCUtil.toIOE(cause));
+ if (!id2Call.isEmpty()) {
+ cleanupCalls(ctx, IPCUtil.toIOE(cause));
+ }
conn.shutdown();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java
deleted file mode 100644
index df703dc..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClient.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security;
-
-import io.netty.channel.ChannelPipeline;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-
-import java.io.IOException;
-
-import javax.security.sasl.Sasl;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-
-/**
- * Implement SASL logic for async rpc client.
- */
-@InterfaceAudience.Private
-public class AsyncHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
- private static final Log LOG = LogFactory.getLog(AsyncHBaseSaslRpcClient.class);
-
- public AsyncHBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
- String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException {
- super(method, token, serverPrincipal, fallbackAllowed, rpcProtection);
- }
-
- public void setupSaslHandler(ChannelPipeline p) {
- String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
- if (LOG.isDebugEnabled()) {
- LOG.debug("SASL client context established. Negotiated QoP: " + qop);
- }
- if (qop == null || "auth".equalsIgnoreCase(qop)) {
- return;
- }
- // add wrap and unwrap handlers to pipeline.
- p.addFirst(new SaslWrapHandler(saslClient),
- new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
- new SaslUnwrapHandler(saslClient));
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java
deleted file mode 100644
index bccfa30..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/AsyncHBaseSaslRpcClientHandler.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.util.concurrent.Promise;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-
-/**
- * Implement SASL logic for async rpc client.
- */
-@InterfaceAudience.Private
-public class AsyncHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
-
- private static final Log LOG = LogFactory.getLog(AsyncHBaseSaslRpcClientHandler.class);
-
- private final Promise<Boolean> saslPromise;
-
- private final UserGroupInformation ugi;
-
- private final AsyncHBaseSaslRpcClient saslRpcClient;
-
- /**
- * @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to
- * simple.
- */
- public AsyncHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi,
- AuthMethod method, Token<? extends TokenIdentifier> token, String serverPrincipal,
- boolean fallbackAllowed, String rpcProtection) throws IOException {
- this.saslPromise = saslPromise;
- this.ugi = ugi;
- this.saslRpcClient = new AsyncHBaseSaslRpcClient(method, token, serverPrincipal,
- fallbackAllowed, rpcProtection);
- }
-
- private void writeResponse(ChannelHandlerContext ctx, byte[] response) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Will send token of size " + response.length + " from initSASLContext.");
- }
- ctx.writeAndFlush(
- ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response));
- }
-
- private void tryComplete(ChannelHandlerContext ctx) {
- if (!saslRpcClient.isComplete()) {
- return;
- }
- saslRpcClient.setupSaslHandler(ctx.pipeline());
- saslPromise.setSuccess(true);
- }
-
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) {
- try {
- byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
-
- @Override
- public byte[] run() throws Exception {
- return saslRpcClient.getInitialResponse();
- }
- });
- if (initialResponse != null) {
- writeResponse(ctx, initialResponse);
- }
- tryComplete(ctx);
- } catch (Exception e) {
- // the exception thrown by handlerAdded will not be passed to the exceptionCaught below
- // because netty will remove a handler if handlerAdded throws an exception.
- exceptionCaught(ctx, e);
- }
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
- int len = msg.readInt();
- if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
- saslRpcClient.dispose();
- if (saslRpcClient.fallbackAllowed) {
- saslPromise.setSuccess(false);
- } else {
- saslPromise.setFailure(new FallbackDisallowedException());
- }
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Will read input token of size " + len + " for processing by initSASLContext");
- }
- final byte[] challenge = new byte[len];
- msg.readBytes(challenge);
- byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
-
- @Override
- public byte[] run() throws Exception {
- return saslRpcClient.evaluateChallenge(challenge);
- }
- });
- if (response != null) {
- writeResponse(ctx, response);
- }
- tryComplete(ctx);
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- saslRpcClient.dispose();
- saslPromise.setFailure(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
new file mode 100644
index 0000000..f624608
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClient.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import java.io.IOException;
+
+import javax.security.sasl.Sasl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * Implement SASL logic for netty rpc client.
+ */
+@InterfaceAudience.Private
+public class NettyHBaseSaslRpcClient extends AbstractHBaseSaslRpcClient {
+ private static final Log LOG = LogFactory.getLog(NettyHBaseSaslRpcClient.class);
+
+ public NettyHBaseSaslRpcClient(AuthMethod method, Token<? extends TokenIdentifier> token,
+ String serverPrincipal, boolean fallbackAllowed, String rpcProtection) throws IOException {
+ super(method, token, serverPrincipal, fallbackAllowed, rpcProtection);
+ }
+
+ public void setupSaslHandler(ChannelPipeline p) {
+ String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("SASL client context established. Negotiated QoP: " + qop);
+ }
+ if (qop == null || "auth".equalsIgnoreCase(qop)) {
+ return;
+ }
+ // add wrap and unwrap handlers to pipeline.
+ p.addFirst(new SaslWrapHandler(saslClient),
+ new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4),
+ new SaslUnwrapHandler(saslClient));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
new file mode 100644
index 0000000..50609b4
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/NettyHBaseSaslRpcClientHandler.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.concurrent.Promise;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.FallbackDisallowedException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * Implement SASL logic for netty rpc client.
+ */
+@InterfaceAudience.Private
+public class NettyHBaseSaslRpcClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+ private static final Log LOG = LogFactory.getLog(NettyHBaseSaslRpcClientHandler.class);
+
+ private final Promise<Boolean> saslPromise;
+
+ private final UserGroupInformation ugi;
+
+ private final NettyHBaseSaslRpcClient saslRpcClient;
+
+ /**
+ * @param saslPromise {@code true} if success, {@code false} if server tells us to fallback to
+ * simple.
+ */
+ public NettyHBaseSaslRpcClientHandler(Promise<Boolean> saslPromise, UserGroupInformation ugi,
+ AuthMethod method, Token<? extends TokenIdentifier> token, String serverPrincipal,
+ boolean fallbackAllowed, String rpcProtection) throws IOException {
+ this.saslPromise = saslPromise;
+ this.ugi = ugi;
+ this.saslRpcClient = new NettyHBaseSaslRpcClient(method, token, serverPrincipal,
+ fallbackAllowed, rpcProtection);
+ }
+
+ private void writeResponse(ChannelHandlerContext ctx, byte[] response) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Will send token of size " + response.length + " from initSASLContext.");
+ }
+ ctx.writeAndFlush(
+ ctx.alloc().buffer(4 + response.length).writeInt(response.length).writeBytes(response));
+ }
+
+ private void tryComplete(ChannelHandlerContext ctx) {
+ if (!saslRpcClient.isComplete()) {
+ return;
+ }
+ saslRpcClient.setupSaslHandler(ctx.pipeline());
+ saslPromise.setSuccess(true);
+ }
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) {
+ try {
+ byte[] initialResponse = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
+
+ @Override
+ public byte[] run() throws Exception {
+ return saslRpcClient.getInitialResponse();
+ }
+ });
+ if (initialResponse != null) {
+ writeResponse(ctx, initialResponse);
+ }
+ tryComplete(ctx);
+ } catch (Exception e) {
+ // the exception thrown by handlerAdded will not be passed to the exceptionCaught below
+ // because netty will remove a handler if handlerAdded throws an exception.
+ exceptionCaught(ctx, e);
+ }
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+ int len = msg.readInt();
+ if (len == SaslUtil.SWITCH_TO_SIMPLE_AUTH) {
+ saslRpcClient.dispose();
+ if (saslRpcClient.fallbackAllowed) {
+ saslPromise.trySuccess(false);
+ } else {
+ saslPromise.tryFailure(new FallbackDisallowedException());
+ }
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Will read input token of size " + len + " for processing by initSASLContext");
+ }
+ final byte[] challenge = new byte[len];
+ msg.readBytes(challenge);
+ byte[] response = ugi.doAs(new PrivilegedExceptionAction<byte[]>() {
+
+ @Override
+ public byte[] run() throws Exception {
+ return saslRpcClient.evaluateChallenge(challenge);
+ }
+ });
+ if (response != null) {
+ writeResponse(ctx, response);
+ }
+ tryComplete(ctx);
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ saslRpcClient.dispose();
+ saslPromise.tryFailure(new IOException("Connection closed"));
+ ctx.fireChannelInactive();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ saslRpcClient.dispose();
+ saslPromise.tryFailure(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
index c2faf91..e631478 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslUnwrapHandler.java
@@ -42,6 +42,7 @@ public class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
SaslUtil.safeDispose(saslClient);
+ ctx.fireChannelInactive();
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/5568929d/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
index fefb4f8..14ecf2e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SaslWrapHandler.java
@@ -26,6 +26,8 @@ import io.netty.channel.CoalescingBufferQueue;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.PromiseCombiner;
+import java.io.IOException;
+
import javax.security.sasl.SaslClient;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -40,6 +42,10 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
private CoalescingBufferQueue queue;
+ public SaslWrapHandler(SaslClient saslClient) {
+ this.saslClient = saslClient;
+ }
+
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
queue = new CoalescingBufferQueue(ctx.channel());
@@ -55,29 +61,26 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
}
}
- public SaslWrapHandler(SaslClient saslClient) {
- this.saslClient = saslClient;
- }
-
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
+ if (queue.isEmpty()) {
+ return;
+ }
ByteBuf buf = null;
try {
- if (!queue.isEmpty()) {
- ChannelPromise promise = ctx.newPromise();
- int readableBytes = queue.readableBytes();
- buf = queue.remove(readableBytes, promise);
- byte[] bytes = new byte[readableBytes];
- buf.readBytes(bytes);
- byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
- ChannelPromise lenPromise = ctx.newPromise();
- ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise);
- ChannelPromise contentPromise = ctx.newPromise();
- ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
- PromiseCombiner combiner = new PromiseCombiner();
- combiner.addAll(lenPromise, contentPromise);
- combiner.finish(promise);
- }
+ ChannelPromise promise = ctx.newPromise();
+ int readableBytes = queue.readableBytes();
+ buf = queue.remove(readableBytes, promise);
+ byte[] bytes = new byte[readableBytes];
+ buf.readBytes(bytes);
+ byte[] wrapperBytes = saslClient.wrap(bytes, 0, bytes.length);
+ ChannelPromise lenPromise = ctx.newPromise();
+ ctx.write(ctx.alloc().buffer(4).writeInt(wrapperBytes.length), lenPromise);
+ ChannelPromise contentPromise = ctx.newPromise();
+ ctx.write(Unpooled.wrappedBuffer(wrapperBytes), contentPromise);
+ PromiseCombiner combiner = new PromiseCombiner();
+ combiner.addAll(lenPromise, contentPromise);
+ combiner.finish(promise);
ctx.flush();
} finally {
if (buf != null) {
@@ -88,6 +91,9 @@ public class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
- queue.releaseAndFailAll(new Throwable("Closed"));
+ if (!queue.isEmpty()) {
+ queue.releaseAndFailAll(new IOException("Connection closed"));
+ }
+ ctx.close(promise);
}
}