You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/02 22:49:25 UTC
svn commit: r1584168 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/ipc/thrift/
main/java/org/apache/hadoop/hbase/thrift/
test/java/org/apache/hadoop/hbase/util/rpcbench/
Author: liyin
Date: Wed Apr 2 20:49:24 2014
New Revision: 1584168
URL: http://svn.apache.org/r1584168
Log:
[master] Let async API return client back to pool after the call finishes
Author: fan
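Summary: Because the nifty async client is not thread-safe, we cannot multiplex a connection across concurrent calls :( Instead, each async call keeps its client until the call finishes and only then returns it to the pool.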
Test Plan:
TestSimpleOperations, TestHeaderSendReceive.
The benchmark now completes without problems.
Reviewers: adela, manukranthk, gauravm, daviddeng
Reviewed By: manukranthk
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D1228338
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1584168&r1=1584167&r2=1584168&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Wed Apr 2 20:49:24 2014
@@ -135,9 +135,7 @@ public abstract class ServerCallable<T>
((TableServers)connection).handleThrowable(t, this, couldNotCommunicateWithServer);
}
- public void readHeader() {
- if (server instanceof HBaseToThriftAdapter) {
- ((HBaseToThriftAdapter)server).readHeader();
- }
+ public void postProcess() {
+ ((HBaseToThriftAdapter)server).postProcess();
}
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java?rev=1584168&r1=1584167&r2=1584168&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java Wed Apr 2 20:49:24 2014
@@ -203,10 +203,7 @@ public class HBaseToThriftAdapter implem
* Read data that the server has sent to the client
* TODO: test how it works with async calls
*/
- public void readHeader() {
- if (clientManager == null) {
- return;
- }
+ private void readHeader() {
TTransport inputTransport = clientManager.getInputProtocol(connection)
.getTransport();
TTransport outputTransport = clientManager.getOutputProtocol(connection)
@@ -229,25 +226,19 @@ public class HBaseToThriftAdapter implem
}
}
- private void postProcess() {
- try {
- if (this.useHeaderProtocol) {
- readHeader();
+ public void postProcess() {
+ if (this.clientManager != null && this.connection != null) {
+ try {
+ if (this.useHeaderProtocol) {
+ readHeader();
+ }
+ HBaseThriftRPC.putBackClient(this.connection, this.addr, this.conf, this.clazz);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- HBaseThriftRPC.putBackClient(this.connection, this.addr, this.conf, this.clazz);
- this.connection = null;
- this.clientManager = null;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private void putBackClient() {
- try {
- HBaseThriftRPC.putBackClient(this.connection, this.addr, this.conf, this.clazz);
- } catch (Exception e) {
- throw new RuntimeException(e);
}
+ this.connection = null;
+ this.clientManager = null;
}
private void handleIOException(Exception e) throws IOException {
@@ -357,11 +348,7 @@ public class HBaseToThriftAdapter implem
public ListenableFuture<Result> getClosestRowBeforeAsync(byte[] regionName, byte[] row, byte[] family) {
preProcess();
- try {
- return connection.getClosestRowBeforeAsync(regionName, row, family);
- } finally {
- putBackClient();
- }
+ return connection.getClosestRowBeforeAsync(regionName, row, family);
}
// TODO: we will decide whether to remove it from HRegionInterface in the future
@@ -582,11 +569,7 @@ public class HBaseToThriftAdapter implem
public ListenableFuture<Result> getAsync(byte[] regionName, Get get) {
preProcess();
- try {
- return connection.getAsync(regionName, get);
- } finally {
- putBackClient();
- }
+ return connection.getAsync(regionName, get);
}
@Override
@@ -657,11 +640,7 @@ public class HBaseToThriftAdapter implem
public ListenableFuture<Void> deleteAsync(final byte[] regionName, final Delete delete) {
preProcess();
- try {
- return connection.deleteAsync(regionName, delete);
- } finally {
- putBackClient();
- }
+ return connection.deleteAsync(regionName, delete);
}
@Override
@@ -782,8 +761,6 @@ public class HBaseToThriftAdapter implem
return connection.mutateRowAsync(regionName, TRowMutations.Builder.createFromRowMutations(arm));
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
- } finally {
- putBackClient();
}
}
@@ -903,11 +880,7 @@ public class HBaseToThriftAdapter implem
public ListenableFuture<RowLock> lockRowAsync(byte[] regionName, byte[] row) {
preProcess();
- try {
- return connection.lockRowAsync(regionName, row);
- } finally {
- putBackClient();
- }
+ return connection.lockRowAsync(regionName, row);
}
@Override
@@ -929,11 +902,7 @@ public class HBaseToThriftAdapter implem
public ListenableFuture<Void> unlockRowAsync(byte[] regionName, long lockId) {
preProcess();
- try {
- return connection.unlockRowAsync(regionName, lockId);
- } finally {
- putBackClient();
- }
+ return connection.unlockRowAsync(regionName, lockId);
}
@Override
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java?rev=1584168&r1=1584167&r2=1584168&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java Wed Apr 2 20:49:24 2014
@@ -55,7 +55,7 @@ public class ThriftClientCacheWithConnec
public static final String MIN_IDLE = "hbase.client.cachepool.minIdle";
private final ThriftClientManager clientManager;
- private final Map<Pair<InetSocketAddress,
+ private final ConcurrentHashMap<Pair<InetSocketAddress,
Class<? extends ThriftClientInterface>>,
GenericObjectPool<ThriftClientInterface>>
clientPools;
@@ -75,12 +75,10 @@ public class ThriftClientCacheWithConnec
address, clazz);
GenericObjectPool<ThriftClientInterface> clientPool = clientPools.get(key);
if (clientPool == null) {
- synchronized (clientPools) {
- clientPool = clientPools.get(key);
- if (clientPool == null) {
- clientPool = createGenericObjectPool(address, clazz);
- clientPools.put(key, clientPool);
- }
+ clientPool = createGenericObjectPool(address, clazz);
+ GenericObjectPool<ThriftClientInterface> existing = clientPools.putIfAbsent(key, clientPool);
+ if (existing != null) {
+ clientPool = existing;
}
}
return clientPool.borrowObject();
@@ -100,10 +98,10 @@ public class ThriftClientCacheWithConnec
ThriftClientObjectFactory factory = new ThriftClientObjectFactory(address,
clazz, this.clientManager, this.conf);
GenericObjectPool.Config config = new GenericObjectPool.Config();
- long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = 1000;
+ long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = 5000;
long DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS = 30000;
long DEFAULT_WHEN_EXHAUSTED_MAX_WAITTIME = 1000;
- int DEFAULT_MAX_ACTIVE = 2000;
+ int DEFAULT_MAX_ACTIVE = 1000;
// Keep some idle connections to prevent asynchronous client from contention on
// a single connection to each region server.
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java?rev=1584168&r1=1584167&r2=1584168&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java Wed Apr 2 20:49:24 2014
@@ -161,7 +161,7 @@ public class SelfRetryingListenableFutur
* @param v Result from server
*/
private void setSuccess(V v) {
- callable.readHeader();
+ postProcess();
downstream.set(v);
}
@@ -171,9 +171,14 @@ public class SelfRetryingListenableFutur
* @param t The exception for client
*/
private void setFailure(Throwable t) {
+ postProcess();
downstream.setException(t);
}
+ private void postProcess() {
+ callable.postProcess();
+ }
+
/**
* Unwrap exception if it's from server side and handle all scenarios.
*
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java?rev=1584168&r1=1584167&r2=1584168&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java Wed Apr 2 20:49:24 2014
@@ -492,8 +492,7 @@ public class HBaseRPCBenchmarkTool exten
setProfilingData(false);
}
} catch (Exception e) {
- LOG.debug("Encountered exception while performing get");
- e.printStackTrace();
+ LOG.debug("Encountered exception while performing get", e);
break;
}
long delta = System.nanoTime() - opStartNs;
@@ -546,7 +545,7 @@ public class HBaseRPCBenchmarkTool exten
}
public double getAverageLatency() {
- return this.sumLatency.get() / (double)this.totalOps.get();
+ return this.sumLatency.get() * (double)this.multigetBatch / (double)this.totalOps.get();
}
public double getP95Latency() {
@@ -561,9 +560,10 @@ public class HBaseRPCBenchmarkTool exten
HBaseRPCBenchmarkTool tool = new HBaseRPCBenchmarkTool();
int ret = tool.doStaticMain(args);
System.out.println("Total throughput : " + tool.getThroughput());
- System.out.println("Avg Latency : " + tool.getAverageLatency());
- System.out.println("P99 latency : " + tool.getP99Latency());
- System.out.println("P95 latency : " + tool.getP95Latency());
+ System.out.println("Latencies in ms --");
+ System.out.println(" Avg Latency : " + tool.getAverageLatency() / 1000000.0);
+ System.out.println(" P99 latency : " + tool.getP99Latency() / 1000000.0);
+ System.out.println(" P95 latency : " + tool.getP95Latency() / 1000000.0);
System.exit(ret);
}
}