You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/04/02 22:49:25 UTC

svn commit: r1584168 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/ipc/thrift/ main/java/org/apache/hadoop/hbase/thrift/ test/java/org/apache/hadoop/hbase/util/rpcbench/

Author: liyin
Date: Wed Apr  2 20:49:24 2014
New Revision: 1584168

URL: http://svn.apache.org/r1584168
Log:
[master] Let async API return client back to pool after the call finishes

Author: fan

Summary: Because nifty async client is not thread safe, we cannot multiplex connection :(

Test Plan:
TestSimpleOperations, TestHeaderSendReceive.
Benchmark completes without problem now.

Reviewers: adela, manukranthk, gauravm, daviddeng

Reviewed By: manukranthk

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D1228338

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1584168&r1=1584167&r2=1584168&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Wed Apr  2 20:49:24 2014
@@ -135,9 +135,7 @@ public abstract class ServerCallable<T> 
     ((TableServers)connection).handleThrowable(t, this, couldNotCommunicateWithServer);
   }
 
-  public void readHeader() {
-    if (server instanceof HBaseToThriftAdapter) {
-      ((HBaseToThriftAdapter)server).readHeader();
-    }
+  public void postProcess() {
+    ((HBaseToThriftAdapter)server).postProcess();
   }
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java?rev=1584168&r1=1584167&r2=1584168&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java Wed Apr  2 20:49:24 2014
@@ -203,10 +203,7 @@ public class HBaseToThriftAdapter implem
    * Read data that the server has sent to the client
    * TODO: test how it works with async calls
    */
-  public void readHeader() {
-    if (clientManager == null) {
-      return;
-    }
+  private void readHeader() {
     TTransport inputTransport = clientManager.getInputProtocol(connection)
         .getTransport();
     TTransport outputTransport = clientManager.getOutputProtocol(connection)
@@ -229,25 +226,19 @@ public class HBaseToThriftAdapter implem
     }
   }
 
-  private void postProcess() {
-    try {
-      if (this.useHeaderProtocol) {
-        readHeader();
+  public void postProcess() {
+    if (this.clientManager != null && this.connection != null) {
+      try {
+        if (this.useHeaderProtocol) {
+          readHeader();
+        }
+        HBaseThriftRPC.putBackClient(this.connection, this.addr, this.conf, this.clazz);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
-      HBaseThriftRPC.putBackClient(this.connection, this.addr, this.conf, this.clazz);
-      this.connection = null;
-      this.clientManager = null;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void putBackClient() {
-    try {
-      HBaseThriftRPC.putBackClient(this.connection, this.addr, this.conf, this.clazz);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
     }
+    this.connection = null;
+    this.clientManager = null;
   }
 
   private void handleIOException(Exception e) throws IOException {
@@ -357,11 +348,7 @@ public class HBaseToThriftAdapter implem
 
   public ListenableFuture<Result> getClosestRowBeforeAsync(byte[] regionName, byte[] row, byte[] family) {
     preProcess();
-    try {
-      return connection.getClosestRowBeforeAsync(regionName, row, family);
-    } finally {
-      putBackClient();
-    }
+    return connection.getClosestRowBeforeAsync(regionName, row, family);
   }
 
   // TODO: we will decide whether to remove it from HRegionInterface in the future
@@ -582,11 +569,7 @@ public class HBaseToThriftAdapter implem
 
   public ListenableFuture<Result> getAsync(byte[] regionName, Get get) {
     preProcess();
-    try {
-      return connection.getAsync(regionName, get);
-    } finally {
-      putBackClient();
-    }
+    return connection.getAsync(regionName, get);
   }
 
   @Override
@@ -657,11 +640,7 @@ public class HBaseToThriftAdapter implem
 
   public ListenableFuture<Void> deleteAsync(final byte[] regionName, final Delete delete) {
     preProcess();
-    try {
-      return connection.deleteAsync(regionName, delete);
-    } finally {
-      putBackClient();
-    }
+    return connection.deleteAsync(regionName, delete);
   }
 
   @Override
@@ -782,8 +761,6 @@ public class HBaseToThriftAdapter implem
       return connection.mutateRowAsync(regionName, TRowMutations.Builder.createFromRowMutations(arm));
     } catch (IOException e) {
       return Futures.immediateFailedFuture(e);
-    } finally {
-      putBackClient();
     }
   }
 
@@ -903,11 +880,7 @@ public class HBaseToThriftAdapter implem
 
   public ListenableFuture<RowLock> lockRowAsync(byte[] regionName, byte[] row) {
     preProcess();
-    try {
-      return connection.lockRowAsync(regionName, row);
-    } finally {
-      putBackClient();
-    }
+    return connection.lockRowAsync(regionName, row);
   }
 
   @Override
@@ -929,11 +902,7 @@ public class HBaseToThriftAdapter implem
 
   public ListenableFuture<Void> unlockRowAsync(byte[] regionName, long lockId) {
     preProcess();
-    try {
-      return connection.unlockRowAsync(regionName, lockId);
-    } finally {
-      putBackClient();
-    }
+    return connection.unlockRowAsync(regionName, lockId);
   }
 
   @Override

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java?rev=1584168&r1=1584167&r2=1584168&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/ThriftClientCacheWithConnectionPooling.java Wed Apr  2 20:49:24 2014
@@ -55,7 +55,7 @@ public class ThriftClientCacheWithConnec
   public static final String MIN_IDLE = "hbase.client.cachepool.minIdle";
 
   private final ThriftClientManager clientManager;
-  private final Map<Pair<InetSocketAddress,
+  private final ConcurrentHashMap<Pair<InetSocketAddress,
     Class<? extends ThriftClientInterface>>,
       GenericObjectPool<ThriftClientInterface>>
     clientPools;
@@ -75,12 +75,10 @@ public class ThriftClientCacheWithConnec
         address, clazz);
     GenericObjectPool<ThriftClientInterface> clientPool = clientPools.get(key);
     if (clientPool == null) {
-      synchronized (clientPools) {
-        clientPool = clientPools.get(key);
-        if (clientPool == null) {
-          clientPool = createGenericObjectPool(address, clazz);
-          clientPools.put(key, clientPool);
-        }
+      clientPool = createGenericObjectPool(address, clazz);
+      GenericObjectPool<ThriftClientInterface> existing = clientPools.putIfAbsent(key, clientPool);
+      if (existing != null) {
+        clientPool = existing;
       }
     }
     return clientPool.borrowObject();
@@ -100,10 +98,10 @@ public class ThriftClientCacheWithConnec
     ThriftClientObjectFactory factory = new ThriftClientObjectFactory(address,
         clazz, this.clientManager, this.conf);
     GenericObjectPool.Config config = new GenericObjectPool.Config();
-    long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = 1000;
+    long DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS = 5000;
     long DEFAULT_MIN_EVICTABLE_IDLE_TIME_MILLIS = 30000;
     long DEFAULT_WHEN_EXHAUSTED_MAX_WAITTIME = 1000;
-    int DEFAULT_MAX_ACTIVE = 2000;
+    int DEFAULT_MAX_ACTIVE = 1000;
 
     // Keep some idle connections to prevent asynchronous client from contention on
     //   a single connection to each region server.

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java?rev=1584168&r1=1584167&r2=1584168&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/thrift/SelfRetryingListenableFuture.java Wed Apr  2 20:49:24 2014
@@ -161,7 +161,7 @@ public class SelfRetryingListenableFutur
    * @param v Result from server
    */
   private void setSuccess(V v) {
-    callable.readHeader();
+    postProcess();
     downstream.set(v);
   }
 
@@ -171,9 +171,14 @@ public class SelfRetryingListenableFutur
    * @param t The exception for client
    */
   private void setFailure(Throwable t) {
+    postProcess();
     downstream.setException(t);
   }
 
+  private void postProcess() {
+    callable.postProcess();
+  }
+
   /**
    * Unwrap exception if it's from server side and handle all scenarios.
    *

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java?rev=1584168&r1=1584167&r2=1584168&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/rpcbench/HBaseRPCBenchmarkTool.java Wed Apr  2 20:49:24 2014
@@ -492,8 +492,7 @@ public class HBaseRPCBenchmarkTool exten
             setProfilingData(false);
           }
         } catch (Exception e) {
-          LOG.debug("Encountered exception while performing get");
-          e.printStackTrace();
+          LOG.debug("Encountered exception while performing get", e);
           break;
         }
         long delta = System.nanoTime() - opStartNs;
@@ -546,7 +545,7 @@ public class HBaseRPCBenchmarkTool exten
   }
 
   public double getAverageLatency() {
-    return this.sumLatency.get() / (double)this.totalOps.get();
+    return this.sumLatency.get() * (double)this.multigetBatch / (double)this.totalOps.get();
   }
 
   public double getP95Latency() {
@@ -561,9 +560,10 @@ public class HBaseRPCBenchmarkTool exten
     HBaseRPCBenchmarkTool tool = new HBaseRPCBenchmarkTool();
     int ret = tool.doStaticMain(args);
     System.out.println("Total throughput : " + tool.getThroughput());
-    System.out.println("Avg Latency : " + tool.getAverageLatency());
-    System.out.println("P99 latency : " + tool.getP99Latency());
-    System.out.println("P95 latency : " + tool.getP95Latency());
+    System.out.println("Latencies in ms --");
+    System.out.println("  Avg Latency : " + tool.getAverageLatency() / 1000000.0);
+    System.out.println("  P99 latency : " + tool.getP99Latency() / 1000000.0);
+    System.out.println("  P95 latency : " + tool.getP95Latency() / 1000000.0);
     System.exit(ret);
   }
 }