You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/08/10 08:57:06 UTC
hbase git commit: HBASE-16285 Drop RPC requests if it must be
considered as timeout at client
Repository: hbase
Updated Branches:
refs/heads/master 2d203e605 -> ede9940a7
HBASE-16285 Drop RPC requests if it must be considered as timeout at client
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ede9940a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ede9940a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ede9940a
Branch: refs/heads/master
Commit: ede9940a7bd254d496b2ef493d4f35540184e96a
Parents: 2d203e6
Author: Phil Yang <ud...@gmail.com>
Authored: Thu Aug 4 15:38:38 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Aug 10 16:56:27 2016 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 7 +++-
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 16 ++++---
.../hadoop/hbase/ipc/RpcServerInterface.java | 11 +++--
.../org/apache/hadoop/hbase/client/TestHCM.java | 44 +++++++++++++++++++-
4 files changed, 62 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ede9940a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index e91699a..b2b3c66 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -93,6 +93,11 @@ public class CallRunner {
}
return;
}
+ call.startTime = System.currentTimeMillis();
+ if (call.startTime > call.deadline) {
+ RpcServer.LOG.info("Dropping timed out call: " + call);
+ return;
+ }
this.status.setStatus("Setting up call");
this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
if (RpcServer.LOG.isTraceEnabled()) {
@@ -116,7 +121,7 @@ public class CallRunner {
}
// make the call
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
- call.timestamp, this.status, call.timeout);
+ call.timestamp, this.status, call.startTime, call.timeout);
} catch (Throwable e) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e;
http://git-wip-us.apache.org/repos/asf/hbase/blob/ede9940a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 73226aa..5b2aab1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -75,12 +75,9 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
-import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
@@ -99,7 +96,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.UserInformation;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.AuthMethod;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
@@ -312,6 +308,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected long timestamp; // the time received when response is null
// the time served when response is not null
protected int timeout;
+ protected long startTime;
+ protected long deadline;// the deadline for handling this call; if it is exceeded we can drop the call.
+
/**
* Chain of buffers to send as response.
*/
@@ -354,6 +353,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.retryImmediatelySupported =
connection == null? null: connection.retryImmediatelySupported;
this.timeout = timeout;
+ this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE;
}
/**
@@ -1894,7 +1894,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
: null;
int timeout = 0;
- if (header.hasTimeout()){
+ if (header.hasTimeout() && header.getTimeout() > 0){
timeout = Math.max(minClientRequestTimeout, header.getTimeout());
}
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
@@ -2187,7 +2187,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException {
- return call(service, md, param, cellScanner, receiveTime, status, 0);
+ return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0);
}
/**
@@ -2195,10 +2195,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
* the return response has protobuf response payload. On failure, the
* exception name and the stack trace are returned in the protobuf response.
*/
- @Override
public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
- int timeout)
+ long startTime, int timeout)
throws IOException {
try {
status.setRPC(md.getName(), new Object[]{param}, receiveTime);
@@ -2206,7 +2205,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
status.setRPCPacket(param);
status.resume("Servicing call");
//get an instance of the method arg type
- long startTime = System.currentTimeMillis();
PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
controller.setCallTimeout(timeout);
Message result = service.callBlockingMethod(md, controller, param);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ede9940a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
index dd7e584..0388ea4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
@@ -48,14 +48,17 @@ public interface RpcServerInterface {
void setSocketSendBufSize(int size);
InetSocketAddress getListenerAddress();
+ /**
+ * @deprecated As of release 1.3, this will be removed in HBase 3.0
+ */
+ @Deprecated
Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
throws IOException, ServiceException;
- Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
- Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
- int timeout)
- throws IOException, ServiceException;
+ Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Message param,
+ CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime,
+ int timeout) throws IOException, ServiceException;
void setErrorHandler(HBaseRPCErrorHandler handler);
HBaseRPCErrorHandler getErrorHandler();
http://git-wip-us.apache.org/repos/asf/hbase/blob/ede9940a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 4d47bde..bfd16a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.*;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -78,7 +79,11 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestRule;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
/**
* This class is for testing HBaseConnectionManager features
@@ -105,6 +110,7 @@ public class TestHCM {
private static final byte[] ROW = Bytes.toBytes("bbb");
private static final byte[] ROW_X = Bytes.toBytes("xxx");
private static Random _randy = new Random();
+ private static final int RPC_RETRY = 5;
/**
* This copro sleeps 20 second. The first call it fails. The second time, it works.
@@ -155,12 +161,31 @@ public class TestHCM {
}
}
+ public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver {
+ public static final int SLEEP_TIME = 2000;
+ static final AtomicLong ct = new AtomicLong(0);
+ @Override
+ public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Get get, final List<Cell> results) throws IOException {
+ // After the first sleep, all requests time out except the last retry. If we handle
+ // all the following requests, the last request will eventually time out too. If we drop all
+ // timed-out requests, we can handle the last request immediately and it will not time out.
+ if (ct.incrementAndGet() <= 1) {
+ Threads.sleep(SLEEP_TIME * RPC_RETRY * 2);
+ } else {
+ Threads.sleep(SLEEP_TIME);
+ }
+ }
+ }
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
// Up the handlers; this test needs more than usual.
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10);
- TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
+ // simulate queue blocking in testDropTimeoutRequest
+ TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
TEST_UTIL.startMiniCluster(2);
}
@@ -442,6 +467,21 @@ public class TestHCM {
}
}
+ @Test
+ public void testDropTimeoutRequest() throws Exception {
+ // Simulate the situation where the server is slow and the client retries several times because
+ // of timeouts. When a request is finally handled after waiting in the queue, we drop it if the
+ // client already considers it timed out. If we don't drop it, the server wastes time handling
+ // timed-out requests, and eventually all requests time out and the client throws an exception.
+ HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDropTimeputRequest");
+ hdt.addCoprocessor(SleepLongerAtFirstCoprocessor.class.getName());
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+ try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
+ t.setRpcTimeout(SleepLongerAtFirstCoprocessor.SLEEP_TIME * 2);
+ t.get(new Get(FAM_NAM));
+ }
+ }
+
/**
* Test starting from 0 index when RpcRetryingCaller calculate the backoff time.
*/