You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/03/03 18:36:52 UTC

[hadoop] 13/45: HDFS-14114. RBF: MIN_ACTIVE_RATIO should be configurable. Contributed by Fei Hui.

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch HDFS-13891
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit a78ba709d175b87f6dd39641260b057893194e3d
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Tue Dec 4 19:58:38 2018 +0800

    HDFS-14114. RBF: MIN_ACTIVE_RATIO should be configurable. Contributed by Fei Hui.
---
 .../federation/router/ConnectionManager.java       | 20 +++++----
 .../server/federation/router/ConnectionPool.java   | 14 +++++-
 .../server/federation/router/RBFConfigKeys.java    |  5 +++
 .../src/main/resources/hdfs-rbf-default.xml        |  8 ++++
 .../federation/router/TestConnectionManager.java   | 51 +++++++++++++++++++---
 5 files changed, 83 insertions(+), 15 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
index fa2bf94..74bbbb5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java
@@ -49,10 +49,6 @@ public class ConnectionManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(ConnectionManager.class);
 
-  /** Minimum amount of active connections: 50%. */
-  protected static final float MIN_ACTIVE_RATIO = 0.5f;
-
-
   /** Configuration for the connection manager, pool and sockets. */
   private final Configuration conf;
 
@@ -60,6 +56,8 @@ public class ConnectionManager {
   private final int minSize = 1;
   /** Max number of connections per user + nn. */
   private final int maxSize;
+  /** Min ratio of active connections per user + nn. */
+  private final float minActiveRatio;
 
   /** How often we close a pool for a particular user + nn. */
   private final long poolCleanupPeriodMs;
@@ -96,10 +94,13 @@ public class ConnectionManager {
   public ConnectionManager(Configuration config) {
     this.conf = config;
 
-    // Configure minimum and maximum connection pools
+    // Configure the maximum pool size and the minimum active connection ratio
     this.maxSize = this.conf.getInt(
         RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE,
         RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT);
+    this.minActiveRatio = this.conf.getFloat(
+        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO,
+        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO_DEFAULT);
 
     // Map with the connections indexed by UGI and Namenode
     this.pools = new HashMap<>();
@@ -203,7 +204,8 @@ public class ConnectionManager {
         pool = this.pools.get(connectionId);
         if (pool == null) {
           pool = new ConnectionPool(
-              this.conf, nnAddress, ugi, this.minSize, this.maxSize, protocol);
+              this.conf, nnAddress, ugi, this.minSize, this.maxSize,
+              this.minActiveRatio, protocol);
           this.pools.put(connectionId, pool);
         }
       } finally {
@@ -326,8 +328,9 @@ public class ConnectionManager {
       long timeSinceLastActive = Time.now() - pool.getLastActiveTime();
       int total = pool.getNumConnections();
       int active = pool.getNumActiveConnections();
+      float poolMinActiveRatio = pool.getMinActiveRatio();
       if (timeSinceLastActive > connectionCleanupPeriodMs ||
-          active < MIN_ACTIVE_RATIO * total) {
+          active < poolMinActiveRatio * total) {
         // Remove and close 1 connection
         List<ConnectionContext> conns = pool.removeConnections(1);
         for (ConnectionContext conn : conns) {
@@ -412,8 +415,9 @@ public class ConnectionManager {
           try {
             int total = pool.getNumConnections();
             int active = pool.getNumActiveConnections();
+            float poolMinActiveRatio = pool.getMinActiveRatio();
             if (pool.getNumConnections() < pool.getMaxSize() &&
-                active >= MIN_ACTIVE_RATIO * total) {
+                active >= poolMinActiveRatio * total) {
               ConnectionContext conn = pool.newConnection();
               pool.addConnection(conn);
             } else {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
index fab3b81..f868521 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java
@@ -91,6 +91,8 @@ public class ConnectionPool {
   private final int minSize;
   /** Max number of connections per user. */
   private final int maxSize;
+  /** Min ratio of active connections per user. */
+  private final float minActiveRatio;
 
   /** The last time a connection was active. */
   private volatile long lastActiveTime = 0;
@@ -98,7 +100,7 @@ public class ConnectionPool {
 
   protected ConnectionPool(Configuration config, String address,
       UserGroupInformation user, int minPoolSize, int maxPoolSize,
-      Class<?> proto) throws IOException {
+      float minActiveRatio, Class<?> proto) throws IOException {
 
     this.conf = config;
 
@@ -112,6 +114,7 @@ public class ConnectionPool {
     // Set configuration parameters for the pool
     this.minSize = minPoolSize;
     this.maxSize = maxPoolSize;
+    this.minActiveRatio = minActiveRatio;
 
     // Add minimum connections to the pool
     for (int i=0; i<this.minSize; i++) {
@@ -141,6 +144,15 @@ public class ConnectionPool {
   }
 
   /**
+   * Get the minimum ratio of active connections in this pool.
+   *
+   * @return Minimum ratio of active connections.
+   */
+  protected float getMinActiveRatio() {
+    return this.minActiveRatio;
+  }
+
+  /**
    * Get the connection pool identifier.
    *
    * @return Connection pool identifier.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 10018fe..0070de7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -102,6 +102,11 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
       FEDERATION_ROUTER_PREFIX + "connection.creator.queue-size";
   public static final int
       DFS_ROUTER_NAMENODE_CONNECTION_CREATOR_QUEUE_SIZE_DEFAULT = 100;
+  public static final String
+      DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO =
+      FEDERATION_ROUTER_PREFIX + "connection.min-active-ratio";
+  public static final float
+      DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO_DEFAULT = 0.5f;
   public static final String DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE =
       FEDERATION_ROUTER_PREFIX + "connection.pool-size";
   public static final int DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE_DEFAULT =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 09050bb..afb3c32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -118,6 +118,14 @@
   </property>
 
   <property>
+    <name>dfs.federation.router.connection.min-active-ratio</name>
+    <value>0.5f</value>
+    <description>
+      Minimum ratio of active to total connections in a router-to-namenode pool.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.federation.router.connection.clean.ms</name>
     <value>10000</value>
     <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
index 765f6c8..6c1e448 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java
@@ -80,14 +80,14 @@ public class TestConnectionManager {
     Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();
 
     ConnectionPool pool1 = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class);
+        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class);
     addConnectionsToPool(pool1, 9, 4);
     poolMap.put(
         new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
         pool1);
 
     ConnectionPool pool2 = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, ClientProtocol.class);
+        conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, 0.5f, ClientProtocol.class);
     addConnectionsToPool(pool2, 10, 10);
     poolMap.put(
         new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class),
@@ -110,7 +110,7 @@ public class TestConnectionManager {
 
     // Make sure the number of connections doesn't go below minSize
     ConnectionPool pool3 = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, ClientProtocol.class);
+        conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, 0.5f, ClientProtocol.class);
     addConnectionsToPool(pool3, 8, 0);
     poolMap.put(
         new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class),
@@ -171,7 +171,7 @@ public class TestConnectionManager {
     int activeConns = 5;
 
     ConnectionPool pool = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, ClientProtocol.class);
+        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class);
     addConnectionsToPool(pool, totalConns, activeConns);
     poolMap.put(
         new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
@@ -196,7 +196,7 @@ public class TestConnectionManager {
   @Test
   public void testValidClientIndex() throws Exception {
     ConnectionPool pool = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, ClientProtocol.class);
+        conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, 0.5f, ClientProtocol.class);
     for(int i = -3; i <= 3; i++) {
       pool.getClientIndex().set(i);
       ConnectionContext conn = pool.getConnection();
@@ -212,7 +212,7 @@ public class TestConnectionManager {
     int activeConns = 5;
 
     ConnectionPool pool = new ConnectionPool(
-        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, NamenodeProtocol.class);
+        conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, NamenodeProtocol.class);
     addConnectionsToPool(pool, totalConns, activeConns);
     poolMap.put(
         new ConnectionPoolId(
@@ -262,4 +262,43 @@ public class TestConnectionManager {
     }
   }
 
+  @Test
+  public void testConfigureConnectionActiveRatio() throws IOException {
+    final int totalConns = 10;
+    int activeConns = 7;
+
+    Configuration tmpConf = new Configuration();
+    // Set dfs.federation.router.connection.min-active-ratio to 0.8f
+    tmpConf.setFloat(
+        RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_MIN_ACTIVE_RATIO, 0.8f);
+    ConnectionManager tmpConnManager = new ConnectionManager(tmpConf);
+    tmpConnManager.start();
+
+    // Create one new connection pool
+    tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS,
+        NamenodeProtocol.class);
+
+    Map<ConnectionPoolId, ConnectionPool> poolMap = tmpConnManager.getPools();
+    ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1,
+        TEST_NN_ADDRESS, NamenodeProtocol.class);
+    ConnectionPool pool = poolMap.get(connectionPoolId);
+
+    // Test min active ratio is 0.8f
+    assertEquals(0.8f, pool.getMinActiveRatio(), 0.001f);
+
+    pool.getConnection().getClient();
+    // Test there is one active connection in pool
+    assertEquals(1, pool.getNumActiveConnections());
+
+    // Add 9 more connections to the pool, 6 of them active
+    addConnectionsToPool(pool, totalConns - 1, activeConns - 1);
+
+    // 7 of the 10 connections are now active, which is below
+    // totalConns(10) * minActiveRatio(0.8f) = 8,
+    // so cleanup should remove one connection from the pool.
+    tmpConnManager.cleanup(pool);
+    assertEquals(totalConns - 1, pool.getNumConnections());
+
+    tmpConnManager.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org