You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/06/11 04:56:17 UTC
[31/50] hbase git commit: HBASE-15948 Port "HADOOP-9956 RPC listener
inefficiently assigns connections to readers" Adds HADOOP-9955 RPC idle
connection closing is extremely inefficient Then removes queue added by
HADOOP-9956 at Enis suggestion
HBASE-15948 Port "HADOOP-9956 RPC listener inefficiently assigns connections to readers"
Also ports "HADOOP-9955 RPC idle connection closing is extremely inefficient".
Then removes the connection queue added by HADOOP-9956, at Enis's suggestion.
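Changes how Connections are tracked and counted, to match how it is done in Hadoop.
Adds a ConnectionManager class, which reads the configurations below (defaults shown).
Two are new: "hbase.ipc.client.connection.idle-scan-interval.ms" and
"hbase.ipc.server.handler.queue.size". The other three were previously read directly by
RpcServer, and the "maxidletime" default rises from 1000 to 10000.
"hbase.ipc.client.idlethreshold" 4000 -- periodic scans close idle connections only while at least this many are open
"hbase.ipc.client.connection.idle-scan-interval.ms" 10000 -- delay between idle scans
"hbase.ipc.client.connection.maxidletime" 10000 -- doubled by the server; connections with no outstanding calls and no contact for longer than that are eligible for closing
"hbase.ipc.client.kill.max" 10 -- maximum connections closed per periodic scan
"hbase.ipc.server.handler.queue.size" 100 -- multiplied by the handler count to size the connection set's initial capacity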
The new scheme does away with synchronization that, according to HADOOP-9955, could freeze out
reads while stale connections were being cleaned up; idle connections are now closed from a separate timer thread.
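Also ports the HADOOP-9956 change to how accepted connections are handed to Readers.
Instead of a startAdd/finishAdd handshake with the Reader for each connection, the Listener now
registers each accepted channel with a Reader's selector and wakes it. According to HADOOP-9956
this helps with bursty traffic. It also removes a stall in which the Listener had to wait for a busy
Reader (e.g. one still parsing a request) before it could register a new connection with it.
HADOOP-9956 itself buffered accepted connections in a per-Reader queue, sized by
"hbase.ipc.server.read.connection-queue.size" (default 100). That queue was removed at Enis's
suggestion, so nothing in this diff reads that configuration and Readers register new channels directly.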
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e0b70c00
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e0b70c00
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e0b70c00
Branch: refs/heads/hbase-12439
Commit: e0b70c00e74aeaac33570508e3732a53daea839e
Parents: da88b48
Author: stack <st...@apache.org>
Authored: Fri Jun 3 15:38:07 2016 -0700
Committer: stack <st...@apache.org>
Committed: Tue Jun 7 13:10:14 2016 -0700
----------------------------------------------------------------------
.../hbase/ipc/MetricsHBaseServerSource.java | 10 +-
.../ipc/MetricsHBaseServerWrapperImpl.java | 6 +-
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 408 +++++++++++--------
.../regionserver/SimpleRpcSchedulerFactory.java | 2 +-
.../hadoop/hbase/ipc/AbstractTestIPC.java | 2 +-
5 files changed, 241 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e0b70c00/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index bb89789..ce57e0f 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -52,14 +52,16 @@ public interface MetricsHBaseServerSource extends BaseSource {
String TOTAL_CALL_TIME_NAME = "totalCallTime";
String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time.";
String QUEUE_SIZE_NAME = "queueSize";
- String QUEUE_SIZE_DESC = "Number of bytes in the call queues.";
+ String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has been read and " +
+ "parsed and is waiting to run or is currently being executed.";
String GENERAL_QUEUE_NAME = "numCallsInGeneralQueue";
- String GENERAL_QUEUE_DESC = "Number of calls in the general call queue.";
+ String GENERAL_QUEUE_DESC = "Number of calls in the general call queue; " +
+ "parsed requests waiting in scheduler to be executed";
String PRIORITY_QUEUE_NAME = "numCallsInPriorityQueue";
String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
String REPLICATION_QUEUE_DESC =
- "Number of calls in the replication call queue.";
- String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue.";
+ "Number of calls in the replication call queue waiting to be run";
+ String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
String NUM_OPEN_CONNECTIONS_NAME = "numOpenConnections";
String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";
http://git-wip-us.apache.org/repos/asf/hbase/blob/e0b70c00/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
index 9979c75..4f53709 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
@@ -36,7 +36,7 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
if (!isServerStarted()) {
return 0;
}
- return server.callQueueSize.get();
+ return server.callQueueSizeInBytes.get();
}
@Override
@@ -65,10 +65,10 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
@Override
public int getNumOpenConnections() {
- if (!isServerStarted() || this.server.connectionList == null) {
+ if (!isServerStarted()) {
return 0;
}
- return server.connectionList.size();
+ return server.getNumOpenConnections();
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/e0b70c00/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 483ce86..aca3fdd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -48,15 +48,16 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -113,6 +114,7 @@ import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@@ -183,11 +185,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
*/
static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
- /**
- * The maximum size that we can hold in the RPC queue
- */
- private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
-
private final IPCUtil ipcUtil;
private static final String AUTH_FAILED_FOR = "Auth failed for ";
@@ -210,22 +207,30 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected int port; // port we listen on
protected InetSocketAddress address; // inet address we listen on
private int readThreads; // number of read threads
- protected int maxIdleTime; // the maximum idle time after
- // which a client may be
- // disconnected
- protected int thresholdIdleConnections; // the number of idle
- // connections after which we
- // will start cleaning up idle
- // connections
- int maxConnectionsToNuke; // the max number of
- // connections to nuke
- // during a cleanup
-
protected MetricsHBaseServer metrics;
protected final Configuration conf;
- private int maxQueueSize;
+ /**
+ * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
+ * this size, the call is rejected, though only after it has been parsed; the rejection goes
+ * back to the client, which retries. Set the limit with "hbase.ipc.server.max.callqueue.size". The
+ * call queue size gets incremented after we parse a call and before we add it to the queue of
+ * calls for the scheduler to use. It gets decremented after the Call has run, or right away if
+ * the scheduler rejects the dispatch. The current size is kept in {@link #callQueueSizeInBytes}.
+ * @see #callQueueSizeInBytes
+ * @see #DEFAULT_MAX_CALLQUEUE_SIZE
+ * @see #addCallSize(long)
+ */
+ private final long maxQueueSizeInBytes;
+ private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
+
+ /**
+ * This is a running count of the size in bytes of all outstanding calls whether currently
+ * executing or queued waiting to be run.
+ */
+ protected final Counter callQueueSizeInBytes = new Counter();
+
protected int socketSendBufferSize;
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives
@@ -244,19 +249,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
*/
volatile boolean started = false;
- /**
- * This is a running count of the size of all outstanding calls by size.
- */
- protected final Counter callQueueSize = new Counter();
-
- protected final List<Connection> connectionList =
- Collections.synchronizedList(new LinkedList<Connection>());
- //maintain a list
- //of client connections
+ // maintains the set of client connections and handles idle timeouts
+ private ConnectionManager connectionManager;
private Listener listener = null;
protected Responder responder = null;
protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
- protected int numConnections = 0;
protected HBaseRPCErrorHandler errorHandler = null;
@@ -623,18 +620,13 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private Selector selector = null; //the selector that we use for the server
private Reader[] readers = null;
private int currentReader = 0;
- private Random rand = new Random();
- private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
- //-tion (for idle connections) ran
- private long cleanupInterval = 10000; //the minimum interval between
- //two cleanup runs
- private int backlogLength;
private ExecutorService readPool;
public Listener(final String name) throws IOException {
super(name);
- backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
+ // The backlog of requests that we will have the serversocket carry.
+ int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
@@ -644,9 +636,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
// create a selector;
- selector= Selector.open();
+ selector = Selector.open();
readers = new Reader[readThreads];
+ // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
+ // has an advantage in that it is easy to shutdown the pool.
readPool = Executors.newFixedThreadPool(readThreads,
new ThreadFactoryBuilder().setNameFormat(
"RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
@@ -667,12 +661,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private class Reader implements Runnable {
- private volatile boolean adding = false;
private final Selector readSelector;
Reader() throws IOException {
this.readSelector = Selector.open();
}
+
@Override
public void run() {
try {
@@ -686,14 +680,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
- private synchronized void doRunLoop() {
+ private void doRunLoop() {
while (running) {
try {
readSelector.select();
- while (adding) {
- this.wait(1000);
- }
-
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
@@ -703,9 +693,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
doRead(key);
}
}
+ key = null;
}
} catch (InterruptedException e) {
- LOG.debug("Interrupted while sleeping");
+ if (running) { // unexpected -- log it
+ LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
+ }
return;
} catch (IOException ex) {
LOG.info(getName() + ": IOException in Reader", ex);
@@ -714,76 +707,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
/**
- * This gets reader into the state that waits for the new channel
- * to be registered with readSelector. If it was waiting in select()
- * the thread will be woken up, otherwise whenever select() is called
- * it will return even if there is nothing to read and wait
- * in while(adding) for finishAdd call
+ * Registers the connection's channel with readSelector for OP_READ (attaching the Connection),
+ * then wakes the selector. NOTE(review): per the SelectableChannel.register contract, register
+ * may block while readSelector.select() is in progress, and wakeup() only runs after it -- confirm.
*/
- public void startAdd() {
- adding = true;
+ public void addConnection(Connection conn) throws IOException {
+ conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
readSelector.wakeup();
}
-
- public synchronized SelectionKey registerChannel(SocketChannel channel)
- throws IOException {
- return channel.register(readSelector, SelectionKey.OP_READ);
- }
-
- public synchronized void finishAdd() {
- adding = false;
- this.notify();
- }
- }
-
- /** cleanup connections from connectionList. Choose a random range
- * to scan and also have a limit on the number of the connections
- * that will be cleanedup per run. The criteria for cleanup is the time
- * for which the connection was idle. If 'force' is true then all
- * connections will be looked at for the cleanup.
- * @param force all connections will be looked at for cleanup
- */
- private void cleanupConnections(boolean force) {
- if (force || numConnections > thresholdIdleConnections) {
- long currentTime = System.currentTimeMillis();
- if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
- return;
- }
- int start = 0;
- int end = numConnections - 1;
- if (!force) {
- start = rand.nextInt() % numConnections;
- end = rand.nextInt() % numConnections;
- int temp;
- if (end < start) {
- temp = start;
- start = end;
- end = temp;
- }
- }
- int i = start;
- int numNuked = 0;
- while (i <= end) {
- Connection c;
- synchronized (connectionList) {
- try {
- c = connectionList.get(i);
- } catch (Exception e) {return;}
- }
- if (c.timedOut(currentTime)) {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
- closeConnection(c);
- numNuked++;
- end--;
- //noinspection UnusedAssignment
- c = null;
- if (!force && numNuked == maxConnectionsToNuke) break;
- }
- else i++;
- }
- lastCleanupRunTime = System.currentTimeMillis();
- }
}
@Override
@@ -792,6 +723,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
"it will have per impact")
public void run() {
LOG.info(getName() + ": starting");
+ connectionManager.startIdleScan();
while (running) {
SelectionKey key = null;
try {
@@ -815,7 +747,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if (errorHandler.checkOOME(e)) {
LOG.info(getName() + ": exiting on OutOfMemoryError");
closeCurrentConnection(key, e);
- cleanupConnections(true);
+ connectionManager.closeIdle(true);
return;
}
} else {
@@ -824,22 +756,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// some thread(s) a chance to finish
LOG.warn(getName() + ": OutOfMemoryError in server select", e);
closeCurrentConnection(key, e);
- cleanupConnections(true);
+ connectionManager.closeIdle(true);
try {
Thread.sleep(60000);
} catch (InterruptedException ex) {
LOG.debug("Interrupted while sleeping");
- return;
}
}
} catch (Exception e) {
closeCurrentConnection(key, e);
}
- cleanupConnections(false);
}
-
LOG.info(getName() + ": stopping");
-
synchronized (this) {
try {
acceptChannel.close();
@@ -851,10 +779,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
selector= null;
acceptChannel= null;
- // clean up all connections
- while (!connectionList.isEmpty()) {
- closeConnection(connectionList.remove(0));
- }
+ // close all connections
+ connectionManager.stopIdleScan();
+ connectionManager.closeAll();
}
}
@@ -862,10 +789,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if (key != null) {
Connection c = (Connection)key.attachment();
if (c != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
- (e != null ? " on error " + e.getMessage() : ""));
- }
closeConnection(c);
key.attach(null);
}
@@ -876,37 +799,24 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return address;
}
- void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
- Connection c;
+ void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
-
SocketChannel channel;
while ((channel = server.accept()) != null) {
- try {
- channel.configureBlocking(false);
- channel.socket().setTcpNoDelay(tcpNoDelay);
- channel.socket().setKeepAlive(tcpKeepAlive);
- } catch (IOException ioe) {
- channel.close();
- throw ioe;
- }
-
+ channel.configureBlocking(false); // NOTE(review): unlike the removed try/catch, a failure in these three calls no longer closes the accepted channel -- possible leak
+ channel.socket().setTcpNoDelay(tcpNoDelay);
+ channel.socket().setKeepAlive(tcpKeepAlive);
Reader reader = getReader();
- try {
- reader.startAdd();
- SelectionKey readKey = reader.registerChannel(channel);
- c = getConnection(channel, System.currentTimeMillis());
- readKey.attach(c);
- synchronized (connectionList) {
- connectionList.add(numConnections, c);
- numConnections++;
+ Connection c = connectionManager.register(channel);
+ // Close the channel if no Connection comes back (defensive: register() as written never returns null).
+ if (c == null) {
+ if (channel.isOpen()) {
+ IOUtils.cleanup(null, channel);
}
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": connection from " + c.toString() +
- "; # active connections: " + numConnections);
- } finally {
- reader.finishAdd();
+ continue;
}
+ key.attach(c); // NOTE(review): key is the accept key, so a later closeCurrentConnection(key, e) in the Listener would close this last-accepted connection -- confirm intended
+ reader.addConnection(c);
}
}
@@ -919,12 +829,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
c.setLastContact(System.currentTimeMillis());
try {
count = c.readAndProcess();
-
- if (count > 0) {
- c.setLastContact(System.currentTimeMillis());
- }
-
} catch (InterruptedException ieo) {
+ LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo;
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
@@ -933,12 +839,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
- " because read count=" + count +
- ". Number of active connections: " + numConnections);
- }
closeConnection(c);
+ c = null;
+ } else {
+ c.setLastContact(System.currentTimeMillis());
}
}
@@ -1355,6 +1259,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return null;
}
+ public long getLastContact() {
+ return lastContact;
+ }
+
/* Return true if the connection has no outstanding rpc */
private boolean isIdle() {
return rpcCount.get() == 0;
@@ -1370,10 +1278,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
rpcCount.increment();
}
- protected boolean timedOut(long currentTime) {
- return isIdle() && currentTime - lastContact > maxIdleTime;
- }
-
private UserGroupInformation getAuthorizedUgi(String authorizedId)
throws IOException {
UserGroupInformation authorizedUgi;
@@ -1883,7 +1787,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
// Enforcing the call queue size, this triggers a retry in the client
// This is a bit late to be doing this check - we have already read in the total request.
- if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
+ if ((totalRequestSize + callQueueSizeInBytes.get()) > maxQueueSizeInBytes) {
final Call callTooBig =
new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null, null, 0);
@@ -1954,7 +1858,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
totalRequestSize, traceInfo, this.addr, timeout);
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
- callQueueSize.add(-1 * call.getSize());
+ callQueueSizeInBytes.add(-1 * call.getSize());
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
@@ -2093,12 +1997,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.bindAddress = bindAddress;
this.conf = conf;
this.socketSendBufferSize = 0;
- this.maxQueueSize =
- this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
+ // See declaration above for documentation on what this size is.
+ this.maxQueueSizeInBytes =
+ this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
- this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
- this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
- this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
@@ -2120,6 +2022,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// Create the responder here
responder = new Responder();
+ connectionManager = new ConnectionManager();
this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
this.userProvider = UserProvider.instantiate(conf);
this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
@@ -2177,12 +2080,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
protected void closeConnection(Connection connection) {
- synchronized (connectionList) {
- if (connectionList.remove(connection)) {
- numConnections--;
- }
- }
- connection.close();
+ connectionManager.close(connection);
}
Configuration getConf() {
@@ -2440,7 +2338,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
@Override
public void addCallSize(final long diff) {
- this.callQueueSize.add(diff);
+ this.callQueueSizeInBytes.add(diff);
}
/**
@@ -2578,6 +2476,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
/**
+ * Returns the number of open RPC connections currently tracked by the ConnectionManager.
+ * @return the current connection count reported by {@code connectionManager.size()}
+ */
+ public int getNumOpenConnections() {
+ return connectionManager.size();
+ }
+
+ /**
* Returns the username for any user associated with the current RPC
* request or <code>null</code> if no user is set.
*/
@@ -2695,4 +2601,150 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
public RpcScheduler getScheduler() {
return scheduler;
}
+
+ private class ConnectionManager {
+ final private AtomicInteger count = new AtomicInteger();
+ final private Set<Connection> connections;
+
+ final private Timer idleScanTimer;
+ final private int idleScanThreshold;
+ final private int idleScanInterval;
+ final private int maxIdleTime;
+ final private int maxIdleToClose;
+
+ ConnectionManager() {
+ this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
+ this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
+ this.idleScanInterval =
+ conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
+ this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
+ this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
+ int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+ HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+ int maxConnectionQueueSize =
+ handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
+ // Concurrent set with a weakly consistent iterator; concurrency level is readThreads + 2
+ // (the listener plus the idle-scan timer thread); maxConnectionQueueSize is only the initial capacity
+ this.connections = Collections.newSetFromMap(
+ new ConcurrentHashMap<Connection,Boolean>(
+ maxConnectionQueueSize, 0.75f, readThreads+2));
+ }
+
+ private boolean add(Connection connection) {
+ boolean added = connections.add(connection);
+ if (added) {
+ count.getAndIncrement();
+ }
+ return added;
+ }
+
+ private boolean remove(Connection connection) {
+ boolean removed = connections.remove(connection);
+ if (removed) {
+ count.getAndDecrement();
+ }
+ return removed;
+ }
+
+ int size() {
+ return count.get();
+ }
+
+ Connection[] toArray() {
+ return connections.toArray(new Connection[0]);
+ }
+
+ Connection register(SocketChannel channel) {
+ Connection connection = new Connection(channel, System.currentTimeMillis()); // NOTE(review): old doAccept called getConnection(channel, time); confirm no override of it is bypassed
+ add(connection);
+ if (LOG.isDebugEnabled()) {
+ // Log keys deliberately mirror the exported metric names
+ LOG.debug("Server connection from " + connection +
+ "; numOpenConnections=" + size() +
+ ", queueSize(bytes)=" + callQueueSizeInBytes.get() +
+ ", numCallsInGeneralQueue=" + scheduler.getGeneralQueueLength() +
+ ", numCallsInPriorityQueue=" + scheduler.getPriorityQueueLength());
+ }
+ return connection; // never null, so callers' null checks are purely defensive
+ }
+
+ boolean close(Connection connection) {
+ boolean exists = remove(connection);
+ if (exists) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(Thread.currentThread().getName() +
+ ": disconnecting client " + connection +
+ ". Number of active connections: "+ size());
+ }
+ // only close if actually removed to avoid double-closing due
+ // to possible races
+ connection.close();
+ }
+ return exists;
+ }
+
+ // synchronized so an explicit closeIdle(true) (the Listener's OOM handling) cannot
+ // interleave with the periodic idle-scan TimerTask
+ synchronized void closeIdle(boolean scanAll) {
+ long minLastContact = System.currentTimeMillis() - maxIdleTime;
+ // concurrent iterator might miss new connections added
+ // during the iteration, but that's ok because they won't
+ // be idle yet anyway and will be caught on next scan
+ int closed = 0;
+ for (Connection connection : connections) {
+ // stop if connections dropped below threshold unless scanning all
+ if (!scanAll && size() < idleScanThreshold) {
+ break;
+ }
+ // close if idle with no recent contact; unless scanAll, stop after maxIdleToClose closes
+ if (connection.isIdle() &&
+ connection.getLastContact() < minLastContact &&
+ close(connection) &&
+ !scanAll && (++closed == maxIdleToClose)) {
+ break;
+ }
+ }
+ }
+
+ void closeAll() {
+ // use a copy of the connections to be absolutely sure the concurrent
+ // iterator doesn't miss a connection
+ for (Connection connection : toArray()) {
+ close(connection);
+ }
+ }
+
+ void startIdleScan() {
+ scheduleIdleScanTask();
+ }
+
+ void stopIdleScan() {
+ idleScanTimer.cancel();
+ }
+
+ private void scheduleIdleScanTask() {
+ if (!running) {
+ return;
+ }
+ TimerTask idleScanTask = new TimerTask(){
+ @Override
+ public void run() {
+ if (!running) {
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(Thread.currentThread().getName()+": task running");
+ }
+ try {
+ closeIdle(false);
+ } finally {
+ // explicitly reschedule so next execution occurs relative
+ // to the end of this scan, not the beginning
+ scheduleIdleScanTask();
+ }
+ }
+ };
+ idleScanTimer.schedule(idleScanTask, idleScanInterval); // NOTE(review): throws IllegalStateException if stopIdleScan() cancelled the timer after the running check above
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e0b70c00/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
index 1f496b4..743c5bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
@@ -41,7 +41,7 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
- HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+ HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
return new SimpleRpcScheduler(
conf,
http://git-wip-us.apache.org/repos/asf/hbase/blob/e0b70c00/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index ceb945b..45cec78 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -263,7 +263,7 @@ public abstract class AbstractTestIPC {
fail("Expected an exception to have been thrown!");
} catch (Exception e) {
LOG.info("Caught expected exception: " + e.toString());
- assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
+ assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault"));
} finally {
rpcServer.stop();
}