You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2012/11/07 21:14:46 UTC
svn commit: r1406785 -
/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
Author: liyin
Date: Wed Nov 7 20:14:46 2012
New Revision: 1406785
URL: http://svn.apache.org/viewvc?rev=1406785&view=rev
Log:
[HBASE-7100] Allow multiple connections from HBaseClient to each remote endpoint
Author: kranganathan
Summary:
Allows a configurable number of connections per server (set via hbase.client.max.connections.per.server; default 1). With this, the get benchmark from one client to one server is now able to do 145K to 152K gets/sec.
Note that only the configuration the HBaseClient is first created with is respected; configurations supplied later are ignored.
Test Plan: Tested by running the get benchmark.
Reviewers: kannan, liyintang
Reviewed By: kannan
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D621225
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1406785&r1=1406784&r2=1406785&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Wed Nov 7 20:14:46 2012
@@ -43,6 +43,7 @@ import java.net.UnknownHostException;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -83,9 +84,9 @@ public class HBaseClient {
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
// Active connections are stored in connections.
- protected final ConcurrentMap<ConnectionId, Connection> connections =
+ protected final ConcurrentMap<ConnectionId, Connection> connections =
new ConcurrentHashMap<ConnectionId, Connection>();
-
+
protected int counter; // counter for call ids
protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
final protected Configuration conf;
@@ -98,6 +99,12 @@ public class HBaseClient {
protected int pingInterval; // how often sends ping to the server in msecs
private final int connectionTimeOutMillSec; // the connection time out
+ private final int numConnectionsPerServer;
+ public static final String NUM_CONNECTIONS_PER_SERVER =
+ "hbase.client.max.connections.per.server";
+ public static final int DEFAULT_NUM_CONNECTIONS_PER_SERVER = 1;
+ private Random random = new Random();
+
protected final SocketFactory socketFactory; // how to create sockets
final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
@@ -186,7 +193,7 @@ public class HBaseClient {
return version;
}
}
-
+
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
@@ -785,6 +792,10 @@ public class HBaseClient {
this.socketFactory = factory;
this.connectionTimeOutMillSec =
conf.getInt("hbase.client.connection.timeout.millsec", 5000);
+ this.numConnectionsPerServer = conf.getInt(NUM_CONNECTIONS_PER_SERVER,
+ DEFAULT_NUM_CONNECTIONS_PER_SERVER);
+ LOG.debug("Created a new HBaseClient with " + numConnectionsPerServer +
+ " connections per remote server.");
}
/**
@@ -974,12 +985,15 @@ public class HBaseClient {
}
call.setVersion(version);
Connection connection;
+ // if multiple connections are enabled per remote, get a random one
+ int connectionNum = (numConnectionsPerServer > 1)?
+ random.nextInt(numConnectionsPerServer):0;
/* we could avoid this allocation for each RPC by having a
* connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok.
*/
ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout,
- call.getVersion());
+ call.getVersion(), connectionNum);
do {
connection = connections.get(remoteId);
if (connection == null) {
@@ -1056,13 +1070,15 @@ public class HBaseClient {
final UserGroupInformation ticket;
final private int rpcTimeout;
final private int version;
+ final private int connectionNum;
ConnectionId(InetSocketAddress address, UserGroupInformation ticket,
- int rpcTimeout, int version) {
+ int rpcTimeout, int version, int connectionNum) {
this.address = address;
this.ticket = ticket;
this.rpcTimeout = rpcTimeout;
this.version = version;
+ this.connectionNum = connectionNum;
}
InetSocketAddress getAddress() {
@@ -1077,7 +1093,8 @@ public class HBaseClient {
if (obj instanceof ConnectionId) {
ConnectionId id = (ConnectionId) obj;
return address.equals(id.address) && ticket == id.ticket &&
- rpcTimeout == id.rpcTimeout && version == id.version;
+ rpcTimeout == id.rpcTimeout && version == id.version &&
+ connectionNum == id.connectionNum;
//Note : ticket is a ref comparision.
}
return false;
@@ -1086,7 +1103,7 @@ public class HBaseClient {
@Override
public int hashCode() {
return address.hashCode() ^ System.identityHashCode(ticket) ^
- rpcTimeout ^ version;
+ rpcTimeout ^ version ^ connectionNum;
}
}
}