You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/09/19 23:17:43 UTC
[30/50] [abbrv] hbase git commit: HBASE-16388 Prevent client threads
being blocked by only one slow region server
HBASE-16388 Prevent client threads being blocked by only one slow region server
Signed-off-by: stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8ef6c763
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8ef6c763
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8ef6c763
Branch: refs/heads/hbase-12439
Commit: 8ef6c76344127f2f4d2f9536d87fa6fc7b5c7132
Parents: 8540171
Author: Phil Yang <ud...@gmail.com>
Authored: Wed Sep 14 13:21:01 2016 +0800
Committer: stack <st...@apache.org>
Committed: Wed Sep 14 09:08:20 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/ipc/AbstractRpcClient.java | 22 ++++
.../hbase/ipc/ServerTooBusyException.java | 38 ++++++
.../org/apache/hadoop/hbase/HConstants.java | 12 ++
.../src/main/resources/hbase-default.xml | 16 ++-
.../org/apache/hadoop/hbase/client/TestHCM.java | 119 ++++++++++++++++++-
5 files changed, 201 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8ef6c763/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 098ad3c..401a240 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -23,6 +23,9 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.MethodDescriptor;
@@ -137,6 +140,16 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
private final ScheduledFuture<?> cleanupIdleConnectionTask;
+ private int maxConcurrentCallsPerServer;
+
+ private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache =
+ CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).
+ build(new CacheLoader<InetSocketAddress, AtomicInteger>() {
+ @Override public AtomicInteger load(InetSocketAddress key) throws Exception {
+ return new AtomicInteger(0);
+ }
+ });
+
/**
* Construct an IPC client for the cluster <code>clusterId</code>
* @param conf configuration
@@ -167,6 +180,9 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
this.metrics = metrics;
+ this.maxConcurrentCallsPerServer = conf.getInt(
+ HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
+ HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);
this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
@@ -382,16 +398,22 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
final RpcCallback<Message> callback) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
+ final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
@Override
public void run(Call call) {
+ counter.decrementAndGet();
onCallFinished(call, hrc, addr, callback);
}
}, cs);
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
+ int count = counter.incrementAndGet();
try {
+ if (count > maxConcurrentCallsPerServer) {
+ throw new ServerTooBusyException(addr, count);
+ }
T connection = getConnection(remoteId);
connection.sendRequest(call, hrc);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/8ef6c763/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
new file mode 100644
index 0000000..c6ba030
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown in an rpc call when there are too many concurrent pending requests for one region server
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ServerTooBusyException extends DoNotRetryIOException {
+
+ public ServerTooBusyException(InetSocketAddress address, long count) {
+ super("There are " + count + " concurrent rpc requests for " + address);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/8ef6c763/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 4c499a2..5c53030 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -738,6 +738,18 @@ public final class HConstants {
public static final int DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS = 1;
/**
+ * The maximum number of concurrent pending RPC requests for one server at the process level.
+ */
+ public static final String HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD =
+ "hbase.client.perserver.requests.threshold";
+
+ /**
+ * Default value of {@link #HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD}.
+ */
+ public static final int DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD = Integer.MAX_VALUE;
+
+
+ /**
* Parameter name for server pause value, used mostly as value to wait before
* running a retry of a failed operation.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/8ef6c763/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index a791717..8315829 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -487,24 +487,34 @@ possible configurations would overwhelm and obscure the important.
<property>
<name>hbase.client.max.total.tasks</name>
<value>100</value>
- <description>The maximum number of concurrent tasks a single HTable instance will
+ <description>The maximum number of concurrent mutation tasks a single HTable instance will
send to the cluster.</description>
</property>
<property>
<name>hbase.client.max.perserver.tasks</name>
<value>5</value>
- <description>The maximum number of concurrent tasks a single HTable instance will
+ <description>The maximum number of concurrent mutation tasks a single HTable instance will
send to a single region server.</description>
</property>
<property>
<name>hbase.client.max.perregion.tasks</name>
<value>1</value>
- <description>The maximum number of concurrent connections the client will
+ <description>The maximum number of concurrent mutation tasks the client will
maintain to a single Region. That is, if there is already
hbase.client.max.perregion.tasks writes in progress for this region, new puts
won't be sent to this region until some writes finishes.</description>
</property>
<property>
+ <name>hbase.client.perserver.requests.threshold</name>
+ <value>2147483647</value>
+ <description>The maximum number of concurrent pending requests for one server in all client
+ threads (process level). Requests exceeding this limit will fail immediately with a
+ ServerTooBusyException, to prevent the user's threads from being occupied and blocked by a
+ single slow region server. If you use a fixed number of threads to access HBase in a
+ synchronous way, setting this to a suitable value related to that number of threads can
+ help you. See https://issues.apache.org/jira/browse/HBASE-16388 for details.</description>
+ </property>
+ <property>
<name>hbase.client.scanner.caching</name>
<value>2147483647</value>
<description>Number of rows that we try to fetch when calling next
http://git-wip-us.apache.org/repos/asf/hbase/blob/8ef6c763/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 4e61fd3..786f570 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.junit.Assert.*;
import com.google.common.collect.Lists;
import java.io.IOException;
@@ -61,10 +60,12 @@ import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -85,8 +86,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import com.google.common.collect.Lists;
-import com.google.protobuf.RpcController;
+import static org.junit.Assert.fail;
/**
* This class is for testing HBaseConnectionManager features
@@ -150,6 +150,12 @@ public class TestHCM {
final Get get, final List<Cell> results) throws IOException {
Threads.sleep(SLEEP_TIME);
}
+
+ @Override
+ public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Put put, final WALEdit edit, final Durability durability) throws IOException {
+ Threads.sleep(SLEEP_TIME);
+ }
}
public static class SleepWriteCoprocessor extends BaseRegionObserver {
@@ -187,6 +193,8 @@ public class TestHCM {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
// simulate queue blocking in testDropTimeoutRequest
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
+ // Used in testServerBusyException
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD, 3);
TEST_UTIL.startMiniCluster(2);
}
@@ -1338,4 +1346,109 @@ public class TestHCM {
table.close();
connection.close();
}
+
+ private class TestPutThread extends Thread {
+ Table table;
+ int getServerBusyException = 0;
+
+ TestPutThread(Table table){
+ this.table = table;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Put p = new Put(ROW);
+ p.addColumn(FAM_NAM, new byte[]{0}, new byte[]{0});
+ table.put(p);
+ } catch (RetriesExhaustedWithDetailsException e) {
+ if (e.exceptions.get(0).getCause() instanceof ServerTooBusyException) {
+ getServerBusyException = 1;
+ }
+ } catch (IOException ignore) {
+ }
+ }
+ }
+
+ private class TestGetThread extends Thread {
+ Table table;
+ int getServerBusyException = 0;
+
+ TestGetThread(Table table){
+ this.table = table;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Get g = new Get(ROW);
+ g.addColumn(FAM_NAM, new byte[]{0});
+ table.get(g);
+ } catch (RetriesExhaustedException e) {
+ if (e.getCause().getCause() instanceof ServerTooBusyException) {
+ getServerBusyException = 1;
+ }
+ } catch (IOException ignore) {
+ }
+ }
+ }
+
+ @Test()
+ public void testServerBusyException() throws Exception {
+ HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testServerBusy");
+ hdt.addCoprocessor(SleepCoprocessor.class.getName());
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+ TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c);
+
+ TestGetThread tg1 =
+ new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+ TestGetThread tg2 =
+ new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+ TestGetThread tg3 =
+ new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+ TestGetThread tg4 =
+ new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+ TestGetThread tg5 =
+ new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+ tg1.start();
+ tg2.start();
+ tg3.start();
+ tg4.start();
+ tg5.start();
+ tg1.join();
+ tg2.join();
+ tg3.join();
+ tg4.join();
+ tg5.join();
+ assertEquals(2,
+ tg1.getServerBusyException + tg2.getServerBusyException + tg3.getServerBusyException
+ + tg4.getServerBusyException + tg5.getServerBusyException);
+
+ // Put has its own logic in HTable, so test Put alone. We use AsyncProcess for Put (it uses
+ // multi at the RPC level) and it wraps exceptions in RetriesExhaustedWithDetailsException.
+
+ TestPutThread tp1 =
+ new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+ TestPutThread tp2 =
+ new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+ TestPutThread tp3 =
+ new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+ TestPutThread tp4 =
+ new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+ TestPutThread tp5 =
+ new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+ tp1.start();
+ tp2.start();
+ tp3.start();
+ tp4.start();
+ tp5.start();
+ tp1.join();
+ tp2.join();
+ tp3.join();
+ tp4.join();
+ tp5.join();
+ assertEquals(2,
+ tp1.getServerBusyException + tp2.getServerBusyException + tp3.getServerBusyException
+ + tp4.getServerBusyException + tp5.getServerBusyException);
+ }
}