You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2021/05/19 17:54:10 UTC
[hadoop] branch trunk updated: HDFS-15757 RBF: Improving Router
Connection Management (#2651)
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 43bf009 HDFS-15757 RBF: Improving Router Connection Management (#2651)
43bf009 is described below
commit 43bf0091120d7718347ec7135868f831d4750b1d
Author: lfengnan <lf...@uber.com>
AuthorDate: Wed May 19 10:53:42 2021 -0700
HDFS-15757 RBF: Improving Router Connection Management (#2651)
---
.../federation/metrics/FederationRPCMBean.java | 15 +++++
.../federation/metrics/FederationRPCMetrics.java | 10 ++++
.../federation/router/ConnectionContext.java | 57 +++++++++++++-----
.../federation/router/ConnectionManager.java | 53 +++++++++++++++--
.../server/federation/router/ConnectionPool.java | 67 ++++++++++++++++++----
.../server/federation/router/RouterRpcClient.java | 18 ++++++
.../federation/router/TestConnectionManager.java | 28 +++++----
7 files changed, 209 insertions(+), 39 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
index 3cde5e5..a4469a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
@@ -77,6 +77,21 @@ public interface FederationRPCMBean {
int getRpcClientNumActiveConnections();
/**
+ * Get the number of idle RPC connections between the Router and the NNs.
+ * @return Number of idle RPC connections between the Router and the NNs.
+ */
+ int getRpcClientNumIdleConnections();
+
+ /**
+ * Get the number of recently active RPC connections between
+ * the Router and the NNs.
+ *
+ * @return Number of recently active RPC connections between
+ * the Router and the NNs.
+ */
+ int getRpcClientNumActiveConnectionsRecently();
+
+ /**
* Get the number of RPC connections to be created.
* @return Number of RPC connections to be created.
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
index 887d50b..1e6aa80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
@@ -209,6 +209,16 @@ public class FederationRPCMetrics implements FederationRPCMBean {
}
@Override
+ public int getRpcClientNumIdleConnections() {
+ return rpcServer.getRPCClient().getNumIdleConnections();
+ }
+
+ @Override
+ public int getRpcClientNumActiveConnectionsRecently() {
+ return rpcServer.getRPCClient().getNumActiveConnectionsRecently();
+ }
+
+ @Override
public int getRpcClientNumCreatingConnections() {
return rpcServer.getRPCClient().getNumCreatingConnections();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
index 02a3dbe..9a5434b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
@@ -18,9 +18,13 @@
package org.apache.hadoop.hdfs.server.federation.router;
import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Context to track a connection in a {@link ConnectionPool}. When a client uses
@@ -36,13 +40,19 @@ import org.apache.hadoop.ipc.RPC;
*/
public class ConnectionContext {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ConnectionContext.class);
+
/** Client for the connection. */
private final ProxyAndInfo<?> client;
/** How many threads are using this connection. */
private int numThreads = 0;
/** If the connection is closed. */
private boolean closed = false;
-
+ /** Last timestamp the connection was active. */
+ private long lastActiveTs = 0;
+ /** The connection's active status would expire after this window. */
+ private final static long ACTIVE_WINDOW_TIME = TimeUnit.SECONDS.toMillis(30);
public ConnectionContext(ProxyAndInfo<?> connection) {
this.client = connection;
@@ -58,6 +68,16 @@ public class ConnectionContext {
}
/**
+ * Check if the connection is/was active recently.
+ *
+ * @return True if the connection is active or
+ * was active in the past period of time.
+ */
+ public synchronized boolean isActiveRecently() {
+ return Time.monotonicNow() - this.lastActiveTs <= ACTIVE_WINDOW_TIME;
+ }
+
+ /**
* Check if the connection is closed.
*
* @return If the connection is closed.
@@ -83,30 +103,41 @@ public class ConnectionContext {
*/
public synchronized ProxyAndInfo<?> getClient() {
this.numThreads++;
+ this.lastActiveTs = Time.monotonicNow();
return this.client;
}
/**
- * Release this connection. If the connection was closed, close the proxy.
- * Otherwise, mark the connection as not used by us anymore.
+ * Release this connection.
*/
public synchronized void release() {
- if (--this.numThreads == 0 && this.closed) {
- close();
+ if (this.numThreads > 0) {
+ this.numThreads--;
}
}
/**
- * We will not use this connection anymore. If it's not being used, we close
- * it. Otherwise, we let release() do it once we are done with it.
+ * Close a connection. Only idle connections can be closed since
+ * the RPC proxy would be shut down immediately.
+ *
+ * @param force whether the connection should be closed anyway.
*/
- public synchronized void close() {
- this.closed = true;
- if (this.numThreads == 0) {
- Object proxy = this.client.getProxy();
- // Nobody should be using this anymore so it should close right away
- RPC.stopProxy(proxy);
+ public synchronized void close(boolean force) {
+ if (!force && this.numThreads > 0) {
+ // This is an erroneous case, but we have to close the connection
+ // anyway: it has already been moved out of the pool, so there
+ // would be a connection leak if we did not do so.
+ LOG.error("Active connection with {} handlers will be closed",
+ this.numThreads);
}
+ this.closed = true;
+ Object proxy = this.client.getProxy();
+ // Nobody should be using this anymore so it should close right away
+ RPC.stopProxy(proxy);
+ }
+
+ public synchronized void close() {
+ close(false);
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
index 9ec3b54..b773a79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
@@ -282,6 +282,42 @@ public class ConnectionManager {
}
/**
+ * Get number of idle connections.
+ *
+ * @return Number of idle connections.
+ */
+ public int getNumIdleConnections() {
+ int total = 0;
+ readLock.lock();
+ try {
+ for (ConnectionPool pool : this.pools.values()) {
+ total += pool.getNumIdleConnections();
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return total;
+ }
+
+ /**
+ * Get number of recently active connections.
+ *
+ * @return Number of recently active connections.
+ */
+ public int getNumActiveConnectionsRecently() {
+ int total = 0;
+ readLock.lock();
+ try {
+ for (ConnectionPool pool : this.pools.values()) {
+ total += pool.getNumActiveConnectionsRecently();
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return total;
+ }
+
+ /**
* Get the number of connections to be created.
*
* @return Number of connections to be created.
@@ -327,12 +363,21 @@ public class ConnectionManager {
// Check if the pool hasn't been active in a while or not 50% are used
long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
int total = pool.getNumConnections();
- int active = pool.getNumActiveConnections();
+ // Active is a transient status in many cases for a connection since
+ // the handler thread uses the connection very quickly. Thus the number
+ // of connections with handlers using them at call time is constantly low.
+ // Recently active is a more lasting status and it shows how many
+ // connections have been used within a recent time period (i.e. 30 seconds).
+ int active = pool.getNumActiveConnectionsRecently();
float poolMinActiveRatio = pool.getMinActiveRatio();
if (timeSinceLastActive > connectionCleanupPeriodMs ||
active < poolMinActiveRatio * total) {
- // Remove and close 1 connection
- List<ConnectionContext> conns = pool.removeConnections(1);
+ // Be greedy here to close as many connections as possible in one shot
+ // The number should at least be 1
+ int targetConnectionsCount = Math.max(1,
+ (int)(poolMinActiveRatio * total) - active);
+ List<ConnectionContext> conns =
+ pool.removeConnections(targetConnectionsCount);
for (ConnectionContext conn : conns) {
conn.close();
}
@@ -414,7 +459,7 @@ public class ConnectionManager {
ConnectionPool pool = this.queue.take();
try {
int total = pool.getNumConnections();
- int active = pool.getNumActiveConnections();
+ int active = pool.getNumActiveConnectionsRecently();
float poolMinActiveRatio = pool.getMinActiveRatio();
if (pool.getNumConnections() < pool.getMaxSize() &&
active >= poolMinActiveRatio * total) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
index 52e7ceb..827e62c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
@@ -252,19 +252,23 @@ public class ConnectionPool {
*/
public synchronized List<ConnectionContext> removeConnections(int num) {
List<ConnectionContext> removed = new LinkedList<>();
-
- // Remove and close the last connection
- List<ConnectionContext> tmpConnections = new ArrayList<>();
- for (int i=0; i<this.connections.size(); i++) {
- ConnectionContext conn = this.connections.get(i);
- if (i < this.minSize || i < this.connections.size() - num) {
- tmpConnections.add(conn);
- } else {
- removed.add(conn);
+ if (this.connections.size() > this.minSize) {
+ int targetCount = Math.min(num, this.connections.size() - this.minSize);
+ // Remove and close targetCount of connections
+ List<ConnectionContext> tmpConnections = new ArrayList<>();
+ for (int i = 0; i < this.connections.size(); i++) {
+ ConnectionContext conn = this.connections.get(i);
+ // Only pick idle connections to close
+ if (removed.size() < targetCount && conn.isUsable()) {
+ removed.add(conn);
+ } else {
+ tmpConnections.add(conn);
+ }
}
+ this.connections = tmpConnections;
}
- this.connections = tmpConnections;
-
+ LOG.debug("Expected to remove {} connection " +
+ "and actually removed {} connections", num, removed.size());
return removed;
}
@@ -278,7 +282,7 @@ public class ConnectionPool {
this.connectionPoolId, timeSinceLastActive);
for (ConnectionContext connection : this.connections) {
- connection.close();
+ connection.close(true);
}
this.connections.clear();
}
@@ -310,6 +314,39 @@ public class ConnectionPool {
}
/**
+ * Number of usable connections, i.e. connections with no active thread.
+ *
+ * @return Number of idle connections
+ */
+ protected int getNumIdleConnections() {
+ int ret = 0;
+
+ List<ConnectionContext> tmpConnections = this.connections;
+ for (ConnectionContext conn : tmpConnections) {
+ if (conn.isUsable()) {
+ ret++;
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Number of active connections recently in the pool.
+ *
+ * @return Number of active connections recently.
+ */
+ protected int getNumActiveConnectionsRecently() {
+ int ret = 0;
+ List<ConnectionContext> tmpConnections = this.connections;
+ for (ConnectionContext conn : tmpConnections) {
+ if (conn.isActiveRecently()) {
+ ret++;
+ }
+ }
+ return ret;
+ }
+
+ /**
* Get the last time the connection pool was used.
*
* @return Last time the connection pool was used.
@@ -331,12 +368,18 @@ public class ConnectionPool {
public String getJSON() {
final Map<String, String> info = new LinkedHashMap<>();
info.put("active", Integer.toString(getNumActiveConnections()));
+ info.put("recent_active",
+ Integer.toString(getNumActiveConnectionsRecently()));
+ info.put("idle", Integer.toString(getNumIdleConnections()));
info.put("total", Integer.toString(getNumConnections()));
if (LOG.isDebugEnabled()) {
List<ConnectionContext> tmpConnections = this.connections;
for (int i=0; i<tmpConnections.size(); i++) {
ConnectionContext connection = tmpConnections.get(i);
info.put(i + " active", Boolean.toString(connection.isActive()));
+ info.put(i + " recent_active",
+ Integer.toString(getNumActiveConnectionsRecently()));
+ info.put(i + " idle", Boolean.toString(connection.isUsable()));
info.put(i + " closed", Boolean.toString(connection.isClosed()));
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 87f2ed7..bc6d5be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -261,6 +261,24 @@ public class RouterRpcClient {
}
/**
+ * Total number of idle sockets between the router and NNs.
+ *
+ * @return Number of idle namenode clients.
+ */
+ public int getNumIdleConnections() {
+ return this.connectionManager.getNumIdleConnections();
+ }
+
+ /**
+ * Total number of recently active sockets between the router and NNs.
+ *
+ * @return Number of recently active namenode clients.
+ */
+ public int getNumActiveConnectionsRecently() {
+ return this.connectionManager.getNumActiveConnectionsRecently();
+ }
+
+ /**
* Total number of open connection pools to a NN. Each connection pool.
* represents one user + one NN.
*
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
index dffd3d8..e397692 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
@@ -255,6 +255,9 @@ public class TestConnectionManager {
if (e.getKey().getUgi() == ugi) {
assertEquals(numOfConns, e.getValue().getNumConnections());
assertEquals(numOfActiveConns, e.getValue().getNumActiveConnections());
+ // idle + active = total connections
+ assertEquals(numOfConns - numOfActiveConns,
+ e.getValue().getNumIdleConnections());
connPoolFoundForUser = true;
}
}
@@ -265,13 +268,19 @@ public class TestConnectionManager {
@Test
public void testConfigureConnectionActiveRatio() throws IOException {
- final int totalConns = 10;
- int activeConns = 7;
+ // test 1 conn below the threshold and these conns are closed
+ testConnectionCleanup(0.8f, 10, 7, 9);
+
+ // test 2 conn below the threshold and these conns are closed
+ testConnectionCleanup(0.8f, 10, 6, 8);
+ }
+ private void testConnectionCleanup(float ratio, int totalConns,
+ int activeConns, int leftConns) throws IOException {
Configuration tmpConf = new Configuration();
- // Set dfs.federation.router.connection.min-active-ratio 0.8f
+ // Set dfs.federation.router.connection.min-active-ratio
tmpConf.setFloat(
- RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO, 0.8f);
+ RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO, ratio);
ConnectionManager tmpConnManager = new ConnectionManager(tmpConf);
tmpConnManager.start();
@@ -284,21 +293,20 @@ public class TestConnectionManager {
TEST_NN_ADDRESS, NamenodeProtocol.class);
ConnectionPool pool = poolMap.get(connectionPoolId);
- // Test min active ratio is 0.8f
- assertEquals(0.8f, pool.getMinActiveRatio(), 0.001f);
+ // Test min active ratio is as set value
+ assertEquals(ratio, pool.getMinActiveRatio(), 0.001f);
pool.getConnection().getClient();
// Test there is one active connection in pool
assertEquals(1, pool.getNumActiveConnections());
- // Add other 6 active/9 total connections to pool
+ // Add other active-1 connections / totalConns-1 connections to pool
addConnectionsToPool(pool, totalConns - 1, activeConns - 1);
- // There are 7 active connections.
- // The active number is less than totalConns(10) * minActiveRatio(0.8f).
+ // There are activeConn connections.
// We can cleanup the pool
tmpConnManager.cleanup(pool);
- assertEquals(totalConns - 1, pool.getNumConnections());
+ assertEquals(leftConns, pool.getNumConnections());
tmpConnManager.close();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org