You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2016/06/07 23:41:51 UTC
hbase git commit: Revert "HBASE-15948 Port "HADOOP-9956 RPC listener inefficiently assigns connections to readers""
Revert mistaken commit...
This reverts commit e0b70c00e74aeaac33570508e3732a53daea839e.
Repository: hbase
Updated Branches:
refs/heads/master 6d5a25935 -> e66ecd7db
Revert "HBASE-15948 Port "HADOOP-9956 RPC listener inefficiently assigns connections to readers""
Revert mistaken commit...
This reverts commit e0b70c00e74aeaac33570508e3732a53daea839e.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e66ecd7d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e66ecd7d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e66ecd7d
Branch: refs/heads/master
Commit: e66ecd7db68d6ef57084543d08f7774c82f22f45
Parents: 6d5a259
Author: stack <st...@apache.org>
Authored: Tue Jun 7 16:41:30 2016 -0700
Committer: stack <st...@apache.org>
Committed: Tue Jun 7 16:41:30 2016 -0700
----------------------------------------------------------------------
.../hbase/ipc/MetricsHBaseServerSource.java | 10 +-
.../ipc/MetricsHBaseServerWrapperImpl.java | 6 +-
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 408 ++++++++-----------
.../regionserver/SimpleRpcSchedulerFactory.java | 2 +-
.../hadoop/hbase/ipc/AbstractTestIPC.java | 2 +-
5 files changed, 187 insertions(+), 241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e66ecd7d/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index ce57e0f..bb89789 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -52,16 +52,14 @@ public interface MetricsHBaseServerSource extends BaseSource {
String TOTAL_CALL_TIME_NAME = "totalCallTime";
String TOTAL_CALL_TIME_DESC = "Total call time, including both queued and processing time.";
String QUEUE_SIZE_NAME = "queueSize";
- String QUEUE_SIZE_DESC = "Number of bytes in the call queues; request has been read and " +
- "parsed and is waiting to run or is currently being executed.";
+ String QUEUE_SIZE_DESC = "Number of bytes in the call queues.";
String GENERAL_QUEUE_NAME = "numCallsInGeneralQueue";
- String GENERAL_QUEUE_DESC = "Number of calls in the general call queue; " +
- "parsed requests waiting in scheduler to be executed";
+ String GENERAL_QUEUE_DESC = "Number of calls in the general call queue.";
String PRIORITY_QUEUE_NAME = "numCallsInPriorityQueue";
String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
String REPLICATION_QUEUE_DESC =
- "Number of calls in the replication call queue waiting to be run";
- String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
+ "Number of calls in the replication call queue.";
+ String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue.";
String NUM_OPEN_CONNECTIONS_NAME = "numOpenConnections";
String NUM_OPEN_CONNECTIONS_DESC = "Number of open connections.";
String NUM_ACTIVE_HANDLER_NAME = "numActiveHandler";
http://git-wip-us.apache.org/repos/asf/hbase/blob/e66ecd7d/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
index 4f53709..9979c75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java
@@ -36,7 +36,7 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
if (!isServerStarted()) {
return 0;
}
- return server.callQueueSizeInBytes.get();
+ return server.callQueueSize.get();
}
@Override
@@ -65,10 +65,10 @@ public class MetricsHBaseServerWrapperImpl implements MetricsHBaseServerWrapper
@Override
public int getNumOpenConnections() {
- if (!isServerStarted()) {
+ if (!isServerStarted() || this.server.connectionList == null) {
return 0;
}
- return server.getNumOpenConnections();
+ return server.connectionList.size();
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/e66ecd7d/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index aca3fdd..483ce86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -48,16 +48,15 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -114,7 +113,6 @@ import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@@ -185,6 +183,11 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
*/
static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
+ /**
+ * The maximum size that we can hold in the RPC queue
+ */
+ private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
+
private final IPCUtil ipcUtil;
private static final String AUTH_FAILED_FOR = "Auth failed for ";
@@ -207,30 +210,22 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected int port; // port we listen on
protected InetSocketAddress address; // inet address we listen on
private int readThreads; // number of read threads
+ protected int maxIdleTime; // the maximum idle time after
+ // which a client may be
+ // disconnected
+ protected int thresholdIdleConnections; // the number of idle
+ // connections after which we
+ // will start cleaning up idle
+ // connections
+ int maxConnectionsToNuke; // the max number of
+ // connections to nuke
+ // during a cleanup
+
protected MetricsHBaseServer metrics;
protected final Configuration conf;
- /**
- * Maximum size in bytes of the currently queued and running Calls. If a new Call puts us over
- * this size, then we will reject the call (after parsing it though). It will go back to the
- * client and client will retry. Set this size with "hbase.ipc.server.max.callqueue.size". The
- * call queue size gets incremented after we parse a call and before we add it to the queue of
- * calls for the scheduler to use. It get decremented after we have 'run' the Call. The current
- * size is kept in {@link #callQueueSizeInBytes}.
- * @see {@link #callQueueSizeInBytes}
- * @see {@link #DEFAULT_MAX_CALLQUEUE_SIZE}
- * @see {@link #callQueueSizeInBytes}
- */
- private final long maxQueueSizeInBytes;
- private static final int DEFAULT_MAX_CALLQUEUE_SIZE = 1024 * 1024 * 1024;
-
- /**
- * This is a running count of the size in bytes of all outstanding calls whether currently
- * executing or queued waiting to be run.
- */
- protected final Counter callQueueSizeInBytes = new Counter();
-
+ private int maxQueueSize;
protected int socketSendBufferSize;
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives
@@ -249,11 +244,19 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
*/
volatile boolean started = false;
- // maintains the set of client connections and handles idle timeouts
- private ConnectionManager connectionManager;
+ /**
+ * This is a running count of the size of all outstanding calls by size.
+ */
+ protected final Counter callQueueSize = new Counter();
+
+ protected final List<Connection> connectionList =
+ Collections.synchronizedList(new LinkedList<Connection>());
+ //maintain a list
+ //of client connections
private Listener listener = null;
protected Responder responder = null;
protected AuthenticationTokenSecretManager authTokenSecretMgr = null;
+ protected int numConnections = 0;
protected HBaseRPCErrorHandler errorHandler = null;
@@ -620,13 +623,18 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private Selector selector = null; //the selector that we use for the server
private Reader[] readers = null;
private int currentReader = 0;
+ private Random rand = new Random();
+ private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
+ //-tion (for idle connections) ran
+ private long cleanupInterval = 10000; //the minimum interval between
+ //two cleanup runs
+ private int backlogLength;
private ExecutorService readPool;
public Listener(final String name) throws IOException {
super(name);
- // The backlog of requests that we will have the serversocket carry.
- int backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
+ backlogLength = conf.getInt("hbase.ipc.server.listen.queue.size", 128);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
@@ -636,11 +644,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
address = (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
// create a selector;
- selector = Selector.open();
+ selector= Selector.open();
readers = new Reader[readThreads];
- // Why this executor thing? Why not like hadoop just start up all the threads? I suppose it
- // has an advantage in that it is easy to shutdown the pool.
readPool = Executors.newFixedThreadPool(readThreads,
new ThreadFactoryBuilder().setNameFormat(
"RpcServer.reader=%d,bindAddress=" + bindAddress.getHostName() +
@@ -661,12 +667,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private class Reader implements Runnable {
+ private volatile boolean adding = false;
private final Selector readSelector;
Reader() throws IOException {
this.readSelector = Selector.open();
}
-
@Override
public void run() {
try {
@@ -680,10 +686,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
}
- private void doRunLoop() {
+ private synchronized void doRunLoop() {
while (running) {
try {
readSelector.select();
+ while (adding) {
+ this.wait(1000);
+ }
+
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
@@ -693,12 +703,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
doRead(key);
}
}
- key = null;
}
} catch (InterruptedException e) {
- if (running) { // unexpected -- log it
- LOG.info(Thread.currentThread().getName() + " unexpectedly interrupted", e);
- }
+ LOG.debug("Interrupted while sleeping");
return;
} catch (IOException ex) {
LOG.info(getName() + ": IOException in Reader", ex);
@@ -707,14 +714,76 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
/**
- * Updating the readSelector while it's being used is not thread-safe,
- * so the connection must be queued. The reader will drain the queue
- * and update its readSelector before performing the next select
+ * This gets reader into the state that waits for the new channel
+ * to be registered with readSelector. If it was waiting in select()
+ * the thread will be woken up, otherwise whenever select() is called
+ * it will return even if there is nothing to read and wait
+ * in while(adding) for finishAdd call
*/
- public void addConnection(Connection conn) throws IOException {
- conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
+ public void startAdd() {
+ adding = true;
readSelector.wakeup();
}
+
+ public synchronized SelectionKey registerChannel(SocketChannel channel)
+ throws IOException {
+ return channel.register(readSelector, SelectionKey.OP_READ);
+ }
+
+ public synchronized void finishAdd() {
+ adding = false;
+ this.notify();
+ }
+ }
+
+ /** cleanup connections from connectionList. Choose a random range
+ * to scan and also have a limit on the number of the connections
+ * that will be cleanedup per run. The criteria for cleanup is the time
+ * for which the connection was idle. If 'force' is true then all
+ * connections will be looked at for the cleanup.
+ * @param force all connections will be looked at for cleanup
+ */
+ private void cleanupConnections(boolean force) {
+ if (force || numConnections > thresholdIdleConnections) {
+ long currentTime = System.currentTimeMillis();
+ if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
+ return;
+ }
+ int start = 0;
+ int end = numConnections - 1;
+ if (!force) {
+ start = rand.nextInt() % numConnections;
+ end = rand.nextInt() % numConnections;
+ int temp;
+ if (end < start) {
+ temp = start;
+ start = end;
+ end = temp;
+ }
+ }
+ int i = start;
+ int numNuked = 0;
+ while (i <= end) {
+ Connection c;
+ synchronized (connectionList) {
+ try {
+ c = connectionList.get(i);
+ } catch (Exception e) {return;}
+ }
+ if (c.timedOut(currentTime)) {
+ if (LOG.isDebugEnabled())
+ LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
+ closeConnection(c);
+ numNuked++;
+ end--;
+ //noinspection UnusedAssignment
+ c = null;
+ if (!force && numNuked == maxConnectionsToNuke) break;
+ }
+ else i++;
+ }
+ lastCleanupRunTime = System.currentTimeMillis();
+ }
}
@Override
@@ -723,7 +792,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
"it will have per impact")
public void run() {
LOG.info(getName() + ": starting");
- connectionManager.startIdleScan();
while (running) {
SelectionKey key = null;
try {
@@ -747,7 +815,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if (errorHandler.checkOOME(e)) {
LOG.info(getName() + ": exiting on OutOfMemoryError");
closeCurrentConnection(key, e);
- connectionManager.closeIdle(true);
+ cleanupConnections(true);
return;
}
} else {
@@ -756,18 +824,22 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// some thread(s) a chance to finish
LOG.warn(getName() + ": OutOfMemoryError in server select", e);
closeCurrentConnection(key, e);
- connectionManager.closeIdle(true);
+ cleanupConnections(true);
try {
Thread.sleep(60000);
} catch (InterruptedException ex) {
LOG.debug("Interrupted while sleeping");
+ return;
}
}
} catch (Exception e) {
closeCurrentConnection(key, e);
}
+ cleanupConnections(false);
}
+
LOG.info(getName() + ": stopping");
+
synchronized (this) {
try {
acceptChannel.close();
@@ -779,9 +851,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
selector= null;
acceptChannel= null;
- // close all connections
- connectionManager.stopIdleScan();
- connectionManager.closeAll();
+ // clean up all connections
+ while (!connectionList.isEmpty()) {
+ closeConnection(connectionList.remove(0));
+ }
}
}
@@ -789,6 +862,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
if (key != null) {
Connection c = (Connection)key.attachment();
if (c != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": disconnecting client " + c.getHostAddress() +
+ (e != null ? " on error " + e.getMessage() : ""));
+ }
closeConnection(c);
key.attach(null);
}
@@ -799,24 +876,37 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return address;
}
- void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
+ void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
+ Connection c;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
+
SocketChannel channel;
while ((channel = server.accept()) != null) {
- channel.configureBlocking(false);
- channel.socket().setTcpNoDelay(tcpNoDelay);
- channel.socket().setKeepAlive(tcpKeepAlive);
+ try {
+ channel.configureBlocking(false);
+ channel.socket().setTcpNoDelay(tcpNoDelay);
+ channel.socket().setKeepAlive(tcpKeepAlive);
+ } catch (IOException ioe) {
+ channel.close();
+ throw ioe;
+ }
+
Reader reader = getReader();
- Connection c = connectionManager.register(channel);
- // If the connectionManager can't take it, close the connection.
- if (c == null) {
- if (channel.isOpen()) {
- IOUtils.cleanup(null, channel);
+ try {
+ reader.startAdd();
+ SelectionKey readKey = reader.registerChannel(channel);
+ c = getConnection(channel, System.currentTimeMillis());
+ readKey.attach(c);
+ synchronized (connectionList) {
+ connectionList.add(numConnections, c);
+ numConnections++;
}
- continue;
+ if (LOG.isDebugEnabled())
+ LOG.debug(getName() + ": connection from " + c.toString() +
+ "; # active connections: " + numConnections);
+ } finally {
+ reader.finishAdd();
}
- key.attach(c); // so closeCurrentConnection can get the object
- reader.addConnection(c);
}
}
@@ -829,8 +919,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
c.setLastContact(System.currentTimeMillis());
try {
count = c.readAndProcess();
+
+ if (count > 0) {
+ c.setLastContact(System.currentTimeMillis());
+ }
+
} catch (InterruptedException ieo) {
- LOG.info(Thread.currentThread().getName() + ": readAndProcess caught InterruptedException", ieo);
throw ieo;
} catch (Exception e) {
if (LOG.isDebugEnabled()) {
@@ -839,10 +933,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getName() + ": DISCONNECTING client " + c.toString() +
+ " because read count=" + count +
+ ". Number of active connections: " + numConnections);
+ }
closeConnection(c);
- c = null;
- } else {
- c.setLastContact(System.currentTimeMillis());
}
}
@@ -1259,10 +1355,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return null;
}
- public long getLastContact() {
- return lastContact;
- }
-
/* Return true if the connection has no outstanding rpc */
private boolean isIdle() {
return rpcCount.get() == 0;
@@ -1278,6 +1370,10 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
rpcCount.increment();
}
+ protected boolean timedOut(long currentTime) {
+ return isIdle() && currentTime - lastContact > maxIdleTime;
+ }
+
private UserGroupInformation getAuthorizedUgi(String authorizedId)
throws IOException {
UserGroupInformation authorizedUgi;
@@ -1787,7 +1883,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
// Enforcing the call queue size, this triggers a retry in the client
// This is a bit late to be doing this check - we have already read in the total request.
- if ((totalRequestSize + callQueueSizeInBytes.get()) > maxQueueSizeInBytes) {
+ if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
final Call callTooBig =
new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null, null, 0);
@@ -1858,7 +1954,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
totalRequestSize, traceInfo, this.addr, timeout);
if (!scheduler.dispatch(new CallRunner(RpcServer.this, call))) {
- callQueueSizeInBytes.add(-1 * call.getSize());
+ callQueueSize.add(-1 * call.getSize());
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
metrics.exception(CALL_QUEUE_TOO_BIG_EXCEPTION);
@@ -1997,10 +2093,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.bindAddress = bindAddress;
this.conf = conf;
this.socketSendBufferSize = 0;
- // See declaration above for documentation on what this size is.
- this.maxQueueSizeInBytes =
- this.conf.getLong("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
+ this.maxQueueSize =
+ this.conf.getInt("hbase.ipc.server.max.callqueue.size", DEFAULT_MAX_CALLQUEUE_SIZE);
this.readThreads = conf.getInt("hbase.ipc.server.read.threadpool.size", 10);
+ this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 1000);
+ this.maxConnectionsToNuke = conf.getInt("hbase.ipc.client.kill.max", 10);
+ this.thresholdIdleConnections = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
this.purgeTimeout = conf.getLong("hbase.ipc.client.call.purge.timeout",
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.warnResponseTime = conf.getInt(WARN_RESPONSE_TIME, DEFAULT_WARN_RESPONSE_TIME);
@@ -2022,7 +2120,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
// Create the responder here
responder = new Responder();
- connectionManager = new ConnectionManager();
this.authorize = conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
this.userProvider = UserProvider.instantiate(conf);
this.isSecurityEnabled = userProvider.isHBaseSecurityEnabled();
@@ -2080,7 +2177,12 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
protected void closeConnection(Connection connection) {
- connectionManager.close(connection);
+ synchronized (connectionList) {
+ if (connectionList.remove(connection)) {
+ numConnections--;
+ }
+ }
+ connection.close();
}
Configuration getConf() {
@@ -2338,7 +2440,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
@Override
public void addCallSize(final long diff) {
- this.callQueueSizeInBytes.add(diff);
+ this.callQueueSize.add(diff);
}
/**
@@ -2476,14 +2578,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
/**
- * The number of open RPC conections
- * @return the number of open rpc connections
- */
- public int getNumOpenConnections() {
- return connectionManager.size();
- }
-
- /**
* Returns the username for any user associated with the current RPC
* request or <code>null</code> if no user is set.
*/
@@ -2601,150 +2695,4 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
public RpcScheduler getScheduler() {
return scheduler;
}
-
- private class ConnectionManager {
- final private AtomicInteger count = new AtomicInteger();
- final private Set<Connection> connections;
-
- final private Timer idleScanTimer;
- final private int idleScanThreshold;
- final private int idleScanInterval;
- final private int maxIdleTime;
- final private int maxIdleToClose;
-
- ConnectionManager() {
- this.idleScanTimer = new Timer("RpcServer idle connection scanner for port " + port, true);
- this.idleScanThreshold = conf.getInt("hbase.ipc.client.idlethreshold", 4000);
- this.idleScanInterval =
- conf.getInt("hbase.ipc.client.connection.idle-scan-interval.ms", 10000);
- this.maxIdleTime = 2 * conf.getInt("hbase.ipc.client.connection.maxidletime", 10000);
- this.maxIdleToClose = conf.getInt("hbase.ipc.client.kill.max", 10);
- int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
- HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
- int maxConnectionQueueSize =
- handlerCount * conf.getInt("hbase.ipc.server.handler.queue.size", 100);
- // create a set with concurrency -and- a thread-safe iterator, add 2
- // for listener and idle closer threads
- this.connections = Collections.newSetFromMap(
- new ConcurrentHashMap<Connection,Boolean>(
- maxConnectionQueueSize, 0.75f, readThreads+2));
- }
-
- private boolean add(Connection connection) {
- boolean added = connections.add(connection);
- if (added) {
- count.getAndIncrement();
- }
- return added;
- }
-
- private boolean remove(Connection connection) {
- boolean removed = connections.remove(connection);
- if (removed) {
- count.getAndDecrement();
- }
- return removed;
- }
-
- int size() {
- return count.get();
- }
-
- Connection[] toArray() {
- return connections.toArray(new Connection[0]);
- }
-
- Connection register(SocketChannel channel) {
- Connection connection = new Connection(channel, System.currentTimeMillis());
- add(connection);
- if (LOG.isDebugEnabled()) {
- // Use metric names
- LOG.debug("Server connection from " + connection +
- "; numOpenConnections=" + size() +
- ", queueSize(bytes)=" + callQueueSizeInBytes.get() +
- ", numCallsInGeneralQueue=" + scheduler.getGeneralQueueLength() +
- ", numCallsInPriorityQueue=" + scheduler.getPriorityQueueLength());
- }
- return connection;
- }
-
- boolean close(Connection connection) {
- boolean exists = remove(connection);
- if (exists) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(Thread.currentThread().getName() +
- ": disconnecting client " + connection +
- ". Number of active connections: "+ size());
- }
- // only close if actually removed to avoid double-closing due
- // to possible races
- connection.close();
- }
- return exists;
- }
-
- // synch'ed to avoid explicit invocation upon OOM from colliding with
- // timer task firing
- synchronized void closeIdle(boolean scanAll) {
- long minLastContact = System.currentTimeMillis() - maxIdleTime;
- // concurrent iterator might miss new connections added
- // during the iteration, but that's ok because they won't
- // be idle yet anyway and will be caught on next scan
- int closed = 0;
- for (Connection connection : connections) {
- // stop if connections dropped below threshold unless scanning all
- if (!scanAll && size() < idleScanThreshold) {
- break;
- }
- // stop if not scanning all and max connections are closed
- if (connection.isIdle() &&
- connection.getLastContact() < minLastContact &&
- close(connection) &&
- !scanAll && (++closed == maxIdleToClose)) {
- break;
- }
- }
- }
-
- void closeAll() {
- // use a copy of the connections to be absolutely sure the concurrent
- // iterator doesn't miss a connection
- for (Connection connection : toArray()) {
- close(connection);
- }
- }
-
- void startIdleScan() {
- scheduleIdleScanTask();
- }
-
- void stopIdleScan() {
- idleScanTimer.cancel();
- }
-
- private void scheduleIdleScanTask() {
- if (!running) {
- return;
- }
- TimerTask idleScanTask = new TimerTask(){
- @Override
- public void run() {
- if (!running) {
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(Thread.currentThread().getName()+": task running");
- }
- try {
- closeIdle(false);
- } finally {
- // explicitly reschedule so next execution occurs relative
- // to the end of this scan, not the beginning
- scheduleIdleScanTask();
- }
- }
- };
- idleScanTimer.schedule(idleScanTask, idleScanInterval);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/e66ecd7d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
index 743c5bb..1f496b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
@@ -41,7 +41,7 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
@Override
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
- HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+ HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
return new SimpleRpcScheduler(
conf,
http://git-wip-us.apache.org/repos/asf/hbase/blob/e66ecd7d/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 45cec78..ceb945b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -263,7 +263,7 @@ public abstract class AbstractTestIPC {
fail("Expected an exception to have been thrown!");
} catch (Exception e) {
LOG.info("Caught expected exception: " + e.toString());
- assertTrue(e.toString(), StringUtils.stringifyException(e).contains("Injected fault"));
+ assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
} finally {
rpcServer.stop();
}