You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2020/12/04 17:31:31 UTC

[hadoop] branch trunk updated: HDFS-14090. RBF: Improved isolation for downstream name nodes. {Static}. Contributed by Fengnan Li and CR Hota.

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7dda804  HDFS-14090. RBF: Improved isolation for downstream name nodes. {Static}. Contributed by Fengnan Li and CR Hota.
7dda804 is described below

commit 7dda804a1a7e1a40b782f706582e98b41039eace
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Fri Dec 4 22:55:48 2020 +0530

    HDFS-14090. RBF: Improved isolation for downstream name nodes. {Static}. Contributed by Fengnan Li and CR Hota.
---
 .../AbstractRouterRpcFairnessPolicyController.java |  78 ++++++++
 .../NoRouterRpcFairnessPolicyController.java       |  49 +++++
 .../fairness/RouterRpcFairnessConstants.java       |  28 +++
 .../RouterRpcFairnessPolicyController.java         |  65 +++++++
 .../StaticRouterRpcFairnessPolicyController.java   | 126 ++++++++++++
 .../server/federation/fairness/package-info.java   |  28 +++
 .../federation/metrics/FederationRPCMBean.java     |   6 +
 .../federation/metrics/FederationRPCMetrics.java   |  12 ++
 .../server/federation/router/FederationUtil.java   |  53 ++++++
 .../server/federation/router/RBFConfigKeys.java    |  14 ++
 .../server/federation/router/RouterRpcClient.java  |  90 ++++++++-
 .../src/main/resources/hdfs-rbf-default.xml        |  25 +++
 .../fairness/TestRouterHandlersFairness.java       | 211 +++++++++++++++++++++
 .../TestRouterRpcFairnessPolicyController.java     | 160 ++++++++++++++++
 .../federation/router/TestRBFConfigFields.java     |   2 +
 15 files changed, 940 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java
new file mode 100644
index 0000000..629f21e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base fairness policy that implements @RouterRpcFairnessPolicyController.
+ * Internally a map of nameservice to Semaphore is used to control permits.
+ */
+public class AbstractRouterRpcFairnessPolicyController
+    implements RouterRpcFairnessPolicyController {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class);
+
+  /** Hash table to hold semaphore for each configured name service. */
+  private Map<String, Semaphore> permits;
+
+  public void init(Configuration conf) {
+    this.permits = new HashMap<>();
+  }
+
+  @Override
+  public boolean acquirePermit(String nsId) {
+    try {
+      LOG.debug("Taking lock for nameservice {}", nsId);
+      return this.permits.get(nsId).tryAcquire(1, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.debug("Cannot get a permit for nameservice {}", nsId);
+    }
+    return false;
+  }
+
+  @Override
+  public void releasePermit(String nsId) {
+    this.permits.get(nsId).release();
+  }
+
+  @Override
+  public void shutdown() {
+    // drain all semaphores
+    for (Semaphore sema: this.permits.values()) {
+      sema.drainPermits();
+    }
+  }
+
+  protected void insertNameServiceWithPermits(String nsId, int maxPermits) {
+    this.permits.put(nsId, new Semaphore(maxPermits));
+  }
+
+  protected int getAvailablePermits(String nsId) {
+    return this.permits.get(nsId).availablePermits();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java
new file mode 100644
index 0000000..b6a7df4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A pass through fairness policy that implements
+ * {@link RouterRpcFairnessPolicyController} and allows any number
+ * of handlers to connect to any specific downstream name service.
+ */
+public class NoRouterRpcFairnessPolicyController implements
+    RouterRpcFairnessPolicyController {
+
+  public NoRouterRpcFairnessPolicyController(Configuration conf) {
+      // Dummy constructor.
+  }
+
+  @Override
+  public boolean acquirePermit(String nsId) {
+    return true;
+  }
+
+  @Override
+  public void releasePermit(String nsId) {
+    // Dummy, pass through.
+  }
+
+  @Override
+  public void shutdown() {
+    // Nothing for now.
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessConstants.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessConstants.java
new file mode 100644
index 0000000..5e84317
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessConstants.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+public class RouterRpcFairnessConstants {
+  /** Name service keyword to identify fan-out calls. */
+  public static final String CONCURRENT_NS = "concurrent";
+
+  /* Hidden constructor */
+  protected RouterRpcFairnessConstants() {
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java
new file mode 100644
index 0000000..80e7d21
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Interface to define handlers assignment for specific name services.
+ * This is needed to allow handlers provide a certain QoS for all name services
+ * configured. Implementations can choose to define any algorithm which
+ * would help maintain QoS. This is different when compared to FairCallQueue
+ * semantics as fairness has a separate context in router based federation.
+ * An implementation for example, could allocate a dedicated set of handlers
+ * per name service and allow handlers to continue making downstream name
+ * node calls if permissions are available, another implementation could use
+ * preemption semantics and dynamically increase or decrease handlers
+ * assigned per name service.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface RouterRpcFairnessPolicyController {
+  /**
+   * Request permission for a specific name service to continue the call and
+   * connect to downstream name node. Controllers based on policies defined
+   * and allocations done at start-up through assignHandlersToNameservices,
+   * may provide a permission or reject the request by throwing exception.
+   *
+   * @param nsId NS id for which a permission to continue is requested.
+   * @return true or false based on whether permit is given.
+   */
+  boolean acquirePermit(String nsId);
+
+  /**
+   * Handler threads are expected to invoke this method that signals
+   * controller to release the resources allocated to the thread for the
+   * particular name service. This would mean permissions getting available
+   * for other handlers to request for this specific name service.
+   *
+   * @param nsId Name service id for which permission release request is made.
+   */
+  void releasePermit(String nsId);
+
+  /**
+   * Shutdown steps to stop accepting new permission requests and clean-up.
+   */
+  void shutdown();
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java
new file mode 100644
index 0000000..49a9075
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.HashSet;
+
+import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
+
+/**
+ * Static fairness policy extending @AbstractRouterRpcFairnessPolicyController
+ * and fetching handlers from configuration for all available name services.
+ * The handlers count will not change for this controller.
+ */
+public class StaticRouterRpcFairnessPolicyController extends
+    AbstractRouterRpcFairnessPolicyController {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StaticRouterRpcFairnessPolicyController.class);
+
+  public StaticRouterRpcFairnessPolicyController(Configuration conf) {
+    init(conf);
+  }
+
+  public void init(Configuration conf)
+      throws IllegalArgumentException {
+    super.init(conf);
+    // Total handlers configured to process all incoming Rpc.
+    int handlerCount = conf.getInt(
+        DFS_ROUTER_HANDLER_COUNT_KEY,
+        DFS_ROUTER_HANDLER_COUNT_DEFAULT);
+
+    LOG.info("Handlers available for fairness assignment {} ", handlerCount);
+
+    // Get all name services configured
+    Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(conf);
+
+    // Set to hold name services that are not
+    // configured with dedicated handlers.
+    Set<String> unassignedNS = new HashSet<>();
+
+    // Insert the concurrent nameservice into the set to process together
+    allConfiguredNS.add(CONCURRENT_NS);
+    for (String nsId : allConfiguredNS) {
+      int dedicatedHandlers =
+          conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
+      LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
+      if (dedicatedHandlers > 0) {
+        handlerCount -= dedicatedHandlers;
+        // Total handlers should not be less than sum of dedicated
+        // handlers.
+        validateCount(nsId, handlerCount, 0);
+        insertNameServiceWithPermits(nsId, dedicatedHandlers);
+        logAssignment(nsId, dedicatedHandlers);
+      } else {
+        unassignedNS.add(nsId);
+      }
+    }
+
+    // Assign remaining handlers equally to remaining name services and
+    // general pool if applicable.
+    if (!unassignedNS.isEmpty()) {
+      LOG.info("Unassigned ns {}", unassignedNS.toString());
+      int handlersPerNS = handlerCount / unassignedNS.size();
+      LOG.info("Handlers available per ns {}", handlersPerNS);
+      for (String nsId : unassignedNS) {
+        // Each NS should have at least one handler assigned.
+        validateCount(nsId, handlersPerNS, 1);
+        insertNameServiceWithPermits(nsId, handlersPerNS);
+        logAssignment(nsId, handlersPerNS);
+      }
+    }
+
+    // Assign remaining handlers if any to fan out calls.
+    int leftOverHandlers = handlerCount % unassignedNS.size();
+    int existingPermits = getAvailablePermits(CONCURRENT_NS);
+    if (leftOverHandlers > 0) {
+      LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
+      insertNameServiceWithPermits(CONCURRENT_NS,
+          existingPermits + leftOverHandlers);
+    }
+    LOG.info("Final permit allocation for concurrent ns: {}",
+        getAvailablePermits(CONCURRENT_NS));
+  }
+
+  private static void logAssignment(String nsId, int count) {
+    LOG.info("Assigned {} handlers to nsId {} ",
+        count, nsId);
+  }
+
+  private static void validateCount(String nsId, int handlers, int min) throws
+      IllegalArgumentException {
+    if (handlers < min) {
+      String msg =
+          "Available handlers " + handlers +
+          " lower than min " + min +
+          " for nsId " + nsId;
+      LOG.error(msg);
+      throw new IllegalArgumentException(msg);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/package-info.java
new file mode 100644
index 0000000..6b10326
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Includes router handlers fairness manager and policy implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
index f57e310..3cde5e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
@@ -99,4 +99,10 @@ public interface FederationRPCMBean {
    * @return JSON string representation of the async caller thread pool.
    */
   String getAsyncCallerPool();
+
+  /**
+   * Get the number of operations rejected due to lack of permits.
+   * @return Number of operations rejected due to lack of permits.
+   */
+  long getProxyOpPermitRejected();
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
index dd92725..887d50b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
@@ -72,6 +72,9 @@ public class FederationRPCMetrics implements FederationRPCMBean {
   @Metric("Failed requests due to safe mode")
   private MutableCounterLong routerFailureSafemode;
 
+  @Metric("Number of operations to hit permit limits")
+  private MutableCounterLong proxyOpPermitRejected;
+
   public FederationRPCMetrics(Configuration conf, RouterRpcServer rpcServer) {
     this.rpcServer = rpcServer;
 
@@ -264,4 +267,13 @@ public class FederationRPCMetrics implements FederationRPCMBean {
   public long getProcessingOps() {
     return processingOp.value();
   }
+
+  public void incrProxyOpPermitRejected() {
+    proxyOpPermitRejected.incr();
+  }
+
+  @Override
+  public long getProxyOpPermitRejected() {
+    return proxyOpPermitRejected.value();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
index 9af9f9a..7ff8539 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.federation.router;
 
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
+
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
@@ -24,12 +26,16 @@ import java.io.InputStreamReader;
 import java.lang.reflect.Constructor;
 import java.net.URL;
 import java.net.URLConnection;
+import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
@@ -248,4 +254,51 @@ public final class FederationUtil {
         .storagePolicy(dirStatus.getStoragePolicy())
         .symlink(dirStatus.getSymlinkInBytes()).flags(flags).build();
   }
+
+  /**
+   * Creates an instance of an RouterRpcFairnessPolicyController
+   * from the configuration.
+   *
+   * @param conf Configuration that defines the fairness controller class.
+   * @return Fairness policy controller.
+   */
+  public static RouterRpcFairnessPolicyController newFairnessPolicyController(
+      Configuration conf) {
+    Class<? extends RouterRpcFairnessPolicyController> clazz = conf.getClass(
+        RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+        RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS_DEFAULT,
+        RouterRpcFairnessPolicyController.class);
+    return newInstance(conf, null, null, clazz);
+  }
+
+  /**
+   * Collect all configured nameservices.
+   *
+   * @param conf
+   * @return Set of name services in config
+   * @throws IllegalArgumentException
+   */
+  public static Set<String> getAllConfiguredNS(Configuration conf)
+      throws IllegalArgumentException {
+    // Get all name services configured
+    Collection<String> namenodes = conf.getTrimmedStringCollection(
+        DFS_ROUTER_MONITOR_NAMENODE);
+
+    Set<String> nameservices = new HashSet();
+    for (String namenode : namenodes) {
+      String[] namenodeSplit = namenode.split("\\.");
+      String nsId;
+      if (namenodeSplit.length == 2) {
+        nsId = namenodeSplit[0];
+      } else if (namenodeSplit.length == 1) {
+        nsId = namenode;
+      } else {
+        String errorMsg = "Wrong name service specified : " + namenode;
+        throw new IllegalArgumentException(
+            errorMsg);
+      }
+      nameservices.add(nsId);
+    }
+    return nameservices;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index ba3659e..8fd2e28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.federation.router;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.server.federation.fairness.NoRouterRpcFairnessPolicyController;
+import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
 import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@@ -334,4 +336,16 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final Class<? extends AbstractDelegationTokenSecretManager>
       DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS_DEFAULT =
       ZKDelegationTokenSecretManagerImpl.class;
+
+  // HDFS Router fairness
+  public static final String FEDERATION_ROUTER_FAIRNESS_PREFIX =
+      FEDERATION_ROUTER_PREFIX + "fairness.";
+  public static final String
+      DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS =
+      FEDERATION_ROUTER_FAIRNESS_PREFIX + "policy.controller.class";
+  public static final Class<? extends RouterRpcFairnessPolicyController>
+      DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS_DEFAULT =
+      NoRouterRpcFairnessPolicyController.class;
+  public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
+      FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 8ae3731..87f2ed7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_C
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
 
 import java.io.EOFException;
 import java.io.FileNotFoundException;
@@ -61,6 +62,8 @@ import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.federation.fairness.AbstractRouterRpcFairnessPolicyController;
+import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
@@ -129,6 +132,9 @@ public class RouterRpcClient {
 
   private static final String CLIENT_IP_STR = "clientIp";
 
+  /** Fairness manager to control handlers assigned per NS. */
+  private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
+
   /**
    * Create a router RPC client to manage remote procedure calls to NNs.
    *
@@ -149,6 +155,8 @@ public class RouterRpcClient {
             HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
     this.connectionManager = new ConnectionManager(clientConf);
     this.connectionManager.start();
+    this.routerRpcFairnessPolicyController =
+        FederationUtil.newFairnessPolicyController(conf);
 
     int numThreads = conf.getInt(
         RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
@@ -229,6 +237,9 @@ public class RouterRpcClient {
     if (this.executorService != null) {
       this.executorService.shutdownNow();
     }
+    if (this.routerRpcFairnessPolicyController != null) {
+      this.routerRpcFairnessPolicyController.shutdown();
+    }
   }
 
   /**
@@ -770,13 +781,18 @@ public class RouterRpcClient {
   public Object invokeSingle(final String nsId, RemoteMethod method)
       throws IOException {
     UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
-    List<? extends FederationNamenodeContext> nns =
-        getNamenodesForNameservice(nsId);
-    RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
-    Class<?> proto = method.getProtocol();
-    Method m = method.getMethod();
-    Object[] params = method.getParams(loc);
-    return invokeMethod(ugi, nns, proto, m, params);
+    acquirePermit(nsId, ugi, method);
+    try {
+      List<? extends FederationNamenodeContext> nns =
+          getNamenodesForNameservice(nsId);
+      RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
+      Class<?> proto = method.getProtocol();
+      Method m = method.getMethod();
+      Object[] params = method.getParams(loc);
+      return invokeMethod(ugi, nns, proto, m, params);
+    } finally {
+      releasePermit(nsId, ugi, method);
+    }
   }
 
   /**
@@ -933,6 +949,7 @@ public class RouterRpcClient {
     // Invoke in priority order
     for (final RemoteLocationContext loc : locations) {
       String ns = loc.getNameserviceId();
+      acquirePermit(ns, ugi, remoteMethod);
       List<? extends FederationNamenodeContext> namenodes =
           getNamenodesForNameservice(ns);
       try {
@@ -966,6 +983,8 @@ public class RouterRpcClient {
         IOException ioe = new IOException(
             "Unexpected exception proxying API " + e.getMessage(), e);
         thrownExceptions.add(ioe);
+      } finally {
+        releasePermit(ns, ugi, remoteMethod);
       }
     }
 
@@ -1298,6 +1317,7 @@ public class RouterRpcClient {
       // Shortcut, just one call
       T location = locations.iterator().next();
       String ns = location.getNameserviceId();
+      acquirePermit(ns, ugi, method);
       final List<? extends FederationNamenodeContext> namenodes =
           getNamenodesForNameservice(ns);
       try {
@@ -1309,6 +1329,8 @@ public class RouterRpcClient {
       } catch (IOException ioe) {
         // Localize the exception
         throw processException(ioe, location);
+      } finally {
+        releasePermit(ns, ugi, method);
       }
     }
 
@@ -1355,6 +1377,7 @@ public class RouterRpcClient {
       rpcMonitor.proxyOp();
     }
 
+    acquirePermit(CONCURRENT_NS, ugi, method);
     try {
       List<Future<Object>> futures = null;
       if (timeOutMs > 0) {
@@ -1411,6 +1434,8 @@ public class RouterRpcClient {
       LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
       throw new IOException(
           "Unexpected error while invoking API " + ex.getMessage(), ex);
+    } finally {
+      releasePermit(CONCURRENT_NS, ugi, method);
     }
   }
 
@@ -1484,4 +1509,55 @@ public class RouterRpcClient {
     FederationNamenodeContext namenode = namenodes.get(0);
     return namenode.getNameserviceId();
   }
+
+  /**
+   * Acquire permit to continue processing the request for specific nsId.
+   *
+   * @param nsId Identifier of the block pool.
+   * @param ugi UserGroupIdentifier associated with the user.
+   * @param m Remote method that needs to be invoked.
+   * @throws IOException If permit could not be acquired for the nsId.
+   */
+  private void acquirePermit(
+      final String nsId, final UserGroupInformation ugi, final RemoteMethod m)
+      throws IOException {
+    if (routerRpcFairnessPolicyController != null
+        && !routerRpcFairnessPolicyController.acquirePermit(nsId)) {
+      // Throw StandByException,
+      // Clients could fail over and try another router.
+      if (rpcMonitor != null) {
+        rpcMonitor.getRPCMetrics().incrProxyOpPermitRejected();
+      }
+      LOG.debug("Permit denied for ugi: {} for method: {}",
+          ugi, m.getMethodName());
+      String msg =
+          "Router " + router.getRouterId() +
+              " is overloaded for NS: " + nsId;
+      throw new StandbyException(msg);
+    }
+  }
+
+  /**
+   * Release permit for specific nsId after processing against downstream
+   * nsId is completed.
+   *
+   * @param nsId Identifier of the block pool.
+   * @param ugi UserGroupIdentifier associated with the user.
+   * @param m Remote method that needs to be invoked.
+   */
+  private void releasePermit(
+      final String nsId, final UserGroupInformation ugi, final RemoteMethod m) {
+    if (routerRpcFairnessPolicyController != null) {
+      routerRpcFairnessPolicyController.releasePermit(nsId);
+      LOG.trace("Permit released for ugi: {} for method: {}", ugi,
+          m.getMethodName());
+    }
+  }
+
+  @VisibleForTesting
+  public AbstractRouterRpcFairnessPolicyController
+      getRouterRpcFairnessPolicyController() {
+    return (AbstractRouterRpcFairnessPolicyController
+          )routerRpcFairnessPolicyController;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index aab90e4..8c17185 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -677,4 +677,29 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.federation.router.fairness.policy.controller.class</name>
+    <value>org.apache.hadoop.hdfs.server.federation.fairness.NoRouterRpcFairnessPolicyController</value>
+    <description>
+      No fairness policy handler by default, for fairness
+      StaticFairnessPolicyController should be configured.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.fairness.handler.count.EXAMPLENAMESERVICE</name>
+    <value></value>
+    <description>
+      Dedicated handler count for nameservice EXAMPLENAMESERVICE. The handler
+      (configed by dfs.federation.router.handler.count)resource is controlled
+      internally by Semaphore permits. Two requirements have to be satisfied.
+      1) all downstream nameservices need this config otherwise no permit will
+      be given thus not proxy will happen. 2) if a special *concurrent*
+      nameservice is specified, the sum of all configured values is smaller or
+      equal to the total number of router handlers; if the special *concurrent*
+      is not specified, the sum of all configured values must be strictly
+      smaller than the router handlers thus the left will be allocated to the
+      concurrent calls.
+    </description>
+  </property>
 </configuration>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java
new file mode 100644
index 0000000..c3fc324
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the Router handlers fairness control rejects
+ * requests when the handlers are overloaded.
+ */
+public class TestRouterHandlersFairness {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterHandlersFairness.class);
+
+  private StateStoreDFSCluster cluster;
+
+  @After
+  public void cleanup() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  private void setupCluster(boolean fairnessEnable, boolean ha)
+      throws Exception {
+    // Build and start a federated cluster
+    cluster = new StateStoreDFSCluster(ha, 2);
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .rpc()
+        .build();
+
+    // Fairness control
+    if (fairnessEnable) {
+      routerConf.setClass(
+          RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+          StaticRouterRpcFairnessPolicyController.class,
+          RouterRpcFairnessPolicyController.class);
+    }
+
+    // With two name services configured, each nameservice has 1 permit and
+    // fan-out calls have 1 permit.
+    routerConf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 3);
+
+    // Datanodes not needed for this test.
+    cluster.setNumDatanodesPerNameservice(0);
+
+    cluster.addRouterOverrides(routerConf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+  }
+
+  @Test
+  public void testFairnessControlOff() throws Exception {
+    setupCluster(false, false);
+    startLoadTest(false);
+  }
+
+  @Test
+  public void testFairnessControlOn() throws Exception {
+    setupCluster(true, false);
+    startLoadTest(true);
+  }
+
+  /**
+   * Start a generic load test as a client against a cluster which has either
+   * fairness configured or not configured. Test will spawn a set of 100
+   * threads to simulate concurrent request to test routers. If fairness is
+   * enabled, the test will successfully report the failure of some threads
+   * to continue with StandbyException. If fairness is not configured, all
+   * threads of the same test should successfully complete all the rpcs.
+   *
+   * @param fairness Flag to indicate if fairness management is on/off.
+   * @throws Exception Throws exception.
+   */
+  private void startLoadTest(boolean fairness)
+      throws Exception {
+
+    // Concurrent requests
+    startLoadTest(true, fairness);
+
+    // Sequential requests
+    startLoadTest(false, fairness);
+  }
+
+  private void startLoadTest(final boolean isConcurrent, final boolean fairness)
+      throws Exception {
+
+    RouterContext routerContext = cluster.getRandomRouter();
+    if (fairness) {
+      if (isConcurrent) {
+        LOG.info("Taking fanout lock first");
+        // take the lock for concurrent NS to block fanout calls
+        assertTrue(routerContext.getRouter().getRpcServer()
+            .getRPCClient().getRouterRpcFairnessPolicyController()
+            .acquirePermit(RouterRpcFairnessConstants.CONCURRENT_NS));
+      } else {
+        for (String ns : cluster.getNameservices()) {
+          LOG.info("Taking lock first for ns: {}", ns);
+          assertTrue(routerContext.getRouter().getRpcServer()
+              .getRPCClient().getRouterRpcFairnessPolicyController()
+              .acquirePermit(ns));
+        }
+      }
+    }
+    URI address = routerContext.getFileSystemURI();
+    Configuration conf = new HdfsConfiguration();
+    final int numOps = 10;
+    final AtomicInteger overloadException = new AtomicInteger();
+
+    for (int i = 0; i < numOps; i++) {
+      DFSClient routerClient = null;
+      try {
+        routerClient = new DFSClient(address, conf);
+        String clientName = routerClient.getClientName();
+        ClientProtocol routerProto = routerClient.getNamenode();
+        if (isConcurrent) {
+          invokeConcurrent(routerProto, clientName);
+        } else {
+          invokeSequential(routerProto);
+        }
+      } catch (RemoteException re) {
+        IOException ioe = re.unwrapRemoteException();
+        assertTrue("Wrong exception: " + ioe,
+            ioe instanceof StandbyException);
+        assertExceptionContains("is overloaded for NS", ioe);
+        overloadException.incrementAndGet();
+      } catch (Throwable e) {
+        throw e;
+      } finally {
+        if (routerClient != null) {
+          try {
+            routerClient.close();
+          } catch (IOException e) {
+            LOG.error("Cannot close the client");
+          }
+        }
+      }
+      overloadException.get();
+    }
+
+    if (fairness) {
+      assertTrue(overloadException.get() > 0);
+      if (isConcurrent) {
+        LOG.info("Release fanout lock that was taken before test");
+        // take the lock for concurrent NS to block fanout calls
+        routerContext.getRouter().getRpcServer()
+            .getRPCClient().getRouterRpcFairnessPolicyController()
+            .releasePermit(RouterRpcFairnessConstants.CONCURRENT_NS);
+      } else {
+        for (String ns : cluster.getNameservices()) {
+          routerContext.getRouter().getRpcServer()
+              .getRPCClient().getRouterRpcFairnessPolicyController()
+              .releasePermit(ns);
+        }
+      }
+    } else {
+      assertEquals("Number of failed RPCs without fairness configured",
+          0, overloadException.get());
+    }
+  }
+
+  private void invokeSequential(ClientProtocol routerProto) throws IOException {
+    routerProto.getFileInfo("/test.txt");
+  }
+
+  private void invokeConcurrent(ClientProtocol routerProto, String clientName)
+      throws IOException {
+    routerProto.renewLease(clientName);
+  }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java
new file mode 100644
index 0000000..c0c3074
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test functionality of {@link RouterRpcFairnessPolicyController).
+ */
+public class TestRouterRpcFairnessPolicyController {
+
+  private static String nameServices =
+      "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2";
+
+  @Test
+  public void testHandlerAllocationEqualAssignment() {
+    RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
+        = getFairnessPolicyController(30);
+    verifyHandlerAllocation(routerRpcFairnessPolicyController);
+  }
+
+  @Test
+  public void testHandlerAllocationWithLeftOverHandler() {
+    RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
+        = getFairnessPolicyController(31);
+    // One extra handler should be allocated to commons.
+    assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+    verifyHandlerAllocation(routerRpcFairnessPolicyController);
+  }
+
+  @Test
+  public void testHandlerAllocationPreconfigured() {
+    Configuration conf = createConf(40);
+    conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 30);
+    RouterRpcFairnessPolicyController routerRpcFairnessPolicyController =
+        FederationUtil.newFairnessPolicyController(conf);
+
+    // ns1 should have 30 permits allocated
+    for (int i=0; i<30; i++) {
+      assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+    }
+
+    // ns2 should have 5 permits.
+    // concurrent should have 5 permits.
+    for (int i=0; i<5; i++) {
+      assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+      assertTrue(
+          routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+    }
+
+    assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+    assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+    assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+  }
+
+  @Test
+  public void testAllocationErrorWithZeroHandlers() {
+    Configuration conf = createConf(0);
+    verifyInstantiationError(conf);
+  }
+
+  @Test
+  public void testAllocationErrorForLowDefaultHandlers() {
+    Configuration conf = createConf(1);
+    verifyInstantiationError(conf);
+  }
+
+  @Test
+  public void testAllocationErrorForLowDefaultHandlersPerNS() {
+    Configuration conf = createConf(1);
+    conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "concurrent", 1);
+    verifyInstantiationError(conf);
+  }
+
+  @Test
+  public void testAllocationErrorForLowPreconfiguredHandlers() {
+    Configuration conf = createConf(1);
+    conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 2);
+    verifyInstantiationError(conf);
+  }
+
+  private void verifyInstantiationError(Configuration conf) {
+    GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer
+        .captureLogs(LoggerFactory.getLogger(
+            StaticRouterRpcFairnessPolicyController.class));
+    try {
+      FederationUtil.newFairnessPolicyController(conf);
+    } catch (IllegalArgumentException e) {
+      // Ignore the exception as it is expected here.
+    }
+    assertTrue("Should contain error message",
+        logs.getOutput().contains("lower than min"));
+  }
+
+  private RouterRpcFairnessPolicyController getFairnessPolicyController(
+      int handlers) {
+    return FederationUtil.newFairnessPolicyController(createConf(handlers));
+  }
+
+  private void verifyHandlerAllocation(
+      RouterRpcFairnessPolicyController routerRpcFairnessPolicyController) {
+    for (int i=0; i<10; i++) {
+      assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+      assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+      assertTrue(
+          routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+    }
+    assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+    assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+    assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+
+    routerRpcFairnessPolicyController.releasePermit("ns1");
+    routerRpcFairnessPolicyController.releasePermit("ns2");
+    routerRpcFairnessPolicyController.releasePermit(CONCURRENT_NS);
+
+    assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+    assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+    assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+  }
+
+  private Configuration createConf(int handlers) {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, handlers);
+    conf.set(DFS_ROUTER_MONITOR_NAMENODE, nameServices);
+    conf.setClass(
+        RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+        StaticRouterRpcFairnessPolicyController.class,
+        RouterRpcFairnessPolicyController.class);
+    return conf;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java
index e130b7b..50527b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java
@@ -47,5 +47,7 @@ public class TestRBFConfigFields extends TestConfigurationFieldsBase {
     // Allocate
     xmlPropsToSkipCompare = new HashSet<String>();
     xmlPrefixToSkipCompare = new HashSet<String>();
+    xmlPrefixToSkipCompare.add(
+        RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org