You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by je...@apache.org on 2016/12/05 18:26:36 UTC
hbase git commit: HBASE-17221 Abstract out an interface for
RpcServer.Call
Repository: hbase
Updated Branches:
refs/heads/master 1c8822ddf -> 39653862a
HBASE-17221 Abstract out an interface for RpcServer.Call
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/39653862
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/39653862
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/39653862
Branch: refs/heads/master
Commit: 39653862a4d24b5309e972ca38554a7f81bc94fd
Parents: 1c8822d
Author: Jerry He <je...@apache.org>
Authored: Mon Dec 5 10:21:55 2016 -0800
Committer: Jerry He <je...@apache.org>
Committed: Mon Dec 5 10:21:55 2016 -0800
----------------------------------------------------------------------
.../hbase/ipc/AdaptiveLifoCoDelCallQueue.java | 2 +-
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 46 +++---
.../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 7 +-
.../org/apache/hadoop/hbase/ipc/RpcCall.java | 146 +++++++++++++++++++
.../apache/hadoop/hbase/ipc/RpcCallContext.java | 14 +-
.../apache/hadoop/hbase/ipc/RpcExecutor.java | 12 +-
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 138 +++++++++++++-----
.../hadoop/hbase/ipc/RpcServerInterface.java | 11 +-
.../hadoop/hbase/ipc/SimpleRpcScheduler.java | 4 +-
.../hadoop/hbase/TestMetaTableAccessor.java | 2 +-
.../hbase/ipc/TestSimpleRpcScheduler.java | 27 ++--
11 files changed, 323 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
index 42b500f..82b8f1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
@@ -161,7 +161,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
*/
private boolean needToDrop(CallRunner callRunner) {
long now = EnvironmentEdgeManager.currentTime();
- long callDelay = now - callRunner.getCall().timestamp;
+ long callDelay = now - callRunner.getRpcCall().getReceiveTime();
long localMinDelay = this.minDelay;
http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index d570b17..5301a67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -25,16 +25,14 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
-import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-
/**
* The request processing logic, which is usually executed in thread pools provided by an
* {@link RpcScheduler}. Call {@link #run()} to actually execute the contained
@@ -47,7 +45,7 @@ public class CallRunner {
private static final CallDroppedException CALL_DROPPED_EXCEPTION
= new CallDroppedException();
- private Call call;
+ private RpcCall call;
private RpcServerInterface rpcServer;
private MonitoredRPCHandler status;
private volatile boolean sucessful;
@@ -58,7 +56,7 @@ public class CallRunner {
* time we occupy heap.
*/
// The constructor is package-private so only RpcServer in this package can make one of these.
- CallRunner(final RpcServerInterface rpcServer, final Call call) {
+ CallRunner(final RpcServerInterface rpcServer, final RpcCall call) {
this.call = call;
this.rpcServer = rpcServer;
// Add size of the call to queue size.
@@ -67,10 +65,19 @@ public class CallRunner {
}
}
- public Call getCall() {
+ public RpcCall getRpcCall() {
return call;
}
+ /**
+ * Keep for backward compatibility.
+ * @deprecated As of release 2.0, this will be removed in HBase 3.0
+ */
+ @Deprecated
+ public RpcServer.Call getCall() {
+ return (RpcServer.Call) call;
+ }
+
public void setStatus(MonitoredRPCHandler status) {
this.status = status;
}
@@ -85,23 +92,23 @@ public class CallRunner {
public void run() {
try {
- if (!call.connection.channel.isOpen()) {
+ if (call.disconnectSince() >= 0) {
if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
}
return;
}
- call.startTime = System.currentTimeMillis();
- if (call.startTime > call.deadline) {
+ call.setStartTime(System.currentTimeMillis());
+ if (call.getStartTime() > call.getDeadline()) {
RpcServer.LOG.warn("Dropping timed out call: " + call);
return;
}
this.status.setStatus("Setting up call");
- this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
+ this.status.setConnection(call.getRemoteAddress().getHostAddress(), call.getRemotePort());
if (RpcServer.LOG.isTraceEnabled()) {
- UserGroupInformation remoteUser = call.connection.ugi;
+ User remoteUser = call.getRequestUser();
RpcServer.LOG.trace(call.toShortString() + " executing as " +
- ((remoteUser == null) ? "NULL principal" : remoteUser.getUserName()));
+ ((remoteUser == null) ? "NULL principal" : remoteUser.getName()));
}
Throwable errorThrowable = null;
String error = null;
@@ -114,12 +121,15 @@ public class CallRunner {
throw new ServerNotRunningYetException("Server " +
(address != null ? address : "(channel closed)") + " is not running yet");
}
- if (call.tinfo != null) {
- traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
+ if (call.getTraceInfo() != null) {
+ String serviceName =
+ call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
+ String methodName = (call.getMethod() != null) ? call.getMethod().getName() : "";
+ String traceString = serviceName + "." + methodName;
+ traceScope = Trace.startSpan(traceString, call.getTraceInfo());
}
// make the call
- resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
- call.timestamp, this.status, call.startTime, call.timeout);
+ resultPair = this.rpcServer.call(call, this.status);
} catch (TimeoutIOException e){
RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
return;
@@ -181,7 +191,7 @@ public class CallRunner {
*/
public void drop() {
try {
- if (!call.connection.channel.isOpen()) {
+ if (call.disconnectSince() >= 0) {
if (RpcServer.LOG.isDebugEnabled()) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": skipped " + call);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
index 6005900..8637f79 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
@@ -129,11 +130,11 @@ public class RWQueueRpcExecutor extends RpcExecutor {
@Override
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
- RpcServer.Call call = callTask.getCall();
+ RpcCall call = callTask.getRpcCall();
int queueIndex;
- if (isWriteRequest(call.getHeader(), call.param)) {
+ if (isWriteRequest(call.getHeader(), call.getParam())) {
queueIndex = writeBalancer.getNextQueue();
- } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.param)) {
+ } else if (numScanQueues > 0 && isScanRequest(call.getHeader(), call.getParam())) {
queueIndex = numWriteQueues + numReadQueues + scanBalancer.getNextQueue();
} else {
queueIndex = numWriteQueues + readBalancer.getNextQueue();
http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
new file mode 100644
index 0000000..239ea9e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCall.java
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingService;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Descriptors.MethodDescriptor;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.htrace.TraceInfo;
+
+/**
+ * Interface for everything necessary to carry out an RPC method invocation on the server.
+ */
+@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
+@InterfaceStability.Evolving
+public interface RpcCall extends RpcCallContext {
+
+ /**
+ * @return The service of this call.
+ */
+ BlockingService getService();
+
+ /**
+ * @return The service method.
+ */
+ MethodDescriptor getMethod();
+
+ /**
+ * @return The call parameter message.
+ */
+ Message getParam();
+
+ /**
+ * @return The CellScanner that can carry input and result payload.
+ */
+ CellScanner getCellScanner();
+
+ /**
+ * @return The timestamp when the call is constructed.
+ */
+ long getReceiveTime();
+
+ /**
+ * Set the timestamp when the call is constructed.
+ */
+ void setReceiveTime(long receiveTime);
+
+ /**
+ * @return The time when the call starts to be executed.
+ */
+ long getStartTime();
+
+ /**
+ * Set the time when the call starts to be executed.
+ */
+ void setStartTime(long startTime);
+
+ /**
+ * @return The timeout of this call.
+ */
+ int getTimeout();
+
+ /**
+ * @return The Priority of this call.
+ */
+ int getPriority();
+
+ /**
+ * Return the deadline of this call. If we can not complete this call in time,
+ * we can throw a TimeoutIOException and RPCServer will drop it.
+ * @return The system timestamp of deadline.
+ */
+ long getDeadline();
+
+ /**
+ * Used to calculate the request call queue size.
+ * If the total request call size exceeds a limit, the call will be rejected.
+ * @return The raw size of this call.
+ */
+ long getSize();
+
+ /**
+ * @return The request header of this call.
+ */
+ RequestHeader getHeader();
+
+ /**
+ * @return Port of remote address in this call
+ */
+ int getRemotePort();
+
+ /**
+ * Set the response resulting from this RPC call.
+ * @param param The result message as response.
+ * @param cells The CellScanner that possibly carries the payload.
+ * @param errorThrowable The error Throwable resulting from the call.
+ * @param error Extra error message.
+ */
+ void setResponse(Message param, CellScanner cells, Throwable errorThrowable, String error);
+
+ /**
+ * Send the response of this RPC call.
+ * Implementation provides the underlying facility (connection, etc) to send.
+ * @throws IOException
+ */
+ void sendResponseIfReady() throws IOException;
+
+ /**
+ * Do the necessary cleanup after the call if needed.
+ */
+ void cleanup();
+
+ /**
+ * @return A short string format of this call without possibly lengthy params
+ */
+ String toShortString();
+
+ /**
+ * @return TraceInfo attached to this call.
+ */
+ TraceInfo getTraceInfo();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
index 9bc8ee7..d2fd557 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
@@ -23,6 +23,10 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.security.User;
+/**
+ * Interface for everything necessary to carry out an RPC service invocation on the server. This
+ * interface focuses on the information needed or obtained during the actual execution of the
+ * service method.
+ */
@InterfaceAudience.Private
public interface RpcCallContext {
/**
@@ -56,7 +60,7 @@ public interface RpcCallContext {
String getRequestUserName();
/**
- * @return Address of remote client if a request is ongoing, else null
+ * @return Address of remote client in this call
*/
InetAddress getRemoteAddress();
@@ -92,12 +96,6 @@ public interface RpcCallContext {
void incrementResponseCellSize(long cellSize);
long getResponseBlockSize();
- void incrementResponseBlockSize(long blockSize);
- /**
- * Return the deadline of this call. If we can not complete this call in time, we can throw a
- * TimeoutIOException and RPCServer will drop it.
- * @return The system timestamp of deadline.
- */
- long getDeadline();
+ void incrementResponseBlockSize(long blockSize);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index e41f4c7..3cb6011 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -361,12 +361,12 @@ public abstract class RpcExecutor {
@Override
public int compare(CallRunner a, CallRunner b) {
- RpcServer.Call callA = a.getCall();
- RpcServer.Call callB = b.getCall();
- long deadlineA = priority.getDeadline(callA.getHeader(), callA.param);
- long deadlineB = priority.getDeadline(callB.getHeader(), callB.param);
- deadlineA = callA.timestamp + Math.min(deadlineA, maxDelay);
- deadlineB = callB.timestamp + Math.min(deadlineB, maxDelay);
+ RpcCall callA = a.getRpcCall();
+ RpcCall callB = b.getRpcCall();
+ long deadlineA = priority.getDeadline(callA.getHeader(), callA.getParam());
+ long deadlineB = priority.getDeadline(callB.getHeader(), callB.getParam());
+ deadlineA = callA.getReceiveTime() + Math.min(deadlineA, maxDelay);
+ deadlineB = callB.getReceiveTime() + Math.min(deadlineB, maxDelay);
return Long.compare(deadlineA, deadlineB);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index ee44c68..8b6379b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -214,7 +214,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
/** This is set to Call object before Handler invokes an RPC and reset
* after the call returns.
*/
- protected static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
+ protected static final ThreadLocal<RpcCall> CurCall =
+ new ThreadLocal<RpcCall>();
/** Keeps MonitoredRPCHandler per handler thread. */
static final ThreadLocal<MonitoredRPCHandler> MONITORED_RPC
@@ -326,9 +327,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* Data structure that holds everything necessary for a method invocation and afterward carries
* the result.
*/
- @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
- @InterfaceStability.Evolving
- public class Call implements RpcCallContext {
+ @InterfaceAudience.Private
+ public class Call implements RpcCall {
protected int id; // the client's call id
protected BlockingService service;
protected MethodDescriptor md;
@@ -408,7 +408,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.connection.decRpcCount(); // Say that we're done with this call.
}
- protected void cleanup() {
+ @Override
+ public void cleanup() {
if (this.reqCleanup != null) {
this.reqCleanup.run();
this.reqCleanup = null;
@@ -422,7 +423,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
" connection: " + connection.toString();
}
- protected RequestHeader getHeader() {
+ @Override
+ public RequestHeader getHeader() {
return this.header;
}
@@ -430,6 +432,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return this.header.hasPriority();
}
+ @Override
public int getPriority() {
return this.header.getPriority();
}
@@ -438,7 +441,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* Short string representation without param info, because the param itself could be huge
* depending on the payload of a command
*/
- String toShortString() {
+ @Override
+ public String toShortString() {
String serviceName = this.connection.service != null ?
this.connection.service.getDescriptorForType().getName() : "null";
return "callId: " + this.id + " service: " + serviceName +
@@ -448,13 +452,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
" deadline: " + deadline;
}
- String toTraceString() {
- String serviceName = this.connection.service != null ?
- this.connection.service.getDescriptorForType().getName() : "";
- String methodName = (this.md != null) ? this.md.getName() : "";
- return serviceName + "." + methodName;
- }
-
protected synchronized void setSaslTokenResponse(ByteBuffer response) {
ByteBuffer[] responseBufs = new ByteBuffer[1];
responseBufs[0] = response;
@@ -467,15 +464,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.response = new BufferChain(responseBufs);
}
- protected synchronized void setResponse(Object m, final CellScanner cells,
+ @Override
+ public synchronized void setResponse(Message m, final CellScanner cells,
Throwable t, String errorMsg) {
if (this.isError) return;
if (t != null) this.isError = true;
BufferChain bc = null;
try {
ResponseHeader.Builder headerBuilder = ResponseHeader.newBuilder();
- // Presume it a pb Message. Could be null.
- Message result = (Message)m;
// Call id.
headerBuilder.setCallId(this.id);
if (t != null) {
@@ -511,7 +507,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
Message header = headerBuilder.build();
ByteBuffer headerBuf =
- createHeaderAndMessageBytes(result, header, cellBlockSize, cellBlock);
+ createHeaderAndMessageBytes(m, header, cellBlockSize, cellBlock);
ByteBuffer[] responseBufs = null;
int cellBlockBufferSize = 0;
if (cellBlock != null) {
@@ -681,10 +677,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
- public long getSize() {
- return this.size;
- }
-
@Override
public long getResponseCellSize() {
return responseCellSize;
@@ -706,20 +698,22 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
@Override
+ public long getSize() {
+ return this.size;
+ }
+
+ @Override
public long getDeadline() {
return deadline;
}
+ @Override
public synchronized void sendResponseIfReady() throws IOException {
// set param null to reduce memory pressure
this.param = null;
this.responder.doRespond(this);
}
- public UserGroupInformation getRemoteUser() {
- return connection.ugi;
- }
-
@Override
public User getRequestUser() {
return user;
@@ -750,6 +744,64 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
public boolean isRetryImmediatelySupported() {
return retryImmediatelySupported;
}
+
+ @Override
+ public BlockingService getService() {
+ return service;
+ }
+
+ @Override
+ public MethodDescriptor getMethod() {
+ return md;
+ }
+
+ @Override
+ public Message getParam() {
+ return param;
+ }
+
+ @Override
+ public CellScanner getCellScanner() {
+ return cellScanner;
+ }
+
+ @Override
+ public long getReceiveTime() {
+ return timestamp;
+ }
+
+ @Override
+ public void setReceiveTime(long t) {
+ this.timestamp = t;
+
+ }
+
+ @Override
+ public long getStartTime() {
+ return startTime;
+ }
+
+ @Override
+ public void setStartTime(long t) {
+ this.startTime = t;
+
+ }
+
+ @Override
+ public int getTimeout() {
+ return timeout;
+ }
+
+ @Override
+ public int getRemotePort() {
+ return connection.getRemotePort();
+ }
+
+ @Override
+ public TraceInfo getTraceInfo() {
+ return tinfo;
+ }
+
}
@FunctionalInterface
@@ -2565,24 +2617,37 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0);
}
+ public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Message param,
+ CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime,
+ int timeout)
+ throws IOException {
+ Call fakeCall = new Call(-1, service, md, null, param, cellScanner, null, null, -1, null, null, timeout,
+ null);
+ fakeCall.setReceiveTime(receiveTime);
+ return call(fakeCall, status);
+ }
+
/**
* This is a server side method, which is invoked over RPC. On success
* the return response has protobuf response payload. On failure, the
* exception name and the stack trace are returned in the protobuf response.
*/
- public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
- Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
- long startTime, int timeout)
+ public Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status)
throws IOException {
try {
- status.setRPC(md.getName(), new Object[]{param}, receiveTime);
+ MethodDescriptor md = call.getMethod();
+ Message param = call.getParam();
+ status.setRPC(md.getName(), new Object[]{param},
+ call.getReceiveTime());
// TODO: Review after we add in encoded data blocks.
status.setRPCPacket(param);
status.resume("Servicing call");
//get an instance of the method arg type
- HBaseRpcController controller = new HBaseRpcControllerImpl(cellScanner);
- controller.setCallTimeout(timeout);
- Message result = service.callBlockingMethod(md, controller, param);
+ HBaseRpcController controller = new HBaseRpcControllerImpl(call.getCellScanner());
+ controller.setCallTimeout(call.getTimeout());
+ Message result = call.getService().callBlockingMethod(md, controller, param);
+ long receiveTime = call.getReceiveTime();
+ long startTime = call.getStartTime();
long endTime = System.currentTimeMillis();
int processingTime = (int) (endTime - startTime);
int qTime = (int) (startTime - receiveTime);
@@ -2596,6 +2661,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
long requestSize = param.getSerializedSize();
long responseSize = result.getSerializedSize();
+
metrics.dequeuedCall(qTime);
metrics.processedCall(processingTime);
metrics.totalCall(totalTime);
@@ -3014,9 +3080,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* @return InetAddress
*/
public static InetAddress getRemoteIp() {
- Call call = CurCall.get();
- if (call != null && call.connection != null && call.connection.socket != null) {
- return call.connection.socket.getInetAddress();
+ RpcCall call = CurCall.get();
+ if (call != null) {
+ return call.getRemoteAddress();
}
return null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
index 5401e3f..a847931 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
@@ -55,11 +55,18 @@ public interface RpcServerInterface {
@Deprecated
Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
- throws IOException, ServiceException;
+ throws IOException;
+ /**
+ * @deprecated As of release 2.0, this will be removed in HBase 3.0
+ */
+ @Deprecated
Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Message param,
CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime,
- int timeout) throws IOException, ServiceException;
+ int timeout) throws IOException;
+
+ Pair<Message, CellScanner> call(RpcCall call, MonitoredRPCHandler status)
+ throws IOException;
void setErrorHandler(HBaseRPCErrorHandler handler);
HBaseRPCErrorHandler getErrorHandler();
http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 3aa486e..1f7e8ba 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -157,8 +157,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
@Override
public boolean dispatch(CallRunner callTask) throws InterruptedException {
- RpcServer.Call call = callTask.getCall();
- int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser());
+ RpcCall call = callTask.getRpcCall();
+ int level = priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser());
if (priorityExecutor != null && level > highPriorityLevel) {
return priorityExecutor.dispatch(callTask);
} else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
index d750faf..e1c19e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestMetaTableAccessor.java
@@ -643,7 +643,7 @@ public class TestMetaTableAccessor {
@Override
public boolean dispatch(CallRunner task) throws IOException, InterruptedException {
- int priority = task.getCall().getPriority();
+ int priority = task.getRpcCall().getPriority();
if (priority > HConstants.QOS_THRESHOLD) {
numPriorityCalls++;
http://git-wip-us.apache.org/repos/asf/hbase/blob/39653862/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index c9c5684..3535d23 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -99,6 +99,7 @@ public class TestSimpleRpcScheduler {/*
@Test
public void testBasic() throws IOException, InterruptedException {
+
PriorityFunction qosFunction = mock(PriorityFunction.class);
RpcScheduler scheduler = new SimpleRpcScheduler(
conf, 10, 0, 0, qosFunction, 0);
@@ -113,6 +114,7 @@ public class TestSimpleRpcScheduler {/*
@Test
public void testHandlerIsolation() throws IOException, InterruptedException {
+
CallRunner generalTask = createMockTask();
CallRunner priorityTask = createMockTask();
CallRunner replicationTask = createMockTask();
@@ -167,12 +169,13 @@ public class TestSimpleRpcScheduler {/*
private CallRunner createMockTask() {
Call call = mock(Call.class);
CallRunner task = mock(CallRunner.class);
- when(task.getCall()).thenReturn(call);
+ when(task.getRpcCall()).thenReturn(call);
return task;
}
@Test
public void testRpcScheduler() throws Exception {
+
testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
}
@@ -194,19 +197,19 @@ public class TestSimpleRpcScheduler {/*
CallRunner smallCallTask = mock(CallRunner.class);
RpcServer.Call smallCall = mock(RpcServer.Call.class);
RequestHeader smallHead = RequestHeader.newBuilder().setCallId(1).build();
- when(smallCallTask.getCall()).thenReturn(smallCall);
+ when(smallCallTask.getRpcCall()).thenReturn(smallCall);
when(smallCall.getHeader()).thenReturn(smallHead);
CallRunner largeCallTask = mock(CallRunner.class);
RpcServer.Call largeCall = mock(RpcServer.Call.class);
RequestHeader largeHead = RequestHeader.newBuilder().setCallId(50).build();
- when(largeCallTask.getCall()).thenReturn(largeCall);
+ when(largeCallTask.getRpcCall()).thenReturn(largeCall);
when(largeCall.getHeader()).thenReturn(largeHead);
CallRunner hugeCallTask = mock(CallRunner.class);
RpcServer.Call hugeCall = mock(RpcServer.Call.class);
RequestHeader hugeHead = RequestHeader.newBuilder().setCallId(100).build();
- when(hugeCallTask.getCall()).thenReturn(hugeCall);
+ when(hugeCallTask.getRpcCall()).thenReturn(hugeCall);
when(hugeCall.getHeader()).thenReturn(hugeHead);
when(priority.getDeadline(eq(smallHead), any(Message.class))).thenReturn(0L);
@@ -255,6 +258,7 @@ public class TestSimpleRpcScheduler {/*
@Test
public void testScanQueueWithZeroScanRatio() throws Exception {
+
Configuration schedConf = HBaseConfiguration.create();
schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
schedConf.setFloat(RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f);
@@ -290,21 +294,23 @@ public class TestSimpleRpcScheduler {/*
putCall.param = RequestConverter.buildMutateRequest(
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
- when(putCallTask.getCall()).thenReturn(putCall);
+ when(putCallTask.getRpcCall()).thenReturn(putCall);
when(putCall.getHeader()).thenReturn(putHead);
+ when(putCall.getParam()).thenReturn(putCall.param);
CallRunner getCallTask = mock(CallRunner.class);
RpcServer.Call getCall = mock(RpcServer.Call.class);
RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
- when(getCallTask.getCall()).thenReturn(getCall);
+ when(getCallTask.getRpcCall()).thenReturn(getCall);
when(getCall.getHeader()).thenReturn(getHead);
CallRunner scanCallTask = mock(CallRunner.class);
RpcServer.Call scanCall = mock(RpcServer.Call.class);
scanCall.param = ScanRequest.newBuilder().setScannerId(1).build();
RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
- when(scanCallTask.getCall()).thenReturn(scanCall);
+ when(scanCallTask.getRpcCall()).thenReturn(scanCall);
when(scanCall.getHeader()).thenReturn(scanHead);
+ when(scanCall.getParam()).thenReturn(scanCall.param);
ArrayList<Integer> work = new ArrayList<Integer>();
doAnswerTaskExecution(putCallTask, work, 1, 1000);
@@ -361,6 +367,7 @@ public class TestSimpleRpcScheduler {/*
@Test
public void testSoftAndHardQueueLimits() throws Exception {
+
Configuration schedConf = HBaseConfiguration.create();
schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
@@ -379,7 +386,7 @@ public class TestSimpleRpcScheduler {/*
putCall.param = RequestConverter.buildMutateRequest(
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
- when(putCallTask.getCall()).thenReturn(putCall);
+ when(putCallTask.getRpcCall()).thenReturn(putCall);
when(putCall.getHeader()).thenReturn(putHead);
assertTrue(scheduler.dispatch(putCallTask));
@@ -512,6 +519,8 @@ public class TestSimpleRpcScheduler {/*
.build();
when(putCall.getSize()).thenReturn(9L);
when(putCall.getHeader()).thenReturn(putHead);
+ when(putCall.getReceiveTime()).thenReturn(putCall.timestamp);
+ when(putCall.getParam()).thenReturn(putCall.param);
CallRunner cr = new CallRunner(null, putCall) {
public void run() {
@@ -523,7 +532,7 @@ public class TestSimpleRpcScheduler {/*
} catch (InterruptedException e) {
}
}
- public Call getCall() {
+ public RpcCall getRpcCall() {
return putCall;
}