You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/05/09 07:06:24 UTC
hbase git commit: HBASE-18009 Move RpcServer.Call to a separated file
Repository: hbase
Updated Branches:
refs/heads/master 959deb0e5 -> 51d4c68b7
HBASE-18009 Move RpcServer.Call to a separated file
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/51d4c68b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/51d4c68b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/51d4c68b
Branch: refs/heads/master
Commit: 51d4c68b7cce43af1190f9195bfb08963375bc27
Parents: 959deb0
Author: zhangduo <zh...@apache.org>
Authored: Mon May 8 20:36:33 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue May 9 14:56:03 2017 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 4 +-
.../apache/hadoop/hbase/ipc/NettyRpcServer.java | 91 +---
.../hadoop/hbase/ipc/NettyServerCall.java | 67 +++
.../org/apache/hadoop/hbase/ipc/RpcCall.java | 5 -
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 498 +-----------------
.../org/apache/hadoop/hbase/ipc/ServerCall.java | 527 +++++++++++++++++++
.../hadoop/hbase/ipc/SimpleRpcServer.java | 122 ++---
.../hadoop/hbase/ipc/SimpleServerCall.java | 79 +++
.../apache/hadoop/hbase/ipc/TestCallRunner.java | 3 +-
.../hbase/ipc/TestSimpleRpcScheduler.java | 60 +--
.../hadoop/hbase/security/TestSecureIPC.java | 8 +-
11 files changed, 774 insertions(+), 690 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 0aabc10..f16fc50 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -75,8 +75,8 @@ public class CallRunner {
* @deprecated As of release 2.0, this will be removed in HBase 3.0
*/
@Deprecated
- public RpcServer.Call getCall() {
- return (RpcServer.Call) call;
+ public ServerCall getCall() {
+ return (ServerCall) call;
}
public void setStatus(MonitoredRPCHandler status) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
index be55378..c18b894 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java
@@ -59,7 +59,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
@@ -203,13 +202,14 @@ public class NettyRpcServer extends RpcServer {
this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
}
this.remotePort = inetSocketAddress.getPort();
- this.saslCall = new Call(SASL_CALLID, null, null, null, null, null, this,
- 0, null, null, 0, null);
- this.setConnectionHeaderResponseCall = new Call(
- CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null,
- this, 0, null, null, 0, null);
- this.authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null,
- null, null, null, this, 0, null, null, 0, null);
+ this.saslCall = new NettyServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
+ null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
+ this.setConnectionHeaderResponseCall =
+ new NettyServerCall(CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null, this,
+ 0, null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
+ this.authFailedCall =
+ new NettyServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null, null, null, this, 0,
+ null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
}
void readPreamble(ByteBuf buffer) throws IOException {
@@ -243,7 +243,7 @@ public class NettyRpcServer extends RpcServer {
AccessDeniedException ae = new AccessDeniedException(
"Authentication is required");
setupResponse(authFailedResponse, authFailedCall, ae, ae.getMessage());
- ((Call) authFailedCall)
+ ((NettyServerCall) authFailedCall)
.sendResponseIfReady(ChannelFutureListener.CLOSE);
return;
}
@@ -269,8 +269,8 @@ public class NettyRpcServer extends RpcServer {
private void doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg);
- Call fakeCall = new Call(-1, null, null, null, null, null, this, -1,
- null, null, 0, null);
+ NettyServerCall fakeCall = new NettyServerCall(-1, null, null, null, null, null, this, -1,
+ null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null);
setupResponse(null, fakeCall, e, msg);
// closes out the connection.
fakeCall.sendResponseIfReady(ChannelFutureListener.CLOSE);
@@ -336,59 +336,17 @@ public class NettyRpcServer extends RpcServer {
}
@Override
- public RpcServer.Call createCall(int id, final BlockingService service,
+ public ServerCall createCall(int id, final BlockingService service,
final MethodDescriptor md, RequestHeader header, Message param,
CellScanner cellScanner, RpcServer.Connection connection, long size,
TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
CallCleanup reqCleanup) {
- return new Call(id, service, md, header, param, cellScanner, connection,
- size, tinfo, remoteAddress, timeout, reqCleanup);
+ return new NettyServerCall(id, service, md, header, param, cellScanner, connection, size,
+ tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder,
+ reqCleanup);
}
}
- /**
- * Datastructure that holds all necessary to a method invocation and then afterward, carries the
- * result.
- */
- @InterfaceStability.Evolving
- public class Call extends RpcServer.Call {
-
- Call(int id, final BlockingService service, final MethodDescriptor md,
- RequestHeader header, Message param, CellScanner cellScanner,
- RpcServer.Connection connection, long size, TraceInfo tinfo,
- final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
- super(id, service, md, header, param, cellScanner,
- connection, size, tinfo, remoteAddress, timeout, reqCleanup);
- }
-
- @Override
- public long disconnectSince() {
- if (!getConnection().isConnectionOpen()) {
- return System.currentTimeMillis() - timestamp;
- } else {
- return -1L;
- }
- }
-
- NettyConnection getConnection() {
- return (NettyConnection) this.connection;
- }
-
- /**
- * If we have a response, and delay is not set, then respond immediately. Otherwise, do not
- * respond to client. This is called by the RPC code in the context of the Handler thread.
- */
- @Override
- public synchronized void sendResponseIfReady() throws IOException {
- getConnection().channel.writeAndFlush(this);
- }
-
- public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException {
- getConnection().channel.writeAndFlush(this).addListener(listener);
- }
-
- }
-
private class Initializer extends ChannelInitializer<SocketChannel> {
final int maxRequestSize;
@@ -483,7 +441,7 @@ public class NettyRpcServer extends RpcServer {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
- final Call call = (Call) msg;
+ final NettyServerCall call = (NettyServerCall) msg;
ByteBuf response = Unpooled.wrappedBuffer(call.response.getBuffers());
ctx.write(response, promise).addListener(new CallWriteListener(call));
}
@@ -492,9 +450,9 @@ public class NettyRpcServer extends RpcServer {
private class CallWriteListener implements ChannelFutureListener {
- private Call call;
+ private NettyServerCall call;
- CallWriteListener(Call call) {
+ CallWriteListener(NettyServerCall call) {
this.call = call;
}
@@ -527,14 +485,11 @@ public class NettyRpcServer extends RpcServer {
}
@Override
- public Pair<Message, CellScanner> call(BlockingService service,
- MethodDescriptor md, Message param, CellScanner cellScanner,
- long receiveTime, MonitoredRPCHandler status, long startTime, int timeout)
- throws IOException {
- Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null,
- -1, null, null, timeout, null);
- fakeCall.setReceiveTime(receiveTime);
+ public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
+ Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
+ long startTime, int timeout) throws IOException {
+ NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null,
+ -1, null, null, receiveTime, timeout, reservoir, cellBlockBuilder, null);
return call(fakeCall, status);
}
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
new file mode 100644
index 0000000..a3f23dd
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import io.netty.channel.ChannelFutureListener;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.ipc.NettyRpcServer.NettyConnection;
+import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.htrace.TraceInfo;
+
+/**
+ * Data structure that holds everything needed for a method invocation and afterward carries the
+ * result.
+ */
+@InterfaceAudience.Private
+class NettyServerCall extends ServerCall {
+
+ NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
+ Message param, CellScanner cellScanner, RpcServer.Connection connection, long size,
+ TraceInfo tinfo, InetAddress remoteAddress, long receiveTime, int timeout,
+ ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
+ super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress,
+ receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
+ }
+
+ NettyConnection getConnection() {
+ return (NettyConnection) this.connection;
+ }
+
+ /**
+ * Writes this call to the connection's channel and flushes it; the write is unconditional.
+ * This is called by the RPC code in the context of the Handler thread.
+ */
+ @Override
+ public synchronized void sendResponseIfReady() throws IOException {
+ getConnection().channel.writeAndFlush(this);
+ }
+
+ public synchronized void sendResponseIfReady(ChannelFutureListener listener) throws IOException {
+ getConnection().channel.writeAndFlush(this).addListener(listener);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
index 239ea9e..fc86594 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
@@ -65,11 +65,6 @@ public interface RpcCall extends RpcCallContext {
long getReceiveTime();
/**
- * Set the timestamp when the call is constructed.
- */
- void setReceiveTime(long receiveTime);
-
- /**
* @return The time when the call starts to be executed.
*/
long getStartTime();
http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index ebae1fb..bbc329c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -62,9 +62,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
-import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
-import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
@@ -88,7 +86,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteInput;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
@@ -98,13 +95,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
-import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.BytesWritable;
@@ -120,7 +113,6 @@ import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.TraceInfo;
import org.codehaus.jackson.map.ObjectMapper;
@@ -265,475 +257,6 @@ public abstract class RpcServer implements RpcServerInterface,
*/
private RSRpcServices rsRpcServices;
- /**
- * Datastructure that holds all necessary to a method invocation and then afterward, carries
- * the result.
- */
- @InterfaceStability.Evolving
- @InterfaceAudience.Private
- public abstract class Call implements RpcCall {
- protected int id; // the client's call id
- protected BlockingService service;
- protected MethodDescriptor md;
- protected RequestHeader header;
- protected Message param; // the parameter passed
- // Optional cell data passed outside of protobufs.
- protected CellScanner cellScanner;
- protected Connection connection; // connection to client
- protected long timestamp; // the time received when response is null
- // the time served when response is not null
- protected int timeout;
- protected long startTime;
- protected long deadline;// the deadline to handle this call, if exceed we can drop it.
-
- /**
- * Chain of buffers to send as response.
- */
- protected BufferChain response;
-
- protected long size; // size of current call
- protected boolean isError;
- protected TraceInfo tinfo;
- protected ByteBufferListOutputStream cellBlockStream = null;
- protected CallCleanup reqCleanup = null;
-
- protected User user;
- protected InetAddress remoteAddress;
- protected RpcCallback rpcCallback;
-
- private long responseCellSize = 0;
- private long responseBlockSize = 0;
- // cumulative size of serialized exceptions
- private long exceptionSize = 0;
- private boolean retryImmediatelySupported;
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
- justification="Can't figure why this complaint is happening... see below")
- Call(int id, final BlockingService service, final MethodDescriptor md,
- RequestHeader header, Message param, CellScanner cellScanner,
- Connection connection, long size, TraceInfo tinfo,
- final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
- this.id = id;
- this.service = service;
- this.md = md;
- this.header = header;
- this.param = param;
- this.cellScanner = cellScanner;
- this.connection = connection;
- this.timestamp = System.currentTimeMillis();
- this.response = null;
- this.isError = false;
- this.size = size;
- this.tinfo = tinfo;
- this.user = connection == null? null: connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
- this.remoteAddress = remoteAddress;
- this.retryImmediatelySupported =
- connection == null? null: connection.retryImmediatelySupported;
- this.timeout = timeout;
- this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE;
- this.reqCleanup = reqCleanup;
- }
-
- /**
- * Call is done. Execution happened and we returned results to client. It is
- * now safe to cleanup.
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
- justification = "Presume the lock on processing request held by caller is protection enough")
- void done() {
- if (this.cellBlockStream != null) {
- // This will return back the BBs which we got from pool.
- this.cellBlockStream.releaseResources();
- this.cellBlockStream = null;
- }
- // If the call was run successfuly, we might have already returned the BB
- // back to pool. No worries..Then inputCellBlock will be null
- cleanup();
- }
-
- @Override
- public void cleanup() {
- if (this.reqCleanup != null) {
- this.reqCleanup.run();
- this.reqCleanup = null;
- }
- }
-
- @Override
- public String toString() {
- return toShortString() + " param: " +
- (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
- " connection: " + connection.toString();
- }
-
- @Override
- public RequestHeader getHeader() {
- return this.header;
- }
-
- @Override
- public int getPriority() {
- return this.header.getPriority();
- }
-
- /*
- * Short string representation without param info because param itself could be huge depends on
- * the payload of a command
- */
- @Override
- public String toShortString() {
- String serviceName = this.connection.service != null ?
- this.connection.service.getDescriptorForType().getName() : "null";
- return "callId: " + this.id + " service: " + serviceName +
- " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
- " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
- " connection: " + connection.toString() +
- " deadline: " + deadline;
- }
-
- protected synchronized void setSaslTokenResponse(ByteBuffer response) {
- ByteBuffer[] responseBufs = new ByteBuffer[1];
- responseBufs[0] = response;
- this.response = new BufferChain(responseBufs);
- }
-
- protected synchronized void setConnectionHeaderResponse(ByteBuffer response) {
- ByteBuffer[] responseBufs = new ByteBuffer[1];
- responseBufs[0] = response;
- this.response = new BufferChain(responseBufs);
- }
-
- @Override
- public synchronized void setResponse(Message m, final CellScanner cells,
- Throwable t, String errorMsg) {
- if (this.isError) return;
- if (t != null) this.isError = true;
- BufferChain bc = null;
- try {
- ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
- // Call id.
- headerBuilder.setCallId(this.id);
- if (t != null) {
- setExceptionResponse(t, errorMsg, headerBuilder);
- }
- // Pass reservoir to buildCellBlock. Keep reference to returne so can add it back to the
- // reservoir when finished. This is hacky and the hack is not contained but benefits are
- // high when we can avoid a big buffer allocation on each rpc.
- List<ByteBuffer> cellBlock = null;
- int cellBlockSize = 0;
- if (reservoir != null) {
- this.cellBlockStream = cellBlockBuilder.buildCellBlockStream(this.connection.codec,
- this.connection.compressionCodec, cells, reservoir);
- if (this.cellBlockStream != null) {
- cellBlock = this.cellBlockStream.getByteBuffers();
- cellBlockSize = this.cellBlockStream.size();
- }
- } else {
- ByteBuffer b = cellBlockBuilder.buildCellBlock(this.connection.codec,
- this.connection.compressionCodec, cells);
- if (b != null) {
- cellBlockSize = b.remaining();
- cellBlock = new ArrayList<>(1);
- cellBlock.add(b);
- }
- }
-
- if (cellBlockSize > 0) {
- CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
- // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
- cellBlockBuilder.setLength(cellBlockSize);
- headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
- }
- Message header = headerBuilder.build();
- ByteBuffer headerBuf =
- createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
- ByteBuffer[] responseBufs = null;
- int cellBlockBufferSize = 0;
- if (cellBlock != null) {
- cellBlockBufferSize = cellBlock.size();
- responseBufs = new ByteBuffer[1 + cellBlockBufferSize];
- } else {
- responseBufs = new ByteBuffer[1];
- }
- responseBufs[0] = headerBuf;
- if (cellBlock != null) {
- for (int i = 0; i < cellBlockBufferSize; i++) {
- responseBufs[i + 1] = cellBlock.get(i);
- }
- }
- bc = new BufferChain(responseBufs);
- if (connection.useWrap) {
- bc = wrapWithSasl(bc);
- }
- } catch (IOException e) {
- LOG.warn("Exception while creating response " + e);
- }
- this.response = bc;
- // Once a response message is created and set to this.response, this Call can be treated as
- // done. The Responder thread will do the n/w write of this message back to client.
- if (this.rpcCallback != null) {
- try {
- this.rpcCallback.run();
- } catch (Exception e) {
- // Don't allow any exception here to kill this handler thread.
- LOG.warn("Exception while running the Rpc Callback.", e);
- }
- }
- }
-
- protected void setExceptionResponse(Throwable t, String errorMsg,
- ResponseHeader.Builder headerBuilder) {
- ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
- exceptionBuilder.setExceptionClassName(t.getClass().getName());
- exceptionBuilder.setStackTrace(errorMsg);
- exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
- if (t instanceof RegionMovedException) {
- // Special casing for this exception. This is only one carrying a payload.
- // Do this instead of build a generic system for allowing exceptions carry
- // any kind of payload.
- RegionMovedException rme = (RegionMovedException)t;
- exceptionBuilder.setHostname(rme.getHostname());
- exceptionBuilder.setPort(rme.getPort());
- }
- // Set the exception as the result of the method invocation.
- headerBuilder.setException(exceptionBuilder.build());
- }
-
- protected ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
- int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
- // Organize the response as a set of bytebuffers rather than collect it all together inside
- // one big byte array; save on allocations.
- // for writing the header, we check if there is available space in the buffers
- // created for the cellblock itself. If there is space for the header, we reuse
- // the last buffer in the cellblock. This applies to the cellblock created from the
- // pool or even the onheap cellblock buffer in case there is no pool enabled.
- // Possible reuse would avoid creating a temporary array for storing the header every time.
- ByteBuffer possiblePBBuf =
- (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
- int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
- resultVintSize = 0;
- if (header != null) {
- headerSerializedSize = header.getSerializedSize();
- headerVintSize = CodedOutputStream.computeRawVarint32Size(headerSerializedSize);
- }
- if (result != null) {
- resultSerializedSize = result.getSerializedSize();
- resultVintSize = CodedOutputStream.computeRawVarint32Size(resultSerializedSize);
- }
- // calculate the total size
- int totalSize = headerSerializedSize + headerVintSize
- + (resultSerializedSize + resultVintSize)
- + cellBlockSize;
- int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize
- + resultVintSize + Bytes.SIZEOF_INT;
- // Only if the last buffer has enough space for header use it. Else allocate
- // a new buffer. Assume they are all flipped
- if (possiblePBBuf != null
- && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
- // duplicate the buffer. This is where the header is going to be written
- ByteBuffer pbBuf = possiblePBBuf.duplicate();
- // get the current limit
- int limit = pbBuf.limit();
- // Position such that we write the header to the end of the buffer
- pbBuf.position(limit);
- // limit to the header size
- pbBuf.limit(totalPBSize + limit);
- // mark the current position
- pbBuf.mark();
- writeToCOS(result, header, totalSize, pbBuf);
- // reset the buffer back to old position
- pbBuf.reset();
- return pbBuf;
- } else {
- return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
- }
- }
-
- private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
- throws IOException {
- ByteBufferUtils.putInt(pbBuf, totalSize);
- // create COS that works on BB
- CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
- if (header != null) {
- cos.writeMessageNoTag(header);
- }
- if (result != null) {
- cos.writeMessageNoTag(result);
- }
- cos.flush();
- cos.checkNoSpaceLeft();
- }
-
- private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
- int totalSize, int totalPBSize) throws IOException {
- ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
- writeToCOS(result, header, totalSize, pbBuf);
- pbBuf.flip();
- return pbBuf;
- }
-
- protected BufferChain wrapWithSasl(BufferChain bc)
- throws IOException {
- if (!this.connection.useSasl) return bc;
- // Looks like no way around this; saslserver wants a byte array. I have to make it one.
- // THIS IS A BIG UGLY COPY.
- byte [] responseBytes = bc.getBytes();
- byte [] token;
- // synchronization may be needed since there can be multiple Handler
- // threads using saslServer or Crypto AES to wrap responses.
- if (connection.useCryptoAesWrap) {
- // wrap with Crypto AES
- synchronized (connection.cryptoAES) {
- token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length);
- }
- } else {
- synchronized (connection.saslServer) {
- token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
- }
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Adding saslServer wrapped token of size " + token.length
- + " as call response.");
- }
-
- ByteBuffer[] responseBufs = new ByteBuffer[2];
- responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length));
- responseBufs[1] = ByteBuffer.wrap(token);
- return new BufferChain(responseBufs);
- }
-
- @Override
- public boolean isClientCellBlockSupported() {
- return this.connection != null && this.connection.codec != null;
- }
-
- @Override
- public long getResponseCellSize() {
- return responseCellSize;
- }
-
- @Override
- public void incrementResponseCellSize(long cellSize) {
- responseCellSize += cellSize;
- }
-
- @Override
- public long getResponseBlockSize() {
- return responseBlockSize;
- }
-
- @Override
- public void incrementResponseBlockSize(long blockSize) {
- responseBlockSize += blockSize;
- }
-
- @Override
- public long getResponseExceptionSize() {
- return exceptionSize;
- }
- @Override
- public void incrementResponseExceptionSize(long exSize) {
- exceptionSize += exSize;
- }
-
- @Override
- public long getSize() {
- return this.size;
- }
-
- @Override
- public long getDeadline() {
- return deadline;
- }
-
- @Override
- public User getRequestUser() {
- return user;
- }
-
- @Override
- public String getRequestUserName() {
- User user = getRequestUser();
- return user == null? null: user.getShortName();
- }
-
- @Override
- public InetAddress getRemoteAddress() {
- return remoteAddress;
- }
-
- @Override
- public VersionInfo getClientVersionInfo() {
- return connection.getVersionInfo();
- }
-
- @Override
- public synchronized void setCallBack(RpcCallback callback) {
- this.rpcCallback = callback;
- }
-
- @Override
- public boolean isRetryImmediatelySupported() {
- return retryImmediatelySupported;
- }
-
- @Override
- public BlockingService getService() {
- return service;
- }
-
- @Override
- public MethodDescriptor getMethod() {
- return md;
- }
-
- @Override
- public Message getParam() {
- return param;
- }
-
- @Override
- public CellScanner getCellScanner() {
- return cellScanner;
- }
-
- @Override
- public long getReceiveTime() {
- return timestamp;
- }
-
- @Override
- public void setReceiveTime(long t) {
- this.timestamp = t;
- }
-
- @Override
- public long getStartTime() {
- return startTime;
- }
-
- @Override
- public void setStartTime(long t) {
- this.startTime = t;
- }
-
- @Override
- public int getTimeout() {
- return timeout;
- }
-
- @Override
- public int getRemotePort() {
- return connection.getRemotePort();
- }
-
- @Override
- public TraceInfo getTraceInfo() {
- return tinfo;
- }
-
- }
-
@FunctionalInterface
protected static interface CallCleanup {
void run();
@@ -781,15 +304,15 @@ public abstract class RpcServer implements RpcServerInterface,
protected boolean useCryptoAesWrap = false;
// Fake 'call' for failed authorization response
protected static final int AUTHORIZATION_FAILED_CALLID = -1;
- protected Call authFailedCall;
+ protected ServerCall authFailedCall;
protected ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream();
// Fake 'call' for SASL context setup
protected static final int SASL_CALLID = -33;
- protected Call saslCall;
+ protected ServerCall saslCall;
// Fake 'call' for connection header response
protected static final int CONNECTION_HEADER_RESPONSE_CALLID = -34;
- protected Call setConnectionHeaderResponseCall;
+ protected ServerCall setConnectionHeaderResponseCall;
// was authentication allowed with a fallback to simple auth
protected boolean authenticatedWithFallback;
@@ -1366,7 +889,7 @@ public abstract class RpcServer implements RpcServerInterface,
// This is a bit late to be doing this check - we have already read in the
// total request.
if ((totalRequestSize + callQueueSizeInBytes.sum()) > maxQueueSizeInBytes) {
- final RpcServer.Call callTooBig = createCall(id, this.service, null,
+ final ServerCall callTooBig = createCall(id, this.service, null,
null, null, null, this, totalRequestSize, null, null, 0,
this.callCleanup);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
@@ -1430,7 +953,7 @@ public abstract class RpcServer implements RpcServerInterface,
t = new DoNotRetryIOException(t);
}
- final RpcServer.Call readParamsFailedCall = createCall(id,
+ final ServerCall readParamsFailedCall = createCall(id,
this.service, null, null, null, null, this, totalRequestSize, null,
null, 0, this.callCleanup);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
@@ -1447,7 +970,7 @@ public abstract class RpcServer implements RpcServerInterface,
if (header.hasTimeout() && header.getTimeout() > 0) {
timeout = Math.max(minClientRequestTimeout, header.getTimeout());
}
- RpcServer.Call call = createCall(id, this.service, md, header, param,
+ ServerCall call = createCall(id, this.service, md, header, param,
cellScanner, this, totalRequestSize, traceInfo, this.addr, timeout,
this.callCleanup);
@@ -1465,7 +988,7 @@ public abstract class RpcServer implements RpcServerInterface,
public abstract boolean isConnectionOpen();
- public abstract Call createCall(int id, final BlockingService service,
+ public abstract ServerCall createCall(int id, final BlockingService service,
final MethodDescriptor md, RequestHeader header, Message param,
CellScanner cellScanner, Connection connection, long size,
TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
@@ -1594,14 +1117,13 @@ public abstract class RpcServer implements RpcServerInterface,
/**
* Setup response for the RPC Call.
- *
* @param response buffer to serialize the response into
- * @param call {@link Call} to which we are setting up the response
+ * @param call {@link ServerCall} to which we are setting up the response
* @param error error message, if the call failed
* @throws IOException
*/
- protected void setupResponse(ByteArrayOutputStream response, Call call, Throwable t, String error)
- throws IOException {
+ protected void setupResponse(ByteArrayOutputStream response, ServerCall call, Throwable t,
+ String error) throws IOException {
if (response != null) response.reset();
call.setResponse(null, null, t, error);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
new file mode 100644
index 0000000..9294839
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -0,0 +1,527 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.hadoop.hbase.ipc.RpcServer.Connection;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.htrace.TraceInfo;
+
+/**
+ * Data structure that holds everything necessary for a method invocation and, afterward,
+ * carries the result.
+ */
+@InterfaceAudience.Private
+abstract class ServerCall implements RpcCall {
+
+ protected final int id; // the client's call id
+ protected final BlockingService service;
+ protected final MethodDescriptor md;
+ protected final RequestHeader header;
+ protected Message param; // the parameter passed
+ // Optional cell data passed outside of protobufs.
+ protected final CellScanner cellScanner;
+ protected final Connection connection; // connection to client
+ protected final long receiveTime; // the time this call was received
+ // (final: set once in the constructor, never updated when the response is served)
+ protected final int timeout;
+ protected long startTime;
+ protected final long deadline;// the deadline to handle this call, if exceed we can drop it.
+
+ protected final ByteBufferPool reservoir;
+
+ protected final CellBlockBuilder cellBlockBuilder;
+
+ /**
+ * Chain of buffers to send as response.
+ */
+ protected BufferChain response;
+
+ protected final long size; // size of current call
+ protected boolean isError;
+ protected final TraceInfo tinfo;
+ protected ByteBufferListOutputStream cellBlockStream = null;
+ protected CallCleanup reqCleanup = null;
+
+ protected User user;
+ protected final InetAddress remoteAddress;
+ protected RpcCallback rpcCallback;
+
+ private long responseCellSize = 0;
+ private long responseBlockSize = 0;
+ // cumulative size of serialized exceptions
+ private long exceptionSize = 0;
+ private final boolean retryImmediatelySupported;
+
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
+ justification="Can't figure why this complaint is happening... see below")
+ ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
+ Message param, CellScanner cellScanner, Connection connection, long size, TraceInfo tinfo,
+ InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir,
+ CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) {
+ this.id = id;
+ this.service = service;
+ this.md = md;
+ this.header = header;
+ this.param = param;
+ this.cellScanner = cellScanner;
+ this.connection = connection;
+ this.receiveTime = receiveTime;
+ this.response = null;
+ this.isError = false;
+ this.size = size;
+ this.tinfo = tinfo;
+ this.user = connection == null ? null : connection.user; // FindBugs: NP_NULL_ON_SOME_PATH
+ this.remoteAddress = remoteAddress;
+ this.retryImmediatelySupported =
+ connection == null ? false : connection.retryImmediatelySupported;
+ this.timeout = timeout;
+ this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE;
+ this.reservoir = reservoir;
+ this.cellBlockBuilder = cellBlockBuilder;
+ this.reqCleanup = reqCleanup;
+ }
+
+ /**
+ * Call is done. Execution happened and we returned results to client. It is
+ * now safe to cleanup.
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+ justification = "Presume the lock on processing request held by caller is protection enough")
+ void done() {
+ if (this.cellBlockStream != null) {
+ // This will return back the BBs which we got from pool.
+ this.cellBlockStream.releaseResources();
+ this.cellBlockStream = null;
+ }
+ // If the call was run successfully, we might have already returned the BB
+ // back to pool. No worries: cleanup() is a no-op once reqCleanup is null.
+ cleanup();
+ }
+
+ @Override
+ public void cleanup() {
+ if (this.reqCleanup != null) {
+ this.reqCleanup.run();
+ this.reqCleanup = null;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return toShortString() + " param: " +
+ (this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") +
+ " connection: " + connection.toString();
+ }
+
+ @Override
+ public RequestHeader getHeader() {
+ return this.header;
+ }
+
+ @Override
+ public int getPriority() {
+ return this.header.getPriority();
+ }
+
+ /*
+ * Short string representation without param info, because param itself could be huge depending
+ * on the payload of a command.
+ */
+ @Override
+ public String toShortString() {
+ String serviceName = this.connection.service != null ?
+ this.connection.service.getDescriptorForType().getName() : "null";
+ return "callId: " + this.id + " service: " + serviceName +
+ " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
+ " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
+ " connection: " + connection.toString() +
+ " deadline: " + deadline;
+ }
+
+ protected synchronized void setSaslTokenResponse(ByteBuffer response) {
+ ByteBuffer[] responseBufs = new ByteBuffer[1];
+ responseBufs[0] = response;
+ this.response = new BufferChain(responseBufs);
+ }
+
+ protected synchronized void setConnectionHeaderResponse(ByteBuffer response) {
+ ByteBuffer[] responseBufs = new ByteBuffer[1];
+ responseBufs[0] = response;
+ this.response = new BufferChain(responseBufs);
+ }
+
+ @Override
+ public synchronized void setResponse(Message m, final CellScanner cells,
+ Throwable t, String errorMsg) {
+ if (this.isError) return;
+ if (t != null) this.isError = true;
+ BufferChain bc = null;
+ try {
+ ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
+ // Call id.
+ headerBuilder.setCallId(this.id);
+ if (t != null) {
+ setExceptionResponse(t, errorMsg, headerBuilder);
+ }
+ // Pass reservoir to buildCellBlockStream. Keep a reference to the returned stream so that
+ // done() can release its buffers back to the pool. This is hacky and the hack is not contained
+ // but benefits are high when we can avoid a big buffer allocation on each rpc.
+ List<ByteBuffer> cellBlock = null;
+ int cellBlockSize = 0;
+ if (this.reservoir != null) {
+ this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec,
+ this.connection.compressionCodec, cells, this.reservoir);
+ if (this.cellBlockStream != null) {
+ cellBlock = this.cellBlockStream.getByteBuffers();
+ cellBlockSize = this.cellBlockStream.size();
+ }
+ } else {
+ ByteBuffer b = this.cellBlockBuilder.buildCellBlock(this.connection.codec,
+ this.connection.compressionCodec, cells);
+ if (b != null) {
+ cellBlockSize = b.remaining();
+ cellBlock = new ArrayList<>(1);
+ cellBlock.add(b);
+ }
+ }
+
+ if (cellBlockSize > 0) {
+ CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
+ // Presumes the cellBlock bytebuffer has been flipped so limit has total size in it.
+ cellBlockBuilder.setLength(cellBlockSize);
+ headerBuilder.setCellBlockMeta(cellBlockBuilder.build());
+ }
+ Message header = headerBuilder.build();
+ ByteBuffer headerBuf =
+ createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
+ ByteBuffer[] responseBufs = null;
+ int cellBlockBufferSize = 0;
+ if (cellBlock != null) {
+ cellBlockBufferSize = cellBlock.size();
+ responseBufs = new ByteBuffer[1 + cellBlockBufferSize];
+ } else {
+ responseBufs = new ByteBuffer[1];
+ }
+ responseBufs[0] = headerBuf;
+ if (cellBlock != null) {
+ for (int i = 0; i < cellBlockBufferSize; i++) {
+ responseBufs[i + 1] = cellBlock.get(i);
+ }
+ }
+ bc = new BufferChain(responseBufs);
+ if (connection.useWrap) {
+ bc = wrapWithSasl(bc);
+ }
+ } catch (IOException e) {
+ RpcServer.LOG.warn("Exception while creating response " + e);
+ }
+ this.response = bc;
+ // Once a response message is created and set to this.response, this Call can be treated as
+ // done. The Responder thread will do the n/w write of this message back to client.
+ if (this.rpcCallback != null) {
+ try {
+ this.rpcCallback.run();
+ } catch (Exception e) {
+ // Don't allow any exception here to kill this handler thread.
+ RpcServer.LOG.warn("Exception while running the Rpc Callback.", e);
+ }
+ }
+ }
+
+ protected void setExceptionResponse(Throwable t, String errorMsg,
+ ResponseHeader.Builder headerBuilder) {
+ ExceptionResponse.Builder exceptionBuilder = ExceptionResponse.newBuilder();
+ exceptionBuilder.setExceptionClassName(t.getClass().getName());
+ exceptionBuilder.setStackTrace(errorMsg);
+ exceptionBuilder.setDoNotRetry(t instanceof DoNotRetryIOException);
+ if (t instanceof RegionMovedException) {
+ // Special casing for this exception. This is the only one carrying a payload.
+ // Do this instead of building a generic system for allowing exceptions to carry
+ // any kind of payload.
+ RegionMovedException rme = (RegionMovedException)t;
+ exceptionBuilder.setHostname(rme.getHostname());
+ exceptionBuilder.setPort(rme.getPort());
+ }
+ // Set the exception as the result of the method invocation.
+ headerBuilder.setException(exceptionBuilder.build());
+ }
+
+ protected ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
+ int cellBlockSize, List<ByteBuffer> cellBlock) throws IOException {
+ // Organize the response as a set of bytebuffers rather than collect it all together inside
+ // one big byte array; save on allocations.
+ // for writing the header, we check if there is available space in the buffers
+ // created for the cellblock itself. If there is space for the header, we reuse
+ // the last buffer in the cellblock. This applies to the cellblock created from the
+ // pool or even the onheap cellblock buffer in case there is no pool enabled.
+ // Possible reuse would avoid creating a temporary array for storing the header every time.
+ ByteBuffer possiblePBBuf =
+ (cellBlockSize > 0) ? cellBlock.get(cellBlock.size() - 1) : null;
+ int headerSerializedSize = 0, resultSerializedSize = 0, headerVintSize = 0,
+ resultVintSize = 0;
+ if (header != null) {
+ headerSerializedSize = header.getSerializedSize();
+ headerVintSize = CodedOutputStream.computeUInt32SizeNoTag(headerSerializedSize);
+ }
+ if (result != null) {
+ resultSerializedSize = result.getSerializedSize();
+ resultVintSize = CodedOutputStream.computeUInt32SizeNoTag(resultSerializedSize);
+ }
+ // calculate the total size
+ int totalSize = headerSerializedSize + headerVintSize
+ + (resultSerializedSize + resultVintSize)
+ + cellBlockSize;
+ int totalPBSize = headerSerializedSize + headerVintSize + resultSerializedSize
+ + resultVintSize + Bytes.SIZEOF_INT;
+ // Only if the last buffer has enough space for header use it. Else allocate
+ // a new buffer. Assume they are all flipped
+ if (possiblePBBuf != null
+ && possiblePBBuf.limit() + totalPBSize <= possiblePBBuf.capacity()) {
+ // duplicate the buffer. This is where the header is going to be written
+ ByteBuffer pbBuf = possiblePBBuf.duplicate();
+ // get the current limit
+ int limit = pbBuf.limit();
+ // Position such that we write the header to the end of the buffer
+ pbBuf.position(limit);
+ // limit to the header size
+ pbBuf.limit(totalPBSize + limit);
+ // mark the current position
+ pbBuf.mark();
+ writeToCOS(result, header, totalSize, pbBuf);
+ // reset the buffer back to old position
+ pbBuf.reset();
+ return pbBuf;
+ } else {
+ return createHeaderAndMessageBytes(result, header, totalSize, totalPBSize);
+ }
+ }
+
+ private void writeToCOS(Message result, Message header, int totalSize, ByteBuffer pbBuf)
+ throws IOException {
+ ByteBufferUtils.putInt(pbBuf, totalSize);
+ // create COS that works on BB
+ CodedOutputStream cos = CodedOutputStream.newInstance(pbBuf);
+ if (header != null) {
+ cos.writeMessageNoTag(header);
+ }
+ if (result != null) {
+ cos.writeMessageNoTag(result);
+ }
+ cos.flush();
+ cos.checkNoSpaceLeft();
+ }
+
+ private ByteBuffer createHeaderAndMessageBytes(Message result, Message header,
+ int totalSize, int totalPBSize) throws IOException {
+ ByteBuffer pbBuf = ByteBuffer.allocate(totalPBSize);
+ writeToCOS(result, header, totalSize, pbBuf);
+ pbBuf.flip();
+ return pbBuf;
+ }
+
+ protected BufferChain wrapWithSasl(BufferChain bc)
+ throws IOException {
+ if (!this.connection.useSasl) return bc;
+ // Looks like no way around this; saslserver wants a byte array. I have to make it one.
+ // THIS IS A BIG UGLY COPY.
+ byte [] responseBytes = bc.getBytes();
+ byte [] token;
+ // synchronization may be needed since there can be multiple Handler
+ // threads using saslServer or Crypto AES to wrap responses.
+ if (connection.useCryptoAesWrap) {
+ // wrap with Crypto AES
+ synchronized (connection.cryptoAES) {
+ token = connection.cryptoAES.wrap(responseBytes, 0, responseBytes.length);
+ }
+ } else {
+ synchronized (connection.saslServer) {
+ token = connection.saslServer.wrap(responseBytes, 0, responseBytes.length);
+ }
+ }
+ if (RpcServer.LOG.isTraceEnabled()) {
+ RpcServer.LOG.trace("Adding saslServer wrapped token of size " + token.length
+ + " as call response.");
+ }
+
+ ByteBuffer[] responseBufs = new ByteBuffer[2];
+ responseBufs[0] = ByteBuffer.wrap(Bytes.toBytes(token.length));
+ responseBufs[1] = ByteBuffer.wrap(token);
+ return new BufferChain(responseBufs);
+ }
+
+ @Override
+ public long disconnectSince() {
+ if (!this.connection.isConnectionOpen()) {
+ return System.currentTimeMillis() - receiveTime;
+ } else {
+ return -1L;
+ }
+ }
+
+ @Override
+ public boolean isClientCellBlockSupported() {
+ return this.connection != null && this.connection.codec != null;
+ }
+
+ @Override
+ public long getResponseCellSize() {
+ return responseCellSize;
+ }
+
+ @Override
+ public void incrementResponseCellSize(long cellSize) {
+ responseCellSize += cellSize;
+ }
+
+ @Override
+ public long getResponseBlockSize() {
+ return responseBlockSize;
+ }
+
+ @Override
+ public void incrementResponseBlockSize(long blockSize) {
+ responseBlockSize += blockSize;
+ }
+
+ @Override
+ public long getResponseExceptionSize() {
+ return exceptionSize;
+ }
+ @Override
+ public void incrementResponseExceptionSize(long exSize) {
+ exceptionSize += exSize;
+ }
+
+ @Override
+ public long getSize() {
+ return this.size;
+ }
+
+ @Override
+ public long getDeadline() {
+ return deadline;
+ }
+
+ @Override
+ public User getRequestUser() {
+ return user;
+ }
+
+ @Override
+ public String getRequestUserName() {
+ User user = getRequestUser();
+ return user == null? null: user.getShortName();
+ }
+
+ @Override
+ public InetAddress getRemoteAddress() {
+ return remoteAddress;
+ }
+
+ @Override
+ public VersionInfo getClientVersionInfo() {
+ return connection.getVersionInfo();
+ }
+
+ @Override
+ public synchronized void setCallBack(RpcCallback callback) {
+ this.rpcCallback = callback;
+ }
+
+ @Override
+ public boolean isRetryImmediatelySupported() {
+ return retryImmediatelySupported;
+ }
+
+ @Override
+ public BlockingService getService() {
+ return service;
+ }
+
+ @Override
+ public MethodDescriptor getMethod() {
+ return md;
+ }
+
+ @Override
+ public Message getParam() {
+ return param;
+ }
+
+ @Override
+ public CellScanner getCellScanner() {
+ return cellScanner;
+ }
+
+ @Override
+ public long getReceiveTime() {
+ return receiveTime;
+ }
+
+ @Override
+ public long getStartTime() {
+ return startTime;
+ }
+
+ @Override
+ public void setStartTime(long t) {
+ this.startTime = t;
+ }
+
+ @Override
+ public int getTimeout() {
+ return timeout;
+ }
+
+ @Override
+ public int getRemotePort() {
+ return connection.getRemotePort();
+ }
+
+ @Override
+ public TraceInfo getTraceInfo() {
+ return tinfo;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
index f771eec..59d1ff9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java
@@ -127,61 +127,6 @@ public class SimpleRpcServer extends RpcServer {
private Listener listener = null;
protected Responder responder = null;
- /**
- * Datastructure that holds all necessary to a method invocation and then afterward, carries
- * the result.
- */
- @InterfaceStability.Evolving
- public class Call extends RpcServer.Call {
-
- protected Responder responder;
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
- justification="Can't figure why this complaint is happening... see below")
- Call(int id, final BlockingService service, final MethodDescriptor md,
- RequestHeader header, Message param, CellScanner cellScanner,
- RpcServer.Connection connection, long size, TraceInfo tinfo,
- final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup,
- Responder responder) {
- super(id, service, md, header, param, cellScanner, connection, size,
- tinfo, remoteAddress, timeout, reqCleanup);
- this.responder = responder;
- }
-
- /**
- * Call is done. Execution happened and we returned results to client. It is now safe to
- * cleanup.
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC",
- justification="Presume the lock on processing request held by caller is protection enough")
- @Override
- void done() {
- super.done();
- this.getConnection().decRpcCount(); // Say that we're done with this call.
- }
-
- @Override
- public long disconnectSince() {
- if (!getConnection().isConnectionOpen()) {
- return System.currentTimeMillis() - timestamp;
- } else {
- return -1L;
- }
- }
-
- @Override
- public synchronized void sendResponseIfReady() throws IOException {
- // set param null to reduce memory pressure
- this.param = null;
- this.responder.doRespond(this);
- }
-
- Connection getConnection() {
- return (Connection) this.connection;
- }
-
- }
-
/** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread {
@@ -589,8 +534,8 @@ public class SimpleRpcServer extends RpcServer {
if (connection == null) {
throw new IllegalStateException("Coding error: SelectionKey key without attachment.");
}
- Call call = connection.responseQueue.peekFirst();
- if (call != null && now > call.timestamp + purgeTimeout) {
+ SimpleServerCall call = connection.responseQueue.peekFirst();
+ if (call != null && now > call.lastSentTime + purgeTimeout) {
conWithOldCalls.add(call.getConnection());
}
}
@@ -637,7 +582,7 @@ public class SimpleRpcServer extends RpcServer {
* @return true if we proceed the call fully, false otherwise.
* @throws IOException
*/
- private boolean processResponse(final Call call) throws IOException {
+ private boolean processResponse(final SimpleServerCall call) throws IOException {
boolean error = true;
try {
// Send as much data as we can in the non-blocking fashion
@@ -680,7 +625,7 @@ public class SimpleRpcServer extends RpcServer {
try {
for (int i = 0; i < 20; i++) {
// protection if some handlers manage to need all the responder
- Call call = connection.responseQueue.pollFirst();
+ SimpleServerCall call = connection.responseQueue.pollFirst();
if (call == null) {
return true;
}
@@ -699,7 +644,7 @@ public class SimpleRpcServer extends RpcServer {
//
// Enqueue a response from the application.
//
- void doRespond(Call call) throws IOException {
+ void doRespond(SimpleServerCall call) throws IOException {
boolean added = false;
// If there is already a write in progress, we don't wait. This allows to free the handlers
@@ -728,7 +673,7 @@ public class SimpleRpcServer extends RpcServer {
call.responder.registerForWrite(call.getConnection());
// set the serve time when the response has to be sent later
- call.timestamp = System.currentTimeMillis();
+ call.lastSentTime = System.currentTimeMillis();
}
}
@@ -741,7 +686,7 @@ public class SimpleRpcServer extends RpcServer {
protected SocketChannel channel;
private ByteBuff data;
private ByteBuffer dataLengthBuffer;
- protected final ConcurrentLinkedDeque<Call> responseQueue = new ConcurrentLinkedDeque<>();
+ protected final ConcurrentLinkedDeque<SimpleServerCall> responseQueue = new ConcurrentLinkedDeque<>();
private final Lock responseWriteLock = new ReentrantLock();
private LongAdder rpcCount = new LongAdder(); // number of outstanding rpcs
private long lastContact;
@@ -769,13 +714,14 @@ public class SimpleRpcServer extends RpcServer {
socketSendBufferSize);
}
}
- this.saslCall = new Call(SASL_CALLID, null, null, null, null, null, this,
- 0, null, null, 0, null, responder);
- this.setConnectionHeaderResponseCall = new Call(
- CONNECTION_HEADER_RESPONSE_CALLID, null, null, null, null, null,
- this, 0, null, null, 0, null, responder);
- this.authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, null, null,
- null, null, null, this, 0, null, null, 0, null, responder);
+ this.saslCall = new SimpleServerCall(SASL_CALLID, null, null, null, null, null, this, 0, null,
+ null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null, responder);
+ this.setConnectionHeaderResponseCall = new SimpleServerCall(CONNECTION_HEADER_RESPONSE_CALLID,
+ null, null, null, null, null, this, 0, null, null, System.currentTimeMillis(), 0,
+ reservoir, cellBlockBuilder, null, responder);
+ this.authFailedCall = new SimpleServerCall(AUTHORIZATION_FAILED_CALLID, null, null, null,
+ null, null, this, 0, null, null, System.currentTimeMillis(), 0, reservoir,
+ cellBlockBuilder, null, responder);
}
public void setLastContact(long lastContact) {
@@ -941,8 +887,9 @@ public class SimpleRpcServer extends RpcServer {
RequestHeader header = (RequestHeader) builder.build();
// Notify the client about the offending request
- Call reqTooBig = new Call(header.getCallId(), this.service, null,
- null, null, null, this, 0, null, this.addr, 0, null, responder);
+ SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service,
+ null, null, null, null, this, 0, null, this.addr, System.currentTimeMillis(), 0,
+ reservoir, cellBlockBuilder, null, responder);
metrics.exception(REQUEST_TOO_BIG_EXCEPTION);
// Make sure the client recognizes the underlying exception
// Otherwise, throw a DoNotRetryIOException.
@@ -1043,8 +990,8 @@ public class SimpleRpcServer extends RpcServer {
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg);
- Call fakeCall = new Call(-1, null, null, null, null, null, this, -1,
- null, null, 0, null, responder);
+ SimpleServerCall fakeCall = new SimpleServerCall(-1, null, null, null, null, null, this, -1,
+ null, null, System.currentTimeMillis(), 0, reservoir, cellBlockBuilder, null, responder);
setupResponse(null, fakeCall, e, msg);
responder.doRespond(fakeCall);
// Returning -1 closes out the connection.
@@ -1081,13 +1028,13 @@ public class SimpleRpcServer extends RpcServer {
}
@Override
- public RpcServer.Call createCall(int id, final BlockingService service,
- final MethodDescriptor md, RequestHeader header, Message param,
- CellScanner cellScanner, RpcServer.Connection connection, long size,
- TraceInfo tinfo, final InetAddress remoteAddress, int timeout,
- CallCleanup reqCleanup) {
- return new Call(id, service, md, header, param, cellScanner, connection,
- size, tinfo, remoteAddress, timeout, reqCleanup, responder);
+ public ServerCall createCall(int id, final BlockingService service, final MethodDescriptor md,
+ RequestHeader header, Message param, CellScanner cellScanner,
+ RpcServer.Connection connection, long size, TraceInfo tinfo,
+ final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) {
+ return new SimpleServerCall(id, service, md, header, param, cellScanner, connection, size,
+ tinfo, remoteAddress, System.currentTimeMillis(), timeout, reservoir, cellBlockBuilder,
+ reqCleanup, responder);
}
}
@@ -1206,17 +1153,16 @@ public class SimpleRpcServer extends RpcServer {
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException {
- return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0);
+ return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),
+ 0);
}
@Override
- public Pair<Message, CellScanner> call(BlockingService service,
- MethodDescriptor md, Message param, CellScanner cellScanner,
- long receiveTime, MonitoredRPCHandler status, long startTime, int timeout)
- throws IOException {
- Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null,
- -1, null, null, timeout, null, null);
- fakeCall.setReceiveTime(receiveTime);
+ public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
+ Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
+ long startTime, int timeout) throws IOException {
+ SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner,
+ null, -1, null, null, receiveTime, timeout, reservoir, cellBlockBuilder, null, null);
return call(fakeCall, status);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
new file mode 100644
index 0000000..b82d348
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteBufferPool;
+import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
+import org.apache.hadoop.hbase.ipc.SimpleRpcServer.Connection;
+import org.apache.hadoop.hbase.ipc.SimpleRpcServer.Responder;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.htrace.TraceInfo;
+
+/**
+ * Data structure that holds everything necessary for a method invocation and, afterward, carries
+ * the result.
+ */
+@InterfaceAudience.Private
+class SimpleServerCall extends ServerCall {
+
+ long lastSentTime;
+
+ final Responder responder;
+
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
+ justification = "Can't figure why this complaint is happening... see below")
+ SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md,
+ RequestHeader header, Message param, CellScanner cellScanner, RpcServer.Connection connection,
+ long size, TraceInfo tinfo, final InetAddress remoteAddress, long receiveTime, int timeout,
+ ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup,
+ Responder responder) {
+ super(id, service, md, header, param, cellScanner, connection, size, tinfo, remoteAddress,
+ receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup);
+ this.responder = responder;
+ }
+
+ /**
+ * Call is done: execution happened and results were returned to the client. Safe to clean up.
+ */
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+ justification = "Presume the lock on processing request held by caller is protection enough")
+ @Override
+ void done() {
+ super.done();
+ this.getConnection().decRpcCount(); // Say that we're done with this call.
+ }
+
+ @Override
+ public synchronized void sendResponseIfReady() throws IOException {
+ // set param null to reduce memory pressure
+ this.param = null;
+ this.responder.doRespond(this);
+ }
+
+ Connection getConnection() {
+ return (Connection) this.connection;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
index 47c15ae..a4cc233 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestCallRunner.java
@@ -33,8 +33,7 @@ public class TestCallRunner {
public void testSimpleCall() {
RpcServerInterface mockRpcServer = Mockito.mock(RpcServerInterface.class);
Mockito.when(mockRpcServer.isStarted()).thenReturn(true);
- RpcServer.Call mockCall = Mockito.mock(RpcServer.Call.class);
- mockCall.connection = Mockito.mock(RpcServer.Connection.class);
+ ServerCall mockCall = Mockito.mock(ServerCall.class);
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.run();
http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 66b77cd..1d7c12e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -30,6 +30,11 @@ import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
@@ -44,18 +49,16 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CategoryBasedTimeout;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -63,18 +66,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestRule;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-
@Category({RPCTests.class, SmallTests.class})
public class TestSimpleRpcScheduler {/*
@Rule
@@ -167,7 +163,7 @@ public class TestSimpleRpcScheduler {/*
}
private CallRunner createMockTask() {
- Call call = mock(Call.class);
+ ServerCall call = mock(ServerCall.class);
CallRunner task = mock(CallRunner.class);
when(task.getRpcCall()).thenReturn(call);
return task;
@@ -195,19 +191,19 @@ public class TestSimpleRpcScheduler {/*
scheduler.start();
CallRunner smallCallTask = mock(CallRunner.class);
- RpcServer.Call smallCall = mock(RpcServer.Call.class);
+ ServerCall smallCall = mock(ServerCall.class);
RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
when(smallCallTask.getRpcCall()).thenReturn(smallCall);
when(smallCall.getHeader()).thenReturn(smallHead);
CallRunner largeCallTask = mock(CallRunner.class);
- RpcServer.Call largeCall = mock(RpcServer.Call.class);
+ ServerCall largeCall = mock(ServerCall.class);
RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
when(largeCallTask.getRpcCall()).thenReturn(largeCall);
when(largeCall.getHeader()).thenReturn(largeHead);
CallRunner hugeCallTask = mock(CallRunner.class);
- RpcServer.Call hugeCall = mock(RpcServer.Call.class);
+ ServerCall hugeCall = mock(ServerCall.class);
RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
when(hugeCall.getHeader()).thenReturn(hugeHead);
@@ -290,7 +286,7 @@ public class TestSimpleRpcScheduler {/*
scheduler.start();
CallRunner putCallTask = mock(CallRunner.class);
- RpcServer.Call putCall = mock(RpcServer.Call.class);
+ ServerCall putCall = mock(ServerCall.class);
putCall.param = RequestConverter.buildMutateRequest(
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
@@ -299,13 +295,13 @@ public class TestSimpleRpcScheduler {/*
when(putCall.getParam()).thenReturn(putCall.param);
CallRunner getCallTask = mock(CallRunner.class);
- RpcServer.Call getCall = mock(RpcServer.Call.class);
+ ServerCall getCall = mock(ServerCall.class);
RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
when(getCallTask.getRpcCall()).thenReturn(getCall);
when(getCall.getHeader()).thenReturn(getHead);
CallRunner scanCallTask = mock(CallRunner.class);
- RpcServer.Call scanCall = mock(RpcServer.Call.class);
+ ServerCall scanCall = mock(ServerCall.class);
scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
when(scanCallTask.getRpcCall()).thenReturn(scanCall);
@@ -382,7 +378,7 @@ public class TestSimpleRpcScheduler {/*
scheduler.start();
CallRunner putCallTask = mock(CallRunner.class);
- RpcServer.Call putCall = mock(RpcServer.Call.class);
+ ServerCall putCall = mock(ServerCall.class);
putCall.param = RequestConverter.buildMutateRequest(
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
@@ -506,19 +502,15 @@ public class TestSimpleRpcScheduler {/*
// Get mocked call that has the CallRunner sleep for a while so that the fast
// path isn't hit.
private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
- final RpcServer.Call putCall = mock(RpcServer.Call.class);
-
- putCall.timestamp = timestamp;
- putCall.param = RequestConverter.buildMutateRequest(
- Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
+ ServerCall putCall = new ServerCall(1, null, null,
+ RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(),
+ RequestConverter.buildMutateRequest(Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))),
+ null, null, 9, null, null, timestamp, 0, null, null, null) {
- RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder()
- .setMethodName("mutate")
- .build();
- when(putCall.getSize()).thenReturn(9L);
- when(putCall.getHeader()).thenReturn(putHead);
- when(putCall.getReceiveTime()).thenReturn(putCall.timestamp);
- when(putCall.getParam()).thenReturn(putCall.param);
+ @Override
+ public void sendResponseIfReady() throws IOException {
+ }
+ };
CallRunner cr = new CallRunner(null, putCall) {
public void run() {
@@ -530,11 +522,13 @@ public class TestSimpleRpcScheduler {/*
} catch (InterruptedException e) {
}
}
+
public RpcCall getRpcCall() {
return putCall;
}
- public void drop() {}
+ public void drop() {
+ }
};
return cr;
http://git-wip-us.apache.org/repos/asf/hbase/blob/51d4c68b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
index 85a14f2..a31b7b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/TestSecureIPC.java
@@ -28,6 +28,8 @@ import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
+
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -57,8 +59,8 @@ import org.apache.hadoop.hbase.ipc.SimpleRpcServer;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -75,10 +77,8 @@ import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.Mockito;
-import com.google.common.collect.Lists;
-
@RunWith(Parameterized.class)
-@Category({ SecurityTests.class, SmallTests.class })
+@Category({ SecurityTests.class, MediumTests.class })
public class TestSecureIPC {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();