You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/05/15 10:08:18 UTC
[3/3] hbase git commit: HBASE-18012 Move RpcServer.Connection to a
separated file
HBASE-18012 Move RpcServer.Connection to a separated file
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/341223d8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/341223d8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/341223d8
Branch: refs/heads/master
Commit: 341223d86c567e2124802b0a19e1ba7bdc560cad
Parents: 5cdaca5
Author: zhangduo <zh...@apache.org>
Authored: Wed May 10 11:05:38 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Mon May 15 18:07:38 2017 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/nio/SingleByteBuff.java | 4 +-
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 4 +-
.../apache/hadoop/hbase/ipc/NettyRpcServer.java | 186 +---
.../hadoop/hbase/ipc/NettyServerCall.java | 13 +-
.../hbase/ipc/NettyServerRpcConnection.java | 206 +++++
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 820 +-----------------
.../org/apache/hadoop/hbase/ipc/ServerCall.java | 7 +-
.../hadoop/hbase/ipc/ServerRpcConnection.java | 852 +++++++++++++++++++
.../hadoop/hbase/ipc/SimpleRpcServer.java | 717 +---------------
.../hbase/ipc/SimpleRpcServerResponder.java | 316 +++++++
.../hadoop/hbase/ipc/SimpleServerCall.java | 18 +-
.../hbase/ipc/SimpleServerRpcConnection.java | 428 ++++++++++
.../hbase/security/HBaseSaslRpcServer.java | 24 +-
.../hadoop/hbase/ipc/AbstractTestIPC.java | 11 +-
14 files changed, 1869 insertions(+), 1737 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
index 9f6b7b5..6ed90ad 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.nio;
+import com.google.common.annotations.VisibleForTesting;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
@@ -27,8 +29,6 @@ import org.apache.hadoop.hbase.util.ObjectIntPair;
import org.apache.hadoop.hbase.util.UnsafeAccess;
import org.apache.hadoop.hbase.util.UnsafeAvailChecker;
-import com.google.common.annotations.VisibleForTesting;
-
import sun.nio.ch.DirectBuffer;
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index f16fc50..f476b11 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -75,8 +75,8 @@ public class CallRunner {
* @deprecated As of release 2.0, this will be removed in HBase 3.0
*/
@Deprecated
- public ServerCall getCall() {
- return (ServerCall) call;
+ public ServerCall<?> getCall() {
+ return (ServerCall<?>) call;
}
public void setStatus(MonitoredRPCHandler status) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index c18b894..4a4ddba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.ipc;
import io.netty.bootstrap.ServerBootstrap;
@@ -46,10 +45,7 @@ import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -57,31 +53,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
-import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.nio.SingleByteBuff;
-import org.apache.hadoop.hbase.security.AccessDeniedException;
-import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
-import org.apache.hadoop.hbase.security.SaslStatus;
-import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
-import org.apache.htrace.TraceInfo;
/**
* An RPC server with Netty4 implementation.
- *
*/
+@InterfaceAudience.Private
public class NettyRpcServer extends RpcServer {
public static final Log LOG = LogFactory.getLog(NettyRpcServer.class);
@@ -187,166 +173,6 @@ public class NettyRpcServer extends RpcServer {
return ((InetSocketAddress) serverChannel.localAddress());
}
- public class NettyConnection extends RpcServer.Connection {
-
- protected Channel channel;
-
- NettyConnection(Channel channel) {
- super();
- this.channel = channel;
- InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress());
- this.addr = inetSocketAddress.getAddress();
- if (addr == null) {
- this.hostAddress = "*Unknown*";
- } else {
- this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
- }
- this.remotePort = inetSocketAddress.getPort();
- this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
- null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
- this.setConnectionHeaderResponseCall =
- new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null, this,
- 0, null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
- this.authFailedCall =
- new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, 0,
- null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
- }
-
- void readPreamble(ByteBuf buffer) throws IOException {
- byte[] rpcHead =
- { buffer.readByte(), buffer.readByte(), buffer.readByte(), buffer.readByte() };
- if (!Arrays.equals(HConstants.RPC_HEADER, rpcHead)) {
- doBadPreambleHandling("Expected HEADER="
- + Bytes.toStringBinary(HConstants.RPC_HEADER) + " but received HEADER="
- + Bytes.toStringBinary(rpcHead) + " from " + toString());
- return;
- }
- // Now read the next two bytes, the version and the auth to use.
- int version = buffer.readByte();
- byte authbyte = buffer.readByte();
- this.authMethod = AuthMethod.valueOf(authbyte);
- if (version != CURRENT_VERSION) {
- String msg = getFatalConnectionString(version, authbyte);
- doBadPreambleHandling(msg, new WrongVersionException(msg));
- return;
- }
- if (authMethod == null) {
- String msg = getFatalConnectionString(version, authbyte);
- doBadPreambleHandling(msg, new BadAuthException(msg));
- return;
- }
- if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
- if (allowFallbackToSimpleAuth) {
- metrics.authenticationFallback();
- authenticatedWithFallback = true;
- } else {
- AccessDeniedException ae = new AccessDeniedException(
- "Authentication is required");
- setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
- ((NettyServerCall) authFailedCall)
- .sendResponseIfReady(ChannelFutureListener.CLOSE);
- return;
- }
- }
- if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
- doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
- null);
- authMethod = AuthMethod.SIMPLE;
- // client has already sent the initial Sasl message and we
- // should ignore it. Both client and server should fall back
- // to simple auth from now on.
- skipInitialSaslHandshake = true;
- }
- if (authMethod != AuthMethod.SIMPLE) {
- useSasl = true;
- }
- connectionPreambleRead = true;
- }
-
- private void doBadPreambleHandling(final String msg) throws IOException {
- doBadPreambleHandling(msg, new FatalConnectionException(msg));
- }
-
- private void doBadPreambleHandling(final String msg, final Exception e) throws IOException {
- LOG.warn(msg);
- NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1,
- null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
- setupResponse(null, fakeCall, e, msg);
- // closes out the connection.
- fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE);
- }
-
- void process(final ByteBuf buf) throws IOException, InterruptedException {
- if (connectionHeaderRead) {
- this.callCleanup = new RpcServer.CallCleanup() {
- @Override
- public void run() {
- buf.release();
- }
- };
- process(new SingleByteBuff(buf.nioBuffer()));
- } else {
- byte[] data = new byte[buf.readableBytes()];
- buf.readBytes(data, 0, data.length);
- ByteBuffer connectionHeader = ByteBuffer.wrap(data);
- buf.release();
- process(connectionHeader);
- }
- }
-
- void process(ByteBuffer buf) throws IOException, InterruptedException {
- process(new SingleByteBuff(buf));
- }
-
- void process(ByteBuff buf) throws IOException, InterruptedException {
- try {
- if (skipInitialSaslHandshake) {
- skipInitialSaslHandshake = false;
- if (callCleanup != null) {
- callCleanup.run();
- }
- return;
- }
-
- if (useSasl) {
- saslReadAndProcess(buf);
- } else {
- processOneRpc(buf);
- }
- } catch (Exception e) {
- if (callCleanup != null) {
- callCleanup.run();
- }
- throw e;
- } finally {
- this.callCleanup = null;
- }
- }
-
- @Override
- public synchronized void close() {
- disposeSasl();
- channel.close();
- callCleanup = null;
- }
-
- @Override
- public boolean isConnectionOpen() {
- return channel.isOpen();
- }
-
- @Override
- public ServerCall createCall(int id, final BlockingService service,
- final MethodDescriptor md, RequestHeader header, Message param,
- CellScanner cellScanner, RpcServer.Connection connection, long size,
- TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
- CallCleanup reqCleanup) {
- return new NettyServerCall(id, service, md, header, param, cellScanner, connection, size,
- tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder,
- reqCleanup);
- }
- }
-
private class Initializer extends ChannelInitializer<SocketChannel> {
final int maxRequestSize;
@@ -368,7 +194,7 @@ public class NettyRpcServer extends RpcServer {
}
private class ConnectionHeaderHandler extends ByteToMessageDecoder {
- private NettyConnection connection;
+ private NettyServerRpcConnection connection;
ConnectionHeaderHandler() {
}
@@ -379,7 +205,7 @@ public class NettyRpcServer extends RpcServer {
if (byteBuf.readableBytes() < 6) {
return;
}
- connection = new NettyConnection(ctx.channel());
+ connection = new NettyServerRpcConnection(NettyRpcServer.this, ctx.channel());
connection.readPreamble(byteBuf);
((MessageDecoder) ctx.pipeline().get("decoder"))
.setConnection(connection);
@@ -390,9 +216,9 @@ public class NettyRpcServer extends RpcServer {
private class MessageDecoder extends ChannelInboundHandlerAdapter {
- private NettyConnection connection;
+ private NettyServerRpcConnection connection;
- void setConnection(NettyConnection connection) {
+ void setConnection(NettyServerRpcConnection connection) {
this.connection = connection;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
index a3f23dd..3cb9a5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
@@ -25,7 +25,6 @@ import java.net.InetAddress;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.ipc.NettyRpcServer.NettyConnection;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
@@ -38,30 +37,26 @@ import org.apache.htrace.TraceInfo;
* result.
*/
@InterfaceAudience.Private
-class NettyServerCall extends ServerCall {
+class NettyServerCall extends ServerCall<NettyServerRpcConnection> {
NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
- Message param, CellScanner cellScanner, RpcServer.Connection connection, long size,
+ Message param, CellScanner cellScanner, NettyServerRpcConnection connection, long size,
TraceInfo tinfo, InetAddress remoteAddress, long receiveTime, int timeout,
ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress,
receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
}
- NettyConnection getConnection() {
- return (NettyConnection) this.connection;
- }
-
/**
* If we have a response, and delay is not set, then respond immediately. Otherwise, do not
* respond to client. This is called by the RPC code in the context of the Handler thread.
*/
@Override
public synchronized void sendResponseIfReady() throws IOException {
- getConnection().channel.writeAndFlush(this);
+ connection.channel.writeAndFlush(this);
}
public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException {
- getConnection().channel.writeAndFlush(this).addListener(listener);
+ connection.channel.writeAndFlush(this).addListener(listener);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
new file mode 100644
index 0000000..7985295
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.AuthMethod;
+import org.apache.hadoop.hbase.security.SaslStatus;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.htrace.TraceInfo;
+
+/**
+ * RpcConnection implementation for netty rpc server.
+ */
+@InterfaceAudience.Private
+class NettyServerRpcConnection extends ServerRpcConnection {
+
+ final Channel channel;
+
+ NettyServerRpcConnection(NettyRpcServer rpcServer, Channel channel) {
+ super(rpcServer);
+ this.channel = channel;
+ InetSocketAddress inetSocketAddress = ((InetSocketAddress) channel.remoteAddress());
+ this.addr = inetSocketAddress.getAddress();
+ if (addr == null) {
+ this.hostAddress = "*Unknown*";
+ } else {
+ this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
+ }
+ this.remotePort = inetSocketAddress.getPort();
+ this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
+ null, System.currentTimeMillis(), 0, rpcServer.reservoir, rpcServer.cellBlockBuilder, null);
+ this.setConnectionHeaderResponseCall = new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID,
+ null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0,
+ rpcServer.reservoir, rpcServer.cellBlockBuilder, null);
+ this.authFailedCall = new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null,
+ null, this, 0, null, null, System.currentTimeMillis(), 0, rpcServer.reservoir,
+ rpcServer.cellBlockBuilder, null);
+ }
+
+ void readPreamble(ByteBuf buffer) throws IOException {
+ byte[] rpcHead = { buffer.readByte(), buffer.readByte(), buffer.readByte(), buffer.readByte() };
+ if (!Arrays.equals(HConstants.RPC_HEADER, rpcHead)) {
+ doBadPreambleHandling("Expected HEADER=" + Bytes.toStringBinary(HConstants.RPC_HEADER) +
+ " but received HEADER=" + Bytes.toStringBinary(rpcHead) + " from " + toString());
+ return;
+ }
+ // Now read the next two bytes, the version and the auth to use.
+ int version = buffer.readByte();
+ byte authbyte = buffer.readByte();
+ this.authMethod = AuthMethod.valueOf(authbyte);
+ if (version != NettyRpcServer.CURRENT_VERSION) {
+ String msg = getFatalConnectionString(version, authbyte);
+ doBadPreambleHandling(msg, new WrongVersionException(msg));
+ return;
+ }
+ if (authMethod == null) {
+ String msg = getFatalConnectionString(version, authbyte);
+ doBadPreambleHandling(msg, new BadAuthException(msg));
+ return;
+ }
+ if (this.rpcServer.isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
+ if (this.rpcServer.allowFallbackToSimpleAuth) {
+ this.rpcServer.metrics.authenticationFallback();
+ authenticatedWithFallback = true;
+ } else {
+ AccessDeniedException ae = new AccessDeniedException("Authentication is required");
+ this.rpcServer.setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
+ ((NettyServerCall) authFailedCall).sendResponseIfReady(ChannelFutureListener.CLOSE);
+ return;
+ }
+ }
+ if (!this.rpcServer.isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
+ doRawSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslUtil.SWITCH_TO_SIMPLE_AUTH), null,
+ null);
+ authMethod = AuthMethod.SIMPLE;
+ // client has already sent the initial Sasl message and we
+ // should ignore it. Both client and server should fall back
+ // to simple auth from now on.
+ skipInitialSaslHandshake = true;
+ }
+ if (authMethod != AuthMethod.SIMPLE) {
+ useSasl = true;
+ }
+ connectionPreambleRead = true;
+ }
+
+ private void doBadPreambleHandling(final String msg) throws IOException {
+ doBadPreambleHandling(msg, new FatalConnectionException(msg));
+ }
+
+ private void doBadPreambleHandling(final String msg, final Exception e) throws IOException {
+ NettyRpcServer.LOG.warn(msg);
+ NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1, null,
+ null, System.currentTimeMillis(), 0, this.rpcServer.reservoir,
+ this.rpcServer.cellBlockBuilder, null);
+ this.rpcServer.setupResponse(null, fakeCall, e, msg);
+ // closes out the connection.
+ fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE);
+ }
+
+ void process(final ByteBuf buf) throws IOException, InterruptedException {
+ if (connectionHeaderRead) {
+ this.callCleanup = new RpcServer.CallCleanup() {
+ @Override
+ public void run() {
+ buf.release();
+ }
+ };
+ process(new SingleByteBuff(buf.nioBuffer()));
+ } else {
+ byte[] data = new byte[buf.readableBytes()];
+ buf.readBytes(data, 0, data.length);
+ ByteBuffer connectionHeader = ByteBuffer.wrap(data);
+ buf.release();
+ process(connectionHeader);
+ }
+ }
+
+ void process(ByteBuffer buf) throws IOException, InterruptedException {
+ process(new SingleByteBuff(buf));
+ }
+
+ void process(ByteBuff buf) throws IOException, InterruptedException {
+ try {
+ if (skipInitialSaslHandshake) {
+ skipInitialSaslHandshake = false;
+ if (callCleanup != null) {
+ callCleanup.run();
+ }
+ return;
+ }
+
+ if (useSasl) {
+ saslReadAndProcess(buf);
+ } else {
+ processOneRpc(buf);
+ }
+ } catch (Exception e) {
+ if (callCleanup != null) {
+ callCleanup.run();
+ }
+ throw e;
+ } finally {
+ this.callCleanup = null;
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ disposeSasl();
+ channel.close();
+ callCleanup = null;
+ }
+
+ @Override
+ public boolean isConnectionOpen() {
+ return channel.isOpen();
+ }
+
+ @Override
+ public NettyServerCall createCall(int id, final BlockingService service,
+ final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner,
+ long size, TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
+ CallCleanup reqCleanup) {
+ return new NettyServerCall(id, service, md, header, param, cellScanner, this, size, tinfo,
+ remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir,
+ this.rpcServer.cellBlockBuilder, reqCleanup);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index bbc329c..d68a05e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -20,34 +20,22 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
-import java.io.ByteArrayInputStream;
+import com.google.common.annotations.VisibleForTesting;
+
import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
-import java.security.GeneralSecurityException;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.atomic.LongAdder;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-
-import org.apache.commons.crypto.cipher.CryptoCipherFactory;
-import org.apache.commons.crypto.random.CryptoRandom;
-import org.apache.commons.crypto.random.CryptoRandomFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -59,65 +47,36 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.VersionInfoUtil;
-import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
-import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
-import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.security.AccessDeniedException;
-import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
-import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslDigestCallbackHandler;
-import org.apache.hadoop.hbase.security.HBaseSaslRpcServer.SaslGssCallbackHandler;
-import org.apache.hadoop.hbase.security.SaslStatus;
-import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
-import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.htrace.TraceInfo;
import org.codehaus.jackson.map.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* An RPC server that hosts protobuf described Services.
*
@@ -262,739 +221,6 @@ public abstract class RpcServer implements RpcServerInterface,
void run();
}
- /** Reads calls from a connection and queues them for handling. */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(
- value="VO_VOLATILE_INCREMENT",
- justification="False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
- public abstract class Connection implements Closeable {
- // If initial preamble with version and magic has been read or not.
- protected boolean connectionPreambleRead = false;
- // If the connection header has been read or not.
- protected boolean connectionHeaderRead = false;
-
- protected CallCleanup callCleanup;
-
- // Cache the remote host & port info so that even if the socket is
- // disconnected, we can say where it used to connect to.
- protected String hostAddress;
- protected int remotePort;
- protected InetAddress addr;
- protected ConnectionHeader connectionHeader;
-
- /**
- * Codec the client asked use.
- */
- protected Codec codec;
- /**
- * Compression codec the client asked us use.
- */
- protected CompressionCodec compressionCodec;
- protected BlockingService service;
-
- protected AuthMethod authMethod;
- protected boolean saslContextEstablished;
- protected boolean skipInitialSaslHandshake;
- private ByteBuffer unwrappedData;
- // When is this set? FindBugs wants to know! Says NP
- private ByteBuffer unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
- protected boolean useSasl;
- protected SaslServer saslServer;
- protected CryptoAES cryptoAES;
- protected boolean useWrap = false;
- protected boolean useCryptoAesWrap = false;
- // Fake 'call' for failed authorization response
- protected static final int AUTHORIZATION_FAILED_CALLID = -1;
- protected ServerCall authFailedCall;
- protected ByteArrayOutputStream authFailedResponse =
- new ByteArrayOutputStream();
- // Fake 'call' for SASL context setup
- protected static final int SASL_CALLID = -33;
- protected ServerCall saslCall;
- // Fake 'call' for connection header response
- protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
- protected ServerCall setConnectionHeaderResponseCall;
-
- // was authentication allowed with a fallback to simple auth
- protected boolean authenticatedWithFallback;
-
- protected boolean retryImmediatelySupported = false;
-
- public UserGroupInformation attemptingUser = null; // user name before auth
- protected User user = null;
- protected UserGroupInformation ugi = null;
-
- public Connection() {
- this.callCleanup = null;
- }
-
- @Override
- public String toString() {
- return getHostAddress() + ":" + remotePort;
- }
-
- public String getHostAddress() {
- return hostAddress;
- }
-
- public InetAddress getHostInetAddress() {
- return addr;
- }
-
- public int getRemotePort() {
- return remotePort;
- }
-
- public VersionInfo getVersionInfo() {
- if (connectionHeader.hasVersionInfo()) {
- return connectionHeader.getVersionInfo();
- }
- return null;
- }
-
- protected String getFatalConnectionString(final int version, final byte authByte) {
- return "serverVersion=" + CURRENT_VERSION +
- ", clientVersion=" + version + ", authMethod=" + authByte +
- ", authSupported=" + (authMethod != null) + " from " + toString();
- }
-
- protected UserGroupInformation getAuthorizedUgi(String authorizedId)
- throws IOException {
- UserGroupInformation authorizedUgi;
- if (authMethod == AuthMethod.DIGEST) {
- TokenIdentifier tokenId = HBaseSaslRpcServer.getIdentifier(authorizedId,
- secretManager);
- authorizedUgi = tokenId.getUser();
- if (authorizedUgi == null) {
- throw new AccessDeniedException(
- "Can't retrieve username from tokenIdentifier.");
- }
- authorizedUgi.addTokenIdentifier(tokenId);
- } else {
- authorizedUgi = UserGroupInformation.createRemoteUser(authorizedId);
- }
- authorizedUgi.setAuthenticationMethod(authMethod.authenticationMethod.getAuthMethod());
- return authorizedUgi;
- }
-
- /**
- * Set up cell block codecs
- * @throws FatalConnectionException
- */
- protected void setupCellBlockCodecs(final ConnectionHeader header)
- throws FatalConnectionException {
- // TODO: Plug in other supported decoders.
- if (!header.hasCellBlockCodecClass()) return;
- String className = header.getCellBlockCodecClass();
- if (className == null || className.length() == 0) return;
- try {
- this.codec = (Codec)Class.forName(className).newInstance();
- } catch (Exception e) {
- throw new UnsupportedCellCodecException(className, e);
- }
- if (!header.hasCellBlockCompressorClass()) return;
- className = header.getCellBlockCompressorClass();
- try {
- this.compressionCodec = (CompressionCodec)Class.forName(className).newInstance();
- } catch (Exception e) {
- throw new UnsupportedCompressionCodecException(className, e);
- }
- }
-
- /**
- * Set up cipher for rpc encryption with Apache Commons Crypto
- *
- * @throws FatalConnectionException
- */
- protected void setupCryptoCipher(final ConnectionHeader header,
- RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
- throws FatalConnectionException {
- // If simple auth, return
- if (saslServer == null) return;
- // check if rpc encryption with Crypto AES
- String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
- boolean isEncryption = SaslUtil.QualityOfProtection.PRIVACY
- .getSaslQop().equalsIgnoreCase(qop);
- boolean isCryptoAesEncryption = isEncryption && conf.getBoolean(
- "hbase.rpc.crypto.encryption.aes.enabled", false);
- if (!isCryptoAesEncryption) return;
- if (!header.hasRpcCryptoCipherTransformation()) return;
- String transformation = header.getRpcCryptoCipherTransformation();
- if (transformation == null || transformation.length() == 0) return;
- // Negotiates AES based on complete saslServer.
- // The Crypto metadata need to be encrypted and send to client.
- Properties properties = new Properties();
- // the property for SecureRandomFactory
- properties.setProperty(CryptoRandomFactory.CLASSES_KEY,
- conf.get("hbase.crypto.sasl.encryption.aes.crypto.random",
- "org.apache.commons.crypto.random.JavaCryptoRandom"));
- // the property for cipher class
- properties.setProperty(CryptoCipherFactory.CLASSES_KEY,
- conf.get("hbase.rpc.crypto.encryption.aes.cipher.class",
- "org.apache.commons.crypto.cipher.JceCipher"));
-
- int cipherKeyBits = conf.getInt(
- "hbase.rpc.crypto.encryption.aes.cipher.keySizeBits", 128);
- // generate key and iv
- if (cipherKeyBits % 8 != 0) {
- throw new IllegalArgumentException("The AES cipher key size in bits" +
- " should be a multiple of byte");
- }
- int len = cipherKeyBits / 8;
- byte[] inKey = new byte[len];
- byte[] outKey = new byte[len];
- byte[] inIv = new byte[len];
- byte[] outIv = new byte[len];
-
- try {
- // generate the cipher meta data with SecureRandom
- CryptoRandom secureRandom = CryptoRandomFactory.getCryptoRandom(properties);
- secureRandom.nextBytes(inKey);
- secureRandom.nextBytes(outKey);
- secureRandom.nextBytes(inIv);
- secureRandom.nextBytes(outIv);
-
- // create CryptoAES for server
- cryptoAES = new CryptoAES(transformation, properties,
- inKey, outKey, inIv, outIv);
- // create SaslCipherMeta and send to client,
- // for client, the [inKey, outKey], [inIv, outIv] should be reversed
- RPCProtos.CryptoCipherMeta.Builder ccmBuilder = RPCProtos.CryptoCipherMeta.newBuilder();
- ccmBuilder.setTransformation(transformation);
- ccmBuilder.setInIv(getByteString(outIv));
- ccmBuilder.setInKey(getByteString(outKey));
- ccmBuilder.setOutIv(getByteString(inIv));
- ccmBuilder.setOutKey(getByteString(inKey));
- chrBuilder.setCryptoCipherMeta(ccmBuilder);
- useCryptoAesWrap = true;
- } catch (GeneralSecurityException | IOException ex) {
- throw new UnsupportedCryptoException(ex.getMessage(), ex);
- }
- }
-
- private ByteString getByteString(byte[] bytes) {
- // return singleton to reduce object allocation
- return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
- }
-
- protected UserGroupInformation createUser(ConnectionHeader head) {
- UserGroupInformation ugi = null;
-
- if (!head.hasUserInfo()) {
- return null;
- }
- UserInformation userInfoProto = head.getUserInfo();
- String effectiveUser = null;
- if (userInfoProto.hasEffectiveUser()) {
- effectiveUser = userInfoProto.getEffectiveUser();
- }
- String realUser = null;
- if (userInfoProto.hasRealUser()) {
- realUser = userInfoProto.getRealUser();
- }
- if (effectiveUser != null) {
- if (realUser != null) {
- UserGroupInformation realUserUgi =
- UserGroupInformation.createRemoteUser(realUser);
- ugi = UserGroupInformation.createProxyUser(effectiveUser, realUserUgi);
- } else {
- ugi = UserGroupInformation.createRemoteUser(effectiveUser);
- }
- }
- return ugi;
- }
-
- protected void disposeSasl() {
- if (saslServer != null) {
- try {
- saslServer.dispose();
- saslServer = null;
- } catch (SaslException ignored) {
- // Ignored. This is being disposed of anyway.
- }
- }
- }
-
- /**
- * No protobuf encoding of raw sasl messages
- */
- protected void doRawSaslReply(SaslStatus status, Writable rv,
- String errorClass, String error) throws IOException {
- ByteBufferOutputStream saslResponse = null;
- DataOutputStream out = null;
- try {
- // In my testing, have noticed that sasl messages are usually
- // in the ballpark of 100-200. That's why the initial capacity is 256.
- saslResponse = new ByteBufferOutputStream(256);
- out = new DataOutputStream(saslResponse);
- out.writeInt(status.state); // write status
- if (status == SaslStatus.SUCCESS) {
- rv.write(out);
- } else {
- WritableUtils.writeString(out, errorClass);
- WritableUtils.writeString(out, error);
- }
- saslCall.setSaslTokenResponse(saslResponse.getByteBuffer());
- saslCall.sendResponseIfReady();
- } finally {
- if (saslResponse != null) {
- saslResponse.close();
- }
- if (out != null) {
- out.close();
- }
- }
- }
-
- public void saslReadAndProcess(ByteBuff saslToken) throws IOException,
- InterruptedException {
- if (saslContextEstablished) {
- if (LOG.isTraceEnabled())
- LOG.trace("Have read input token of size " + saslToken.limit()
- + " for processing by saslServer.unwrap()");
-
- if (!useWrap) {
- processOneRpc(saslToken);
- } else {
- byte[] b = saslToken.hasArray() ? saslToken.array() : saslToken.toBytes();
- byte [] plaintextData;
- if (useCryptoAesWrap) {
- // unwrap with CryptoAES
- plaintextData = cryptoAES.unwrap(b, 0, b.length);
- } else {
- plaintextData = saslServer.unwrap(b, 0, b.length);
- }
- processUnwrappedData(plaintextData);
- }
- } else {
- byte[] replyToken;
- try {
- if (saslServer == null) {
- switch (authMethod) {
- case DIGEST:
- if (secretManager == null) {
- throw new AccessDeniedException(
- "Server is not configured to do DIGEST authentication.");
- }
- saslServer = Sasl.createSaslServer(AuthMethod.DIGEST
- .getMechanismName(), null, SaslUtil.SASL_DEFAULT_REALM,
- HBaseSaslRpcServer.getSaslProps(), new SaslDigestCallbackHandler(
- secretManager, this));
- break;
- default:
- UserGroupInformation current = UserGroupInformation.getCurrentUser();
- String fullName = current.getUserName();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Kerberos principal name is " + fullName);
- }
- final String names[] = SaslUtil.splitKerberosName(fullName);
- if (names.length != 3) {
- throw new AccessDeniedException(
- "Kerberos principal name does NOT have the expected "
- + "hostname part: " + fullName);
- }
- current.doAs(new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws SaslException {
- saslServer = Sasl.createSaslServer(AuthMethod.KERBEROS
- .getMechanismName(), names[0], names[1],
- HBaseSaslRpcServer.getSaslProps(), new SaslGssCallbackHandler());
- return null;
- }
- });
- }
- if (saslServer == null)
- throw new AccessDeniedException(
- "Unable to find SASL server implementation for "
- + authMethod.getMechanismName());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created SASL server with mechanism = " + authMethod.getMechanismName());
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Have read input token of size " + saslToken.limit()
- + " for processing by saslServer.evaluateResponse()");
- }
- replyToken = saslServer
- .evaluateResponse(saslToken.hasArray() ? saslToken.array() : saslToken.toBytes());
- } catch (IOException e) {
- IOException sendToClient = e;
- Throwable cause = e;
- while (cause != null) {
- if (cause instanceof InvalidToken) {
- sendToClient = (InvalidToken) cause;
- break;
- }
- cause = cause.getCause();
- }
- doRawSaslReply(SaslStatus.ERROR, null, sendToClient.getClass().getName(),
- sendToClient.getLocalizedMessage());
- metrics.authenticationFailure();
- String clientIP = this.toString();
- // attempting user could be null
- AUDITLOG.warn(AUTH_FAILED_FOR + clientIP + ":" + attemptingUser);
- throw e;
- }
- if (replyToken != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Will send token of size " + replyToken.length
- + " from saslServer.");
- }
- doRawSaslReply(SaslStatus.SUCCESS, new BytesWritable(replyToken), null,
- null);
- }
- if (saslServer.isComplete()) {
- String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
- useWrap = qop != null && !"auth".equalsIgnoreCase(qop);
- ugi = getAuthorizedUgi(saslServer.getAuthorizationID());
- if (LOG.isDebugEnabled()) {
- LOG.debug("SASL server context established. Authenticated client: "
- + ugi + ". Negotiated QoP is "
- + saslServer.getNegotiatedProperty(Sasl.QOP));
- }
- metrics.authenticationSuccess();
- AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
- saslContextEstablished = true;
- }
- }
- }
-
- private void processUnwrappedData(byte[] inBuf) throws IOException,
- InterruptedException {
- ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(inBuf));
- // Read all RPCs contained in the inBuf, even partial ones
- while (true) {
- int count;
- if (unwrappedDataLengthBuffer.remaining() > 0) {
- count = channelRead(ch, unwrappedDataLengthBuffer);
- if (count <= 0 || unwrappedDataLengthBuffer.remaining() > 0)
- return;
- }
-
- if (unwrappedData == null) {
- unwrappedDataLengthBuffer.flip();
- int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
-
- if (unwrappedDataLength == RpcClient.PING_CALL_ID) {
- if (LOG.isDebugEnabled())
- LOG.debug("Received ping message");
- unwrappedDataLengthBuffer.clear();
- continue; // ping message
- }
- unwrappedData = ByteBuffer.allocate(unwrappedDataLength);
- }
-
- count = channelRead(ch, unwrappedData);
- if (count <= 0 || unwrappedData.remaining() > 0)
- return;
-
- if (unwrappedData.remaining() == 0) {
- unwrappedDataLengthBuffer.clear();
- unwrappedData.flip();
- processOneRpc(new SingleByteBuff(unwrappedData));
- unwrappedData = null;
- }
- }
- }
-
- public void processOneRpc(ByteBuff buf) throws IOException,
- InterruptedException {
- if (connectionHeaderRead) {
- processRequest(buf);
- } else {
- processConnectionHeader(buf);
- this.connectionHeaderRead = true;
- if (!authorizeConnection()) {
- // Throw FatalConnectionException wrapping ACE so client does right thing and closes
- // down the connection instead of trying to read non-existent retun.
- throw new AccessDeniedException("Connection from " + this + " for service " +
- connectionHeader.getServiceName() + " is unauthorized for user: " + ugi);
- }
- this.user = userProvider.create(this.ugi);
- }
- }
-
- protected boolean authorizeConnection() throws IOException {
- try {
- // If auth method is DIGEST, the token was obtained by the
- // real user for the effective user, therefore not required to
- // authorize real user. doAs is allowed only for simple or kerberos
- // authentication
- if (ugi != null && ugi.getRealUser() != null
- && (authMethod != AuthMethod.DIGEST)) {
- ProxyUsers.authorize(ugi, this.getHostAddress(), conf);
- }
- authorize(ugi, connectionHeader, getHostInetAddress());
- metrics.authorizationSuccess();
- } catch (AuthorizationException ae) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connection authorization failed: " + ae.getMessage(), ae);
- }
- metrics.authorizationFailure();
- setupResponse(authFailedResponse, authFailedCall,
- new AccessDeniedException(ae), ae.getMessage());
- authFailedCall.sendResponseIfReady();
- return false;
- }
- return true;
- }
-
- // Reads the connection header following version
- protected void processConnectionHeader(ByteBuff buf) throws IOException {
- if (buf.hasArray()) {
- this.connectionHeader = ConnectionHeader.parseFrom(buf.array());
- } else {
- CodedInputStream cis = UnsafeByteOperations
- .unsafeWrap(new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit()).newCodedInput();
- cis.enableAliasing(true);
- this.connectionHeader = ConnectionHeader.parseFrom(cis);
- }
- String serviceName = connectionHeader.getServiceName();
- if (serviceName == null) throw new EmptyServiceNameException();
- this.service = getService(services, serviceName);
- if (this.service == null) throw new UnknownServiceException(serviceName);
- setupCellBlockCodecs(this.connectionHeader);
- RPCProtos.ConnectionHeaderResponse.Builder chrBuilder =
- RPCProtos.ConnectionHeaderResponse.newBuilder();
- setupCryptoCipher(this.connectionHeader, chrBuilder);
- responseConnectionHeader(chrBuilder);
- UserGroupInformation protocolUser = createUser(connectionHeader);
- if (!useSasl) {
- ugi = protocolUser;
- if (ugi != null) {
- ugi.setAuthenticationMethod(AuthMethod.SIMPLE.authenticationMethod);
- }
- // audit logging for SASL authenticated users happens in saslReadAndProcess()
- if (authenticatedWithFallback) {
- LOG.warn("Allowed fallback to SIMPLE auth for " + ugi
- + " connecting from " + getHostAddress());
- }
- AUDITLOG.info(AUTH_SUCCESSFUL_FOR + ugi);
- } else {
- // user is authenticated
- ugi.setAuthenticationMethod(authMethod.authenticationMethod);
- //Now we check if this is a proxy user case. If the protocol user is
- //different from the 'user', it is a proxy user scenario. However,
- //this is not allowed if user authenticated with DIGEST.
- if ((protocolUser != null)
- && (!protocolUser.getUserName().equals(ugi.getUserName()))) {
- if (authMethod == AuthMethod.DIGEST) {
- // Not allowed to doAs if token authentication is used
- throw new AccessDeniedException("Authenticated user (" + ugi
- + ") doesn't match what the client claims to be ("
- + protocolUser + ")");
- } else {
- // Effective user can be different from authenticated user
- // for simple auth or kerberos auth
- // The user is the real user. Now we create a proxy user
- UserGroupInformation realUser = ugi;
- ugi = UserGroupInformation.createProxyUser(protocolUser
- .getUserName(), realUser);
- // Now the user is a proxy user, set Authentication method Proxy.
- ugi.setAuthenticationMethod(AuthenticationMethod.PROXY);
- }
- }
- }
- if (connectionHeader.hasVersionInfo()) {
- // see if this connection will support RetryImmediatelyException
- retryImmediatelySupported = VersionInfoUtil.hasMinimumVersion(getVersionInfo(), 1, 2);
-
- AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
- + " with version info: "
- + TextFormat.shortDebugString(connectionHeader.getVersionInfo()));
- } else {
- AUDITLOG.info("Connection from " + this.hostAddress + " port: " + this.remotePort
- + " with unknown version info");
- }
- }
-
- private void responseConnectionHeader(RPCProtos.ConnectionHeaderResponse.Builder chrBuilder)
- throws FatalConnectionException {
- // Response the connection header if Crypto AES is enabled
- if (!chrBuilder.hasCryptoCipherMeta()) return;
- try {
- byte[] connectionHeaderResBytes = chrBuilder.build().toByteArray();
- // encrypt the Crypto AES cipher meta data with sasl server, and send to client
- byte[] unwrapped = new byte[connectionHeaderResBytes.length + 4];
- Bytes.putBytes(unwrapped, 0, Bytes.toBytes(connectionHeaderResBytes.length), 0, 4);
- Bytes.putBytes(unwrapped, 4, connectionHeaderResBytes, 0, connectionHeaderResBytes.length);
-
- doConnectionHeaderResponse(saslServer.wrap(unwrapped, 0, unwrapped.length));
- } catch (IOException ex) {
- throw new UnsupportedCryptoException(ex.getMessage(), ex);
- }
- }
-
- /**
- * Send the response for connection header
- */
- private void doConnectionHeaderResponse(byte[] wrappedCipherMetaData)
- throws IOException {
- ByteBufferOutputStream response = null;
- DataOutputStream out = null;
- try {
- response = new ByteBufferOutputStream(wrappedCipherMetaData.length + 4);
- out = new DataOutputStream(response);
- out.writeInt(wrappedCipherMetaData.length);
- out.write(wrappedCipherMetaData);
-
- setConnectionHeaderResponseCall.setConnectionHeaderResponse(response
- .getByteBuffer());
- setConnectionHeaderResponseCall.sendResponseIfReady();
- } finally {
- if (out != null) {
- out.close();
- }
- if (response != null) {
- response.close();
- }
- }
- }
-
- /**
- * @param buf
- * Has the request header and the request param and optionally
- * encoded data buffer all in this one array.
- * @throws IOException
- * @throws InterruptedException
- */
- protected void processRequest(ByteBuff buf) throws IOException,
- InterruptedException {
- long totalRequestSize = buf.limit();
- int offset = 0;
- // Here we read in the header. We avoid having pb
- // do its default 4k allocation for CodedInputStream. We force it to use
- // backing array.
- CodedInputStream cis;
- if (buf.hasArray()) {
- cis = UnsafeByteOperations.unsafeWrap(buf.array(), 0, buf.limit())
- .newCodedInput();
- } else {
- cis = UnsafeByteOperations.unsafeWrap(
- new ByteBuffByteInput(buf, 0, buf.limit()), 0, buf.limit())
- .newCodedInput();
- }
- cis.enableAliasing(true);
- int headerSize = cis.readRawVarint32();
- offset = cis.getTotalBytesRead();
- Message.Builder builder = RequestHeader.newBuilder();
- ProtobufUtil.mergeFrom(builder, cis, headerSize);
- RequestHeader header = (RequestHeader) builder.build();
- offset += headerSize;
- int id = header.getCallId();
- if (LOG.isTraceEnabled()) {
- LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
- + " totalRequestSize: " + totalRequestSize + " bytes");
- }
- // Enforcing the call queue size, this triggers a retry in the client
- // This is a bit late to be doing this check - we have already read in the
- // total request.
- if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
- final ServerCall callTooBig = createCall(id, this.service, null,
- null, null, null, this, totalRequestSize, null, null, 0,
- this.callCleanup);
- ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
- setupResponse(responseBuffer, callTooBig, CALL_QUEUE_TOO_BIG_EXCEPTION,
- "Call queue is full on " + server.getServerName()
- + ", is hbase.ipc.server.max.callqueue.size too small?");
- callTooBig.sendResponseIfReady();
- return;
- }
- MethodDescriptor md = null;
- Message param = null;
- CellScanner cellScanner = null;
- try {
- if (header.hasRequestParam() && header.getRequestParam()) {
- md = this.service.getDescriptorForType().findMethodByName(
- header.getMethodName());
- if (md == null)
- throw new UnsupportedOperationException(header.getMethodName());
- builder = this.service.getRequestPrototype(md).newBuilderForType();
- cis.resetSizeCounter();
- int paramSize = cis.readRawVarint32();
- offset += cis.getTotalBytesRead();
- if (builder != null) {
- ProtobufUtil.mergeFrom(builder, cis, paramSize);
- param = builder.build();
- }
- offset += paramSize;
- } else {
- // currently header must have request param, so we directly throw
- // exception here
- String msg = "Invalid request header: "
- + TextFormat.shortDebugString(header)
- + ", should have param set in it";
- LOG.warn(msg);
- throw new DoNotRetryIOException(msg);
- }
- if (header.hasCellBlockMeta()) {
- buf.position(offset);
- ByteBuff dup = buf.duplicate();
- dup.limit(offset + header.getCellBlockMeta().getLength());
- cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(
- this.codec, this.compressionCodec, dup);
- }
- } catch (Throwable t) {
- InetSocketAddress address = getListenerAddress();
- String msg = (address != null ? address : "(channel closed)")
- + " is unable to read call parameter from client "
- + getHostAddress();
- LOG.warn(msg, t);
-
- metrics.exception(t);
-
- // probably the hbase hadoop version does not match the running hadoop
- // version
- if (t instanceof LinkageError) {
- t = new DoNotRetryIOException(t);
- }
- // If the method is not present on the server, do not retry.
- if (t instanceof UnsupportedOperationException) {
- t = new DoNotRetryIOException(t);
- }
-
- final ServerCall readParamsFailedCall = createCall(id,
- this.service, null, null, null, null, this, totalRequestSize, null,
- null, 0, this.callCleanup);
- ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- setupResponse(responseBuffer, readParamsFailedCall, t,
- msg + "; " + t.getMessage());
- readParamsFailedCall.sendResponseIfReady();
- return;
- }
-
- TraceInfo traceInfo = header.hasTraceInfo() ? new TraceInfo(header
- .getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
- : null;
- int timeout = 0;
- if (header.hasTimeout() && header.getTimeout() > 0) {
- timeout = Math.max(minClientRequestTimeout, header.getTimeout());
- }
- ServerCall call = createCall(id, this.service, md, header, param,
- cellScanner, this, totalRequestSize, traceInfo, this.addr, timeout,
- this.callCleanup);
-
- if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
- callQueueSizeInBytes.add(-1 * call.getSize());
-
- ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
- metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
- setupResponse(responseBuffer, call, CALL_QUEUE_TOO_BIG_EXCEPTION,
- "Call queue is full on " + server.getServerName()
- + ", too many items queued ?");
- call.sendResponseIfReady();
- }
- }
-
- public abstract boolean isConnectionOpen();
-
- public abstract ServerCall createCall(int id, final BlockingService service,
- final MethodDescriptor md, RequestHeader header, Message param,
- CellScanner cellScanner, Connection connection, long size,
- TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
- CallCleanup reqCleanup);
- }
-
/**
* Datastructure for passing a {@link BlockingService} and its associated class of
* protobuf service interface. For example, a server that fielded what is defined
@@ -1122,7 +348,7 @@ public abstract class RpcServer implements RpcServerInterface,
* @param error error message, if the call failed
* @throws IOException
*/
- protected void setupResponse(ByteArrayOutputStream response, ServerCall call, Throwable t,
+ protected void setupResponse(ByteArrayOutputStream response, ServerCall<?> call, Throwable t,
String error) throws IOException {
if (response != null) response.reset();
call.setResponse(null, null, t, error);
@@ -1574,44 +800,4 @@ public abstract class RpcServer implements RpcServerInterface,
public void setRsRpcServices(RSRpcServices rsRpcServices) {
this.rsRpcServices = rsRpcServices;
}
-
- protected static class ByteBuffByteInput extends ByteInput {
-
- private ByteBuff buf;
- private int offset;
- private int length;
-
- ByteBuffByteInput(ByteBuff buf, int offset, int length) {
- this.buf = buf;
- this.offset = offset;
- this.length = length;
- }
-
- @Override
- public byte read(int offset) {
- return this.buf.get(getAbsoluteOffset(offset));
- }
-
- private int getAbsoluteOffset(int offset) {
- return this.offset + offset;
- }
-
- @Override
- public int read(int offset, byte[] out, int outOffset, int len) {
- this.buf.get(getAbsoluteOffset(offset), out, outOffset, len);
- return len;
- }
-
- @Override
- public int read(int offset, ByteBuffer out) {
- int len = out.remaining();
- this.buf.get(out, getAbsoluteOffset(offset), len);
- return len;
- }
-
- @Override
- public int size() {
- return this.length;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/341223d8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index 9294839..15fe3e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
-import org.apache.hadoop.hbase.ipc.RpcServer.Connection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
@@ -52,7 +51,7 @@ import org.apache.htrace.TraceInfo;
* the result.
*/
@InterfaceAudience.Private
-abstract class ServerCall implements RpcCall {
+abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall {
protected final int id; // the client's call id
protected final BlockingService service;
@@ -61,7 +60,7 @@ abstract class ServerCall implements RpcCall {
protected Message param; // the parameter passed
// Optional cell data passed outside of protobufs.
protected final CellScanner cellScanner;
- protected final Connection connection; // connection to client
+ protected final T connection; // connection to client
protected final long receiveTime; // the time received when response is null
// the time served when response is not null
protected final int timeout;
@@ -96,7 +95,7 @@ abstract class ServerCall implements RpcCall {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
justification="Can't figure why this complaint is happening... see below")
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
- Message param, CellScanner cellScanner, Connection connection, long size, TraceInfo tinfo,
+ Message param, CellScanner cellScanner, T connection, long size, TraceInfo tinfo,
InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
this.id = id;