You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2021/05/19 17:54:10 UTC

[hadoop] branch trunk updated: HDFS-15757 RBF: Improving Router Connection Management (#2651)

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 43bf009  HDFS-15757 RBF: Improving Router Connection Management (#2651)
43bf009 is described below

commit 43bf0091120d7718347ec7135868f831d4750b1d
Author: lfengnan <lf...@uber.com>
AuthorDate: Wed May 19 10:53:42 2021 -0700

    HDFS-15757 RBF: Improving Router Connection Management (#2651)
---
 .../federation/metrics/FederationRPCMBean.java     | 15 +++++
 .../federation/metrics/FederationRPCMetrics.java   | 10 ++++
 .../federation/router/ConnectionContext.java       | 57 +++++++++++++-----
 .../federation/router/ConnectionManager.java       | 53 +++++++++++++++--
 .../server/federation/router/ConnectionPool.java   | 67 ++++++++++++++++++----
 .../server/federation/router/RouterRpcClient.java  | 18 ++++++
 .../federation/router/TestConnectionManager.java   | 28 +++++----
 7 files changed, 209 insertions(+), 39 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
index 3cde5e5..a4469a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
@@ -77,6 +77,21 @@ public interface FederationRPCMBean {
   int getRpcClientNumActiveConnections();
 
   /**
+   * Get the number of idle RPC connections between the Router and the NNs.
+   * @return Number of idle RPC connections between the Router and the NNs.
+   */
+  int getRpcClientNumIdleConnections();
+
+  /**
+   * Get the number of recently active RPC connections between
+   * the Router and the NNs.
+   *
+   * @return Number of recently active RPC connections between
+   * the Router and the NNs.
+   */
+  int getRpcClientNumActiveConnectionsRecently();
+
+  /**
    * Get the number of RPC connections to be created.
    * @return Number of RPC connections to be created.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
index 887d50b..1e6aa80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
@@ -209,6 +209,16 @@ public class FederationRPCMetrics implements FederationRPCMBean {
   }
 
   @Override
+  public int getRpcClientNumIdleConnections() {
+    return rpcServer.getRPCClient().getNumIdleConnections();
+  }
+
+  @Override
+  public int getRpcClientNumActiveConnectionsRecently() {
+    return rpcServer.getRPCClient().getNumActiveConnectionsRecently();
+  }
+
+  @Override
   public int getRpcClientNumCreatingConnections() {
     return rpcServer.getRPCClient().getNumCreatingConnections();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
index 02a3dbe..9a5434b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionContext.java
@@ -18,9 +18,13 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Context to track a connection in a {@link ConnectionPool}. When a client uses
@@ -36,13 +40,19 @@ import org.apache.hadoop.ipc.RPC;
  */
 public class ConnectionContext {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ConnectionContext.class);
+
   /** Client for the connection. */
   private final ProxyAndInfo<?> client;
   /** How many threads are using this connection. */
   private int numThreads = 0;
   /** If the connection is closed. */
   private boolean closed = false;
-
+  /** Last timestamp the connection was active. */
+  private long lastActiveTs = 0;
+  /** The connection's active status would expire after this window. */
+  private final static long ACTIVE_WINDOW_TIME = TimeUnit.SECONDS.toMillis(30);
 
   public ConnectionContext(ProxyAndInfo<?> connection) {
     this.client = connection;
@@ -58,6 +68,16 @@ public class ConnectionContext {
   }
 
   /**
+   * Check if the connection is/was active recently.
+   *
+   * @return True if the connection is active or
+   * was active in the past period of time.
+   */
+  public synchronized boolean isActiveRecently() {
+    return Time.monotonicNow() - this.lastActiveTs <= ACTIVE_WINDOW_TIME;
+  }
+
+  /**
    * Check if the connection is closed.
    *
    * @return If the connection is closed.
@@ -83,30 +103,41 @@ public class ConnectionContext {
    */
   public synchronized ProxyAndInfo<?> getClient() {
     this.numThreads++;
+    this.lastActiveTs = Time.monotonicNow();
     return this.client;
   }
 
   /**
-   * Release this connection. If the connection was closed, close the proxy.
-   * Otherwise, mark the connection as not used by us anymore.
+   * Release this connection.
    */
   public synchronized void release() {
-    if (--this.numThreads == 0 && this.closed) {
-      close();
+    if (this.numThreads > 0) {
+      this.numThreads--;
     }
   }
 
   /**
-   * We will not use this connection anymore. If it's not being used, we close
-   * it. Otherwise, we let release() do it once we are done with it.
+   * Close a connection. Only idle connections can be closed since
+   * the RPC proxy would be shut down immediately.
+   *
+   * @param force whether the connection should be closed anyway.
    */
-  public synchronized void close() {
-    this.closed = true;
-    if (this.numThreads == 0) {
-      Object proxy = this.client.getProxy();
-      // Nobody should be using this anymore so it should close right away
-      RPC.stopProxy(proxy);
+  public synchronized void close(boolean force) {
+    if (!force && this.numThreads > 0) {
+      // This is an erroneous case, but we have to close the connection
+      // anyway: it has already been moved out of the pool, so there would
+      // be a connection leak if we did not close it.
+      LOG.error("Active connection with {} handlers will be closed",
+          this.numThreads);
     }
+    this.closed = true;
+    Object proxy = this.client.getProxy();
+    // Nobody should be using this anymore so it should close right away
+    RPC.stopProxy(proxy);
+  }
+
+  public synchronized void close() {
+    close(false);
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
index 9ec3b54..b773a79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
@@ -282,6 +282,42 @@ public class ConnectionManager {
   }
 
   /**
+   * Get number of idle connections.
+   *
+   * @return Number of idle connections.
+   */
+  public int getNumIdleConnections() {
+    int total = 0;
+    readLock.lock();
+    try {
+      for (ConnectionPool pool : this.pools.values()) {
+        total += pool.getNumIdleConnections();
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return total;
+  }
+
+  /**
+   * Get number of recently active connections.
+   *
+   * @return Number of recently active connections.
+   */
+  public int getNumActiveConnectionsRecently() {
+    int total = 0;
+    readLock.lock();
+    try {
+      for (ConnectionPool pool : this.pools.values()) {
+        total += pool.getNumActiveConnectionsRecently();
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return total;
+  }
+
+  /**
    * Get the number of connections to be created.
    *
    * @return Number of connections to be created.
@@ -327,12 +363,21 @@ public class ConnectionManager {
       // Check if the pool hasn't been active in a while or not 50% are used
       long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
       int total = pool.getNumConnections();
-      int active = pool.getNumActiveConnections();
+      // Active is a transient status in many cases for a connection since
+      // the handler thread uses the connection very quickly. Thus the number
+      // of connections in use by handlers at call time is constantly low.
+      // Recently active is a longer-lasting status: it shows how many
+      // connections have been used within a recent time period (30 seconds).
+      int active = pool.getNumActiveConnectionsRecently();
       float poolMinActiveRatio = pool.getMinActiveRatio();
       if (timeSinceLastActive > connectionCleanupPeriodMs ||
           active < poolMinActiveRatio * total) {
-        // Remove and close 1 connection
-        List<ConnectionContext> conns = pool.removeConnections(1);
+        // Be greedy here to close as many connections as possible in one shot
+        // The number should at least be 1
+        int targetConnectionsCount = Math.max(1,
+            (int)(poolMinActiveRatio * total) - active);
+        List<ConnectionContext> conns =
+            pool.removeConnections(targetConnectionsCount);
         for (ConnectionContext conn : conns) {
           conn.close();
         }
@@ -414,7 +459,7 @@ public class ConnectionManager {
           ConnectionPool pool = this.queue.take();
           try {
             int total = pool.getNumConnections();
-            int active = pool.getNumActiveConnections();
+            int active = pool.getNumActiveConnectionsRecently();
             float poolMinActiveRatio = pool.getMinActiveRatio();
             if (pool.getNumConnections() < pool.getMaxSize() &&
                 active >= poolMinActiveRatio * total) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
index 52e7ceb..827e62c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
@@ -252,19 +252,23 @@ public class ConnectionPool {
    */
   public synchronized List<ConnectionContext> removeConnections(int num) {
     List<ConnectionContext> removed = new LinkedList<>();
-
-    // Remove and close the last connection
-    List<ConnectionContext> tmpConnections = new ArrayList<>();
-    for (int i=0; i<this.connections.size(); i++) {
-      ConnectionContext conn = this.connections.get(i);
-      if (i < this.minSize || i < this.connections.size() - num) {
-        tmpConnections.add(conn);
-      } else {
-        removed.add(conn);
+    if (this.connections.size() > this.minSize) {
+      int targetCount = Math.min(num, this.connections.size() - this.minSize);
+      // Remove and close targetCount of connections
+      List<ConnectionContext> tmpConnections = new ArrayList<>();
+      for (int i = 0; i < this.connections.size(); i++) {
+        ConnectionContext conn = this.connections.get(i);
+        // Only pick idle connections to close
+        if (removed.size() < targetCount && conn.isUsable()) {
+          removed.add(conn);
+        } else {
+          tmpConnections.add(conn);
+        }
       }
+      this.connections = tmpConnections;
     }
-    this.connections = tmpConnections;
-
+    LOG.debug("Expected to remove {} connection " +
+        "and actually removed {} connections", num, removed.size());
     return removed;
   }
 
@@ -278,7 +282,7 @@ public class ConnectionPool {
         this.connectionPoolId, timeSinceLastActive);
 
     for (ConnectionContext connection : this.connections) {
-      connection.close();
+      connection.close(true);
     }
     this.connections.clear();
   }
@@ -310,6 +314,39 @@ public class ConnectionPool {
   }
 
   /**
+   * Number of usable connections, i.e. connections with no active thread.
+   *
+   * @return Number of idle connections
+   */
+  protected int getNumIdleConnections() {
+    int ret = 0;
+
+    List<ConnectionContext> tmpConnections = this.connections;
+    for (ConnectionContext conn : tmpConnections) {
+      if (conn.isUsable()) {
+        ret++;
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Number of recently active connections in the pool.
+   *
+   * @return Number of recently active connections.
+   */
+  protected int getNumActiveConnectionsRecently() {
+    int ret = 0;
+    List<ConnectionContext> tmpConnections = this.connections;
+    for (ConnectionContext conn : tmpConnections) {
+      if (conn.isActiveRecently()) {
+        ret++;
+      }
+    }
+    return ret;
+  }
+
+  /**
    * Get the last time the connection pool was used.
    *
    * @return Last time the connection pool was used.
@@ -331,12 +368,18 @@ public class ConnectionPool {
   public String getJSON() {
     final Map<String, String> info = new LinkedHashMap<>();
     info.put("active", Integer.toString(getNumActiveConnections()));
+    info.put("recent_active",
+        Integer.toString(getNumActiveConnectionsRecently()));
+    info.put("idle", Integer.toString(getNumIdleConnections()));
     info.put("total", Integer.toString(getNumConnections()));
     if (LOG.isDebugEnabled()) {
       List<ConnectionContext> tmpConnections = this.connections;
       for (int i=0; i<tmpConnections.size(); i++) {
         ConnectionContext connection = tmpConnections.get(i);
         info.put(i + " active", Boolean.toString(connection.isActive()));
+        info.put(i + " recent_active",
+            Integer.toString(getNumActiveConnectionsRecently()));
+        info.put(i + " idle", Boolean.toString(connection.isUsable()));
         info.put(i + " closed", Boolean.toString(connection.isClosed()));
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 87f2ed7..bc6d5be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -261,6 +261,24 @@ public class RouterRpcClient {
   }
 
   /**
+   * Total number of idle sockets between the router and NNs.
+   *
+   * @return Number of idle namenode clients.
+   */
+  public int getNumIdleConnections() {
+    return this.connectionManager.getNumIdleConnections();
+  }
+
+  /**
+   * Total number of recently active sockets between the router and NNs.
+   *
+   * @return Number of recently active namenode clients.
+   */
+  public int getNumActiveConnectionsRecently() {
+    return this.connectionManager.getNumActiveConnectionsRecently();
+  }
+
+  /**
    * Total number of open connection pools to a NN. Each connection pool.
    * represents one user + one NN.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
index dffd3d8..e397692 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
@@ -255,6 +255,9 @@ public class TestConnectionManager {
       if (e.getKey().getUgi() == ugi) {
         assertEquals(numOfConns, e.getValue().getNumConnections());
         assertEquals(numOfActiveConns, e.getValue().getNumActiveConnections());
+        // idle + active = total connections
+        assertEquals(numOfConns - numOfActiveConns,
+            e.getValue().getNumIdleConnections());
         connPoolFoundForUser = true;
       }
     }
@@ -265,13 +268,19 @@ public class TestConnectionManager {
 
   @Test
   public void testConfigureConnectionActiveRatio() throws IOException {
-    final int totalConns = 10;
-    int activeConns = 7;
+    // test 1 conn below the threshold and these conns are closed
+    testConnectionCleanup(0.8f, 10, 7, 9);
+
+    // test 2 conn below the threshold and these conns are closed
+    testConnectionCleanup(0.8f, 10, 6, 8);
+  }
 
+  private void testConnectionCleanup(float ratio, int totalConns,
+      int activeConns, int leftConns) throws IOException {
     Configuration tmpConf = new Configuration();
-    // Set dfs.federation.router.connection.min-active-ratio 0.8f
+    // Set dfs.federation.router.connection.min-active-ratio
     tmpConf.setFloat(
-        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO, 0.8f);
+        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO, ratio);
     ConnectionManager tmpConnManager = new ConnectionManager(tmpConf);
     tmpConnManager.start();
 
@@ -284,21 +293,20 @@ public class TestConnectionManager {
         TEST_NN_ADDRESS, NamenodeProtocol.class);
     ConnectionPool pool = poolMap.get(connectionPoolId);
 
-    // Test min active ratio is 0.8f
-    assertEquals(0.8f, pool.getMinActiveRatio(), 0.001f);
+    // Test min active ratio is as set value
+    assertEquals(ratio, pool.getMinActiveRatio(), 0.001f);
 
     pool.getConnection().getClient();
     // Test there is one active connection in pool
     assertEquals(1, pool.getNumActiveConnections());
 
-    // Add other 6 active/9 total connections to pool
+    // Add other active-1 connections / totalConns-1 connections to pool
     addConnectionsToPool(pool, totalConns - 1, activeConns - 1);
 
-    // There are 7 active connections.
-    // The active number is less than totalConns(10) * minActiveRatio(0.8f).
+    // There are activeConns active connections out of totalConns.
     // We can cleanup the pool
     tmpConnManager.cleanup(pool);
-    assertEquals(totalConns - 1, pool.getNumConnections());
+    assertEquals(leftConns, pool.getNumConnections());
 
     tmpConnManager.close();
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org