You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2012/11/07 21:14:46 UTC

svn commit: r1406785 - /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java

Author: liyin
Date: Wed Nov  7 20:14:46 2012
New Revision: 1406785

URL: http://svn.apache.org/viewvc?rev=1406785&view=rev
Log:
[HBASE-7100] Allow multiple connections from HBaseClient to each remote endpoint

Author: kranganathan

Summary:
Allows a conf-controlled set of connections per server. With this, the get benchmark from one client to one server is now able to do 145K to 152K gets/sec.

Note that the first param that the HBaseClient is created with is respected, all future confs are ignored.

Test Plan: Tested by running the get benchmark.

Reviewers: kannan, liyintang

Reviewed By: kannan

CC: hbase-eng@

Differential Revision: https://phabricator.fb.com/D621225

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1406785&r1=1406784&r2=1406785&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Wed Nov  7 20:14:46 2012
@@ -43,6 +43,7 @@ import java.net.UnknownHostException;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -83,9 +84,9 @@ public class HBaseClient {
   private static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
   // Active connections are stored in connections.
-  protected final ConcurrentMap<ConnectionId, Connection> connections =
+  protected final ConcurrentMap<ConnectionId, Connection> connections = 
     new ConcurrentHashMap<ConnectionId, Connection>();
-
+  
   protected int counter;                            // counter for call ids
   protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
   final protected Configuration conf;
@@ -98,6 +99,12 @@ public class HBaseClient {
   protected int pingInterval; // how often sends ping to the server in msecs
   private final int connectionTimeOutMillSec; // the connection time out
 
+  private final int numConnectionsPerServer;
+  public static final String NUM_CONNECTIONS_PER_SERVER = 
+    "hbase.client.max.connections.per.server";
+  public static final int DEFAULT_NUM_CONNECTIONS_PER_SERVER = 1;
+  private Random random = new Random();
+
   protected final SocketFactory socketFactory;           // how to create sockets
 
   final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
@@ -186,7 +193,7 @@ public class HBaseClient {
       return version;
     }
   }
-
+  
   /** Thread that reads responses and notifies callers.  Each connection owns a
    * socket connected to a remote address.  Calls are multiplexed through this
    * socket: responses may be delivered out of order. */
@@ -785,6 +792,10 @@ public class HBaseClient {
     this.socketFactory = factory;
     this.connectionTimeOutMillSec =
       conf.getInt("hbase.client.connection.timeout.millsec", 5000);
+    this.numConnectionsPerServer = conf.getInt(NUM_CONNECTIONS_PER_SERVER, 
+        DEFAULT_NUM_CONNECTIONS_PER_SERVER);
+    LOG.debug("Created a new HBaseClient with " + numConnectionsPerServer + 
+        " connections per remote server.");
   }
 
   /**
@@ -974,12 +985,15 @@ public class HBaseClient {
     }
     call.setVersion(version);
     Connection connection;
+    // if multiple connections are enabled per remote, get a random one
+    int connectionNum = (numConnectionsPerServer > 1)?
+        random.nextInt(numConnectionsPerServer):0;
     /* we could avoid this allocation for each RPC by having a
      * connectionsId object and with set() method. We need to manage the
      * refs for keys in HashMap properly. For now its ok.
      */
     ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout,
-        call.getVersion());
+        call.getVersion(), connectionNum);
     do {
       connection = connections.get(remoteId);
       if (connection == null) {
@@ -1056,13 +1070,15 @@ public class HBaseClient {
     final UserGroupInformation ticket;
     final private int rpcTimeout;
     final private int version;
+    final private int connectionNum;
 
     ConnectionId(InetSocketAddress address, UserGroupInformation ticket,
-        int rpcTimeout, int version) {
+        int rpcTimeout, int version, int connectionNum) {
       this.address = address;
       this.ticket = ticket;
       this.rpcTimeout = rpcTimeout;
       this.version = version;
+      this.connectionNum = connectionNum;
     }
 
     InetSocketAddress getAddress() {
@@ -1077,7 +1093,8 @@ public class HBaseClient {
      if (obj instanceof ConnectionId) {
        ConnectionId id = (ConnectionId) obj;
        return address.equals(id.address) && ticket == id.ticket &&
-       rpcTimeout == id.rpcTimeout && version == id.version;
+       rpcTimeout == id.rpcTimeout && version == id.version && 
+       connectionNum == id.connectionNum;
        //Note : ticket is a ref comparision.
      }
      return false;
@@ -1086,7 +1103,7 @@ public class HBaseClient {
     @Override
     public int hashCode() {
       return address.hashCode() ^ System.identityHashCode(ticket) ^
-          rpcTimeout ^ version;
+          rpcTimeout ^ version ^ connectionNum;
     }
   }
 }