You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by fe...@apache.org on 2022/05/13 06:29:09 UTC

[hadoop] branch trunk updated: HDFS-14750. RBF: Support dynamic handler allocation in routers (#4199)

This is an automated email from the ASF dual-hosted git repository.

ferhui pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 680af27aa66 HDFS-14750. RBF: Support dynamic handler allocation in routers (#4199)
680af27aa66 is described below

commit 680af27aa66765ef172b45486e9a0c6bbb40ec23
Author: Felix Nguyen <23...@users.noreply.github.com>
AuthorDate: Fri May 13 14:28:53 2022 +0800

    HDFS-14750. RBF: Support dynamic handler allocation in routers (#4199)
---
 .../AbstractRouterRpcFairnessPolicyController.java |  47 +++++-
 .../DynamicRouterRpcFairnessPolicyController.java  | 178 ++++++++++++++++++++
 .../NoRouterRpcFairnessPolicyController.java       |  14 ++
 .../RouterRpcFairnessPolicyController.java         |  14 ++
 .../StaticRouterRpcFairnessPolicyController.java   |  21 ++-
 .../federation/metrics/FederationRPCMBean.java     |   6 +
 .../federation/metrics/FederationRPCMetrics.java   |   5 +
 .../server/federation/router/FederationUtil.java   |  18 ++
 .../server/federation/router/RBFConfigKeys.java    |  13 +-
 .../server/federation/router/RouterRpcClient.java  |   4 +-
 .../federation/utils/AdjustableSemaphore.java      |  36 ++++
 .../src/main/resources/hdfs-rbf-default.xml        |  18 ++
 ...stDynamicRouterRpcFairnessPolicyController.java | 181 +++++++++++++++++++++
 .../TestRouterRpcFairnessPolicyController.java     |  35 ++++
 14 files changed, 579 insertions(+), 11 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java
index fe498c66b7e..2677c2813b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java
@@ -22,10 +22,13 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +43,10 @@ public class AbstractRouterRpcFairnessPolicyController
       LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class);
 
   /** Hash table to hold semaphore for each configured name service. */
-  private Map<String, Semaphore> permits;
+  private Map<String, AdjustableSemaphore> permits;
+  private final Map<String, Integer> permitSizes = new HashMap<>();
+  private Map<String, LongAdder> rejectedPermitsPerNs;
+  private Map<String, LongAdder> acceptedPermitsPerNs;
 
   public void init(Configuration conf) {
     this.permits = new HashMap<>();
@@ -72,7 +78,7 @@ public class AbstractRouterRpcFairnessPolicyController
   }
 
   protected void insertNameServiceWithPermits(String nsId, int maxPermits) {
-    this.permits.put(nsId, new Semaphore(maxPermits));
+    this.permits.put(nsId, new AdjustableSemaphore(maxPermits));
   }
 
   protected int getAvailablePermits(String nsId) {
@@ -82,7 +88,7 @@ public class AbstractRouterRpcFairnessPolicyController
   @Override
   public String getAvailableHandlerOnPerNs() {
     JSONObject json = new JSONObject();
-    for (Map.Entry<String, Semaphore> entry : permits.entrySet()) {
+    for (Map.Entry<String, AdjustableSemaphore> entry : permits.entrySet()) {
       try {
         String nsId = entry.getKey();
         int availableHandler = entry.getValue().availablePermits();
@@ -93,4 +99,39 @@ public class AbstractRouterRpcFairnessPolicyController
     }
     return json.toString();
   }
+
+  @Override
+  public String getPermitCapacityPerNs() {
+    JSONObject json = new JSONObject();
+    for (Map.Entry<String, Integer> entry : permitSizes.entrySet()) {
+      try {
+        json.put(entry.getKey(), entry.getValue());
+      } catch (JSONException e) {
+        LOG.warn("Cannot put {} into JSONObject", entry.getKey(), e);
+      }
+    }
+    return json.toString();
+  }
+
+  @Override
+  public void setMetrics(Map<String, LongAdder> rejectedPermits,
+      Map<String, LongAdder> acceptedPermits) {
+    this.rejectedPermitsPerNs = rejectedPermits;
+    this.acceptedPermitsPerNs = acceptedPermits;
+  }
+
+  protected Map<String, AdjustableSemaphore> getPermits() {
+    return permits;
+  }
+
+  public Map<String, LongAdder> getRejectedPermitsPerNs() {
+    return rejectedPermitsPerNs;
+  }
+  public Map<String, LongAdder> getAcceptedPermitsPerNs() {
+    return acceptedPermitsPerNs;
+  }
+
+  protected Map<String, Integer> getPermitSizes() {
+    return permitSizes;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java
new file mode 100644
index 00000000000..899020b62b1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/DynamicRouterRpcFairnessPolicyController.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.utils.AdjustableSemaphore;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+
+/**
+ * Dynamic fairness policy extending {@link StaticRouterRpcFairnessPolicyController}
+ * and fetching handlers from configuration for all available name services.
+ * The handlers count changes according to traffic to namespaces.
+ * Total handlers might NOT strictly add up to the value defined by DFS_ROUTER_HANDLER_COUNT_KEY
+ * but will not exceed initial handler count + number of nameservices.
+ */
+public class DynamicRouterRpcFairnessPolicyController
+    extends StaticRouterRpcFairnessPolicyController {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DynamicRouterRpcFairnessPolicyController.class);
+
+  private static final ScheduledExecutorService SCHEDULED_EXECUTOR = HadoopExecutors
+      .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true)
+          .setNameFormat("DynamicRouterRpcFairnessPolicyControllerPermitsResizer").build());
+  private PermitsResizerService permitsResizerService;
+  private ScheduledFuture<?> refreshTask;
+  private int handlerCount;
+  private int minimumHandlerPerNs;
+
+  /**
+   * Initializes using the same logic as {@link StaticRouterRpcFairnessPolicyController}
+   * and starts a periodic semaphore resizer thread.
+   *
+   * @param conf configuration
+   */
+  public DynamicRouterRpcFairnessPolicyController(Configuration conf) {
+    super(conf);
+    minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY,
+        DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT);
+    handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);
+    long refreshInterval =
+        conf.getTimeDuration(DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY,
+            DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT,
+            TimeUnit.SECONDS);
+    permitsResizerService = new PermitsResizerService();
+    refreshTask = SCHEDULED_EXECUTOR
+        .scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval,
+            TimeUnit.SECONDS);
+  }
+
+  @VisibleForTesting
+  public DynamicRouterRpcFairnessPolicyController(Configuration conf, long refreshInterval) {
+    super(conf);
+    minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY,
+        DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT);
+    handlerCount = conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, DFS_ROUTER_HANDLER_COUNT_DEFAULT);
+    permitsResizerService = new PermitsResizerService();
+    refreshTask = SCHEDULED_EXECUTOR
+        .scheduleWithFixedDelay(permitsResizerService, refreshInterval, refreshInterval,
+            TimeUnit.SECONDS);
+  }
+
+  @VisibleForTesting
+  public void refreshPermitsCap() {
+    permitsResizerService.run();
+  }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    if (refreshTask != null) {
+      refreshTask.cancel(true);
+    }
+    if (SCHEDULED_EXECUTOR != null) {
+      SCHEDULED_EXECUTOR.shutdown();
+    }
+  }
+
+  class PermitsResizerService implements Runnable {
+
+    @Override
+    public synchronized void run() {
+      if ((getRejectedPermitsPerNs() == null) || (getAcceptedPermitsPerNs() == null)) {
+        return;
+      }
+      long totalOps = 0;
+      Map<String, Long> nsOps = new HashMap<>();
+      for (Map.Entry<String, AdjustableSemaphore> entry : getPermits().entrySet()) {
+        long ops = (getRejectedPermitsPerNs().containsKey(entry.getKey()) ?
+            getRejectedPermitsPerNs().get(entry.getKey()).longValue() :
+            0) + (getAcceptedPermitsPerNs().containsKey(entry.getKey()) ?
+            getAcceptedPermitsPerNs().get(entry.getKey()).longValue() :
+            0);
+        nsOps.put(entry.getKey(), ops);
+        totalOps += ops;
+      }
+
+      List<String> underMinimumNss = new ArrayList<>();
+      List<String> overMinimumNss = new ArrayList<>();
+      int effectiveOps = 0;
+
+      // First iteration: split namespaces into those underused and those that are not.
+      for (Map.Entry<String, AdjustableSemaphore> entry : getPermits().entrySet()) {
+        String ns = entry.getKey();
+        int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / totalOps * handlerCount);
+
+        if (newPermitCap <= minimumHandlerPerNs) {
+          underMinimumNss.add(ns);
+        } else {
+          overMinimumNss.add(ns);
+          effectiveOps += nsOps.get(ns);
+        }
+      }
+
+      // Second iteration part 1: assign minimum handlers
+      for (String ns: underMinimumNss) {
+        resizeNsHandlerCapacity(ns, minimumHandlerPerNs);
+      }
+      // Second iteration part 2: assign handlers to the rest
+      int leftoverPermits = handlerCount - minimumHandlerPerNs * underMinimumNss.size();
+      for (String ns: overMinimumNss) {
+        int newPermitCap = (int) Math.ceil((float) nsOps.get(ns) / effectiveOps * leftoverPermits);
+        resizeNsHandlerCapacity(ns, newPermitCap);
+      }
+    }
+
+    private void resizeNsHandlerCapacity(String ns, int newPermitCap) {
+      AdjustableSemaphore semaphore = getPermits().get(ns);
+      int oldPermitCap = getPermitSizes().get(ns);
+      if (newPermitCap <= minimumHandlerPerNs) {
+        newPermitCap = minimumHandlerPerNs;
+      }
+      getPermitSizes().put(ns, newPermitCap);
+      if (newPermitCap > oldPermitCap) {
+        semaphore.release(newPermitCap - oldPermitCap);
+      } else if (newPermitCap < oldPermitCap) {
+        semaphore.reducePermits(oldPermitCap - newPermitCap);
+      }
+      LOG.info("Resized handlers for nsId {} from {} to {}", ns, oldPermitCap, newPermitCap);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java
index 3b85da59e1f..6ce8ac2afd6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hdfs.server.federation.fairness;
 
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -51,4 +54,15 @@ public class NoRouterRpcFairnessPolicyController implements
   public String getAvailableHandlerOnPerNs(){
     return "N/A";
   }
+
+  @Override
+  public String getPermitCapacityPerNs() {
+    return "N/A";
+  }
+
+  @Override
+  public void setMetrics(Map<String, LongAdder> rejectedPermitsPerNs,
+      Map<String, LongAdder> acceptedPermitsPerNs) {
+    // Nothing
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java
index 354383a168f..d6921add562 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hdfs.server.federation.fairness;
 
+import java.util.Map;
+import java.util.concurrent.atomic.LongAdder;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -67,4 +70,15 @@ public interface RouterRpcFairnessPolicyController {
    * Returns the JSON string of the available handler for each Ns.
    */
   String getAvailableHandlerOnPerNs();
+
+  /**
+   * Returns the JSON string of the max handler count for each ns.
+   */
+  String getPermitCapacityPerNs();
+
+  /**
+   * Attaches permits access metrics to the controller.
+   */
+  void setMetrics(Map<String, LongAdder> rejectedPermitsPerNs,
+      Map<String, LongAdder> acceptedPermitsPerNs);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java
index aa0777fc03d..191ad46d96c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java
@@ -27,6 +27,8 @@ import java.util.Set;
 import java.util.HashSet;
 
 import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
@@ -45,6 +47,9 @@ public class StaticRouterRpcFairnessPolicyController extends
   public static final String ERROR_MSG = "Configured handlers "
       + DFS_ROUTER_HANDLER_COUNT_KEY + '='
       + " %d is less than the minimum required handlers %d";
+  public static final String ERROR_NS_MSG =
+      "Configured handlers %s=%d is less than the minimum required handlers %d";
+
 
   public StaticRouterRpcFairnessPolicyController(Configuration conf) {
     init(conf);
@@ -78,6 +83,7 @@ public class StaticRouterRpcFairnessPolicyController extends
         handlerCount -= dedicatedHandlers;
         insertNameServiceWithPermits(nsId, dedicatedHandlers);
         logAssignment(nsId, dedicatedHandlers);
+        getPermitSizes().put(nsId, dedicatedHandlers);
       } else {
         unassignedNS.add(nsId);
       }
@@ -92,6 +98,7 @@ public class StaticRouterRpcFairnessPolicyController extends
       for (String nsId : unassignedNS) {
         insertNameServiceWithPermits(nsId, handlersPerNS);
         logAssignment(nsId, handlersPerNS);
+        getPermitSizes().put(nsId, handlersPerNS);
       }
     }
 
@@ -103,6 +110,7 @@ public class StaticRouterRpcFairnessPolicyController extends
       LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
       insertNameServiceWithPermits(CONCURRENT_NS,
           existingPermits + leftOverHandlers);
+      getPermitSizes().put(CONCURRENT_NS, existingPermits + leftOverHandlers);
     }
     LOG.info("Final permit allocation for concurrent ns: {}",
         getAvailablePermits(CONCURRENT_NS));
@@ -116,15 +124,23 @@ public class StaticRouterRpcFairnessPolicyController extends
   private void validateHandlersCount(Configuration conf, int handlerCount,
                                      Set<String> allConfiguredNS) {
     int totalDedicatedHandlers = 0;
+    int minimumHandlerPerNs = conf.getInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY,
+        DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT);
     for (String nsId : allConfiguredNS) {
       int dedicatedHandlers =
               conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
       if (dedicatedHandlers > 0) {
+        if (dedicatedHandlers < minimumHandlerPerNs) {
+          String msg = String.format(ERROR_NS_MSG, DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId,
+              handlerCount, minimumHandlerPerNs);
+          LOG.error(msg);
+          throw new IllegalArgumentException(msg);
+        }
         // Total handlers should not be less than sum of dedicated handlers.
         totalDedicatedHandlers += dedicatedHandlers;
       } else {
-        // Each NS should have at least one handler assigned.
-        totalDedicatedHandlers++;
+        // Each NS has to have a minimum number of handlers assigned.
+        totalDedicatedHandlers += minimumHandlerPerNs;
       }
     }
     if (totalDedicatedHandlers > handlerCount) {
@@ -134,5 +150,4 @@ public class StaticRouterRpcFairnessPolicyController extends
       throw new IllegalArgumentException(msg);
     }
   }
-
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
index 979e7504a87..3499962d73b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
@@ -115,6 +115,12 @@ public interface FederationRPCMBean {
    */
   String getAvailableHandlerOnPerNs();
 
+  /**
+   * JSON representation of max handler count per ns.
+   * @return JSON string representation.
+   */
+  String getPermitCapacityPerNs();
+
   /**
    * Get the JSON representation of the async caller thread pool.
    * @return JSON string representation of the async caller thread pool.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
index 823bc7b8af2..1ec49b37d33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
@@ -247,6 +247,11 @@ public class FederationRPCMetrics implements FederationRPCMBean {
         getRouterRpcFairnessPolicyController().getAvailableHandlerOnPerNs();
   }
 
+  @Override
+  public String getPermitCapacityPerNs() {
+    return rpcServer.getRPCClient().getRouterRpcFairnessPolicyController().getPermitCapacityPerNs();
+  }
+
   @Override
   public String getAsyncCallerPool() {
     return rpcServer.getRPCClient().getAsyncCallerPoolJson();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
index 7ff853946d7..30fae5caf37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
@@ -29,7 +29,9 @@ import java.net.URLConnection;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -271,6 +273,22 @@ public final class FederationUtil {
     return newInstance(conf, null, null, clazz);
   }
 
+  /**
+   * Creates an instance of an RouterRpcFairnessPolicyController
+   * from the configuration and attaches permits access metrics to the controller.
+   *
+   * @param conf Configuration that defines the fairness controller class.
+   * @param rejectedPermitsPerNs Metrics map ns:rejected permits
+   * @param acceptedPermitsPerNs Metrics map ns:accepted permits
+   * @return Fairness policy controller.
+   */
+  public static RouterRpcFairnessPolicyController newFairnessPolicyController(Configuration conf,
+      Map<String, LongAdder> rejectedPermitsPerNs, Map<String, LongAdder> acceptedPermitsPerNs) {
+    RouterRpcFairnessPolicyController instance = newFairnessPolicyController(conf);
+    instance.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs);
+    return instance;
+  }
+
   /**
    * Collect all configured nameservices.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 741e470c6fc..7ba982a9597 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdfs.server.federation.router;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.server.federation.fairness.NoRouterRpcFairnessPolicyController;
@@ -27,13 +29,11 @@ import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl;
 import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializerPBImpl;
 import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
-import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl;
-
-import java.util.concurrent.TimeUnit;
 
 /**
  * Config fields for router-based hdfs federation.
@@ -354,6 +354,13 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
       NoRouterRpcFairnessPolicyController.class;
   public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
       FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
+  public static final String DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY =
+      FEDERATION_ROUTER_FAIRNESS_PREFIX + "minimum.handler.count";
+  public static final int DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_DEFAULT = 1;
+  public static final long DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_DEFAULT =
+      600;
+  public static final String DFS_ROUTER_DYNAMIC_FAIRNESS_CONTROLLER_REFRESH_INTERVAL_SECONDS_KEY =
+      FEDERATION_ROUTER_FAIRNESS_PREFIX + "policy.controller.dynamic.refresh.interval.seconds";
 
   // HDFS Router Federation Rename.
   public static final String DFS_ROUTER_FEDERATION_RENAME_PREFIX =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 34a2c47c3ef..78f9bbd6cc6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -156,8 +156,8 @@ public class RouterRpcClient {
             HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
     this.connectionManager = new ConnectionManager(clientConf);
     this.connectionManager.start();
-    this.routerRpcFairnessPolicyController =
-        FederationUtil.newFairnessPolicyController(conf);
+    this.routerRpcFairnessPolicyController = FederationUtil
+        .newFairnessPolicyController(conf, rejectedPermitsPerNs, acceptedPermitsPerNs);
 
     int numThreads = conf.getInt(
         RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/AdjustableSemaphore.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/AdjustableSemaphore.java
new file mode 100644
index 00000000000..34a0563232f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/utils/AdjustableSemaphore.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.utils;
+
+import java.util.concurrent.Semaphore;
+
+public class AdjustableSemaphore extends Semaphore {
+
+  public AdjustableSemaphore(int permits) {
+    super(permits);
+  }
+
+  public AdjustableSemaphore(int permits, boolean fair) {
+    super(permits, fair);
+  }
+
+  public void reducePermits(int reduction) {
+    super.reducePermits(reduction);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index fcf6a28475f..b35cc569a4c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -706,6 +706,24 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.fairness.minimum.handler.count</name>
+    <value>1</value>
+    <description>
+      Minimum number of handlers assigned per nameservice.
+      If any dedicated handler count is smaller than this number,
+      router initialization will fail.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.fairness.policy.controller.dynamic.refresh.interval.seconds</name>
+    <value>600</value>
+    <description>
+      Interval (in seconds) between each handler count resize by DynamicFairnessPolicyController
+    </description>
+  </property>
+
   <property>
     <name>dfs.federation.router.fairness.handler.count.EXAMPLENAMESERVICE</name>
     <value></value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java
new file mode 100644
index 00000000000..057f7a36468
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestDynamicRouterRpcFairnessPolicyController.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+
+import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
+
+/**
+ * Test functionality of {@link DynamicRouterRpcFairnessPolicyController).
+ */
+public class TestDynamicRouterRpcFairnessPolicyController {
+
+  private static String nameServices = "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2, ns3.nn1";
+
+  @Test
+  public void testDynamicControllerSimple() throws InterruptedException, TimeoutException {
+//    verifyDynamicControllerSimple(true);
+    verifyDynamicControllerSimple(false);
+  }
+
+  @Test
+  public void testDynamicControllerAllPermitsAcquired() throws InterruptedException {
+    verifyDynamicControllerAllPermitsAcquired(true);
+    verifyDynamicControllerAllPermitsAcquired(false);
+  }
+
+  private void verifyDynamicControllerSimple(boolean manualRefresh)
+      throws InterruptedException, TimeoutException {
+    // 3 permits each ns
+    DynamicRouterRpcFairnessPolicyController controller;
+    if (manualRefresh) {
+      controller = getFairnessPolicyController(20);
+    } else {
+      controller = getFairnessPolicyController(20, 4);
+    }
+
+    String[] nss = new String[] {"ns1", "ns2", "ns3", CONCURRENT_NS};
+    // Initial permit counts should be 5:5:5
+    verifyRemainingPermitCounts(new int[] {5, 5, 5, 5}, nss, controller);
+
+    // Release all permits
+    for (int i = 0; i < 5; i++) {
+      controller.releasePermit("ns1");
+      controller.releasePermit("ns2");
+      controller.releasePermit("ns3");
+      controller.releasePermit(CONCURRENT_NS);
+    }
+
+    // Inject dummy metrics
+    // Split half half for ns1 and concurrent
+    Map<String, LongAdder> rejectedPermitsPerNs = new HashMap<>();
+    Map<String, LongAdder> acceptedPermitsPerNs = new HashMap<>();
+    injectDummyMetrics(rejectedPermitsPerNs, "ns1", 10);
+    injectDummyMetrics(rejectedPermitsPerNs, "ns2", 0);
+    injectDummyMetrics(rejectedPermitsPerNs, "ns3", 10);
+    injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 10);
+    controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs);
+
+    // Current permits count should be 6:3:6:6
+    int[] newPermitCounts = new int[] {6, 3, 6, 6};
+
+    if (manualRefresh) {
+      controller.refreshPermitsCap();
+    } else {
+      Thread.sleep(5000);
+    }
+    verifyRemainingPermitCounts(newPermitCounts, nss, controller);
+
+  }
+
+  public void verifyDynamicControllerAllPermitsAcquired(boolean manualRefresh)
+      throws InterruptedException {
+    // 10 permits each ns
+    DynamicRouterRpcFairnessPolicyController controller;
+    if (manualRefresh) {
+      controller = getFairnessPolicyController(40);
+    } else {
+      controller = getFairnessPolicyController(40, 4);
+    }
+
+    String[] nss = new String[] {"ns1", "ns2", "ns3", CONCURRENT_NS};
+    verifyRemainingPermitCounts(new int[] {10, 10, 10, 10}, nss, controller);
+
+    // Inject dummy metrics
+    Map<String, LongAdder> rejectedPermitsPerNs = new HashMap<>();
+    Map<String, LongAdder> acceptedPermitsPerNs = new HashMap<>();
+    injectDummyMetrics(rejectedPermitsPerNs, "ns1", 13);
+    injectDummyMetrics(rejectedPermitsPerNs, "ns2", 13);
+    injectDummyMetrics(rejectedPermitsPerNs, "ns3", 13);
+    injectDummyMetrics(rejectedPermitsPerNs, CONCURRENT_NS, 1);
+    // New permit capacity will be 13:13:13:3
+    controller.setMetrics(rejectedPermitsPerNs, acceptedPermitsPerNs);
+    if (manualRefresh) {
+      controller.refreshPermitsCap();
+    } else {
+      Thread.sleep(5000);
+    }
+    Assert.assertEquals("{\"concurrent\":-7,\"ns2\":3,\"ns1\":3,\"ns3\":3}",
+        controller.getAvailableHandlerOnPerNs());
+
+    // Can acquire 3 more permits for ns1, ns2, ns3
+    verifyRemainingPermitCounts(new int[] {3, 3, 3, 0}, nss, controller);
+    // Need to release at least 8 permits for concurrent before it has any free permits
+    Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS));
+    for (int i = 0; i < 7; i++) {
+      controller.releasePermit(CONCURRENT_NS);
+    }
+    Assert.assertFalse(controller.acquirePermit(CONCURRENT_NS));
+    controller.releasePermit(CONCURRENT_NS);
+    Assert.assertTrue(controller.acquirePermit(CONCURRENT_NS));
+  }
+
+  private void verifyRemainingPermitCounts(int[] remainingPermitCounts, String[] nss,
+      RouterRpcFairnessPolicyController controller) {
+    assert remainingPermitCounts.length == nss.length;
+    for (int i = 0; i < remainingPermitCounts.length; i++) {
+      verifyRemainingPermitCount(remainingPermitCounts[i], nss[i], controller);
+    }
+  }
+
+  private void verifyRemainingPermitCount(int remainingPermitCount, String nameservice,
+      RouterRpcFairnessPolicyController controller) {
+    for (int i = 0; i < remainingPermitCount; i++) {
+      Assert.assertTrue(controller.acquirePermit(nameservice));
+    }
+    Assert.assertFalse(controller.acquirePermit(nameservice));
+  }
+
+  private void injectDummyMetrics(Map<String, LongAdder> metrics, String ns, long value) {
+    metrics.computeIfAbsent(ns, k -> new LongAdder()).add(value);
+  }
+
+  private DynamicRouterRpcFairnessPolicyController getFairnessPolicyController(int handlers,
+      long refreshInterval) {
+    return new DynamicRouterRpcFairnessPolicyController(createConf(handlers, 3), refreshInterval);
+  }
+
+  private DynamicRouterRpcFairnessPolicyController getFairnessPolicyController(int handlers) {
+    return new DynamicRouterRpcFairnessPolicyController(createConf(handlers, 3), Long.MAX_VALUE);
+  }
+
+  private Configuration createConf(int handlers, int minHandlersPerNs) {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, handlers);
+    conf.set(DFS_ROUTER_MONITOR_NAMENODE, nameServices);
+    conf.setInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, minHandlersPerNs);
+    conf.setClass(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+        DynamicRouterRpcFairnessPolicyController.class, RouterRpcFairnessPolicyController.class);
+    return conf;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java
index 8307f666b5d..261f070df09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java
@@ -27,6 +27,7 @@ import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
 import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
@@ -102,6 +103,14 @@ public class TestRouterRpcFairnessPolicyController {
     verifyInstantiationError(conf, 1, 3);
   }
 
+  @Test
+  public void testAllocationErrorTooFewDedicatedHandlers() {
+    Configuration conf = createConf(9);
+    conf.setInt(DFS_ROUTER_FAIR_MINIMUM_HANDLER_COUNT_KEY, 3);
+    conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + CONCURRENT_NS, 1);
+    verifyInstantiationError(conf, CONCURRENT_NS,  9, 3);
+  }
+
   @Test
   public void testGetAvailableHandlerOnPerNs() {
     RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
@@ -113,6 +122,18 @@ public class TestRouterRpcFairnessPolicyController {
         routerRpcFairnessPolicyController.getAvailableHandlerOnPerNs());
   }
 
+  @Test
+  public void testGetPermitCapacityPerNs() {
+    RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
+        = getFairnessPolicyController(30);
+    assertEquals("{\"concurrent\":10,\"ns2\":10,\"ns1\":10}",
+        routerRpcFairnessPolicyController.getPermitCapacityPerNs());
+    routerRpcFairnessPolicyController.acquirePermit("ns1");
+    routerRpcFairnessPolicyController.acquirePermit("ns2");
+    assertEquals("{\"concurrent\":10,\"ns2\":10,\"ns1\":10}",
+        routerRpcFairnessPolicyController.getPermitCapacityPerNs());
+  }
+
   @Test
   public void testGetAvailableHandlerOnPerNsForNoFairness() {
     Configuration conf = new Configuration();
@@ -170,6 +191,20 @@ public class TestRouterRpcFairnessPolicyController {
         logs.getOutput().contains(errorMsg));
   }
 
+  private void verifyInstantiationError(Configuration conf, String ns, int handlerCount,
+      int minimumHandler) {
+    GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(
+        LoggerFactory.getLogger(StaticRouterRpcFairnessPolicyController.class));
+    try {
+      FederationUtil.newFairnessPolicyController(conf);
+    } catch (IllegalArgumentException e) {
+      // Ignore the exception as it is expected here.
+    }
+    String errorMsg = String.format(StaticRouterRpcFairnessPolicyController.ERROR_NS_MSG,
+        DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + ns, handlerCount, minimumHandler);
+    assertTrue("Should contain error message: " + errorMsg, logs.getOutput().contains(errorMsg));
+  }
+
   private RouterRpcFairnessPolicyController getFairnessPolicyController(
       int handlers) {
     return FederationUtil.newFairnessPolicyController(createConf(handlers));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org