You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2020/12/04 17:31:31 UTC
[hadoop] branch trunk updated: HDFS-14090. RBF: Improved isolation
for downstream name nodes. {Static}. Contributed by Fengnan Li and CR Hota.
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7dda804 HDFS-14090. RBF: Improved isolation for downstream name nodes. {Static}. Contributed by Fengnan Li and CR Hota.
7dda804 is described below
commit 7dda804a1a7e1a40b782f706582e98b41039eace
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Fri Dec 4 22:55:48 2020 +0530
HDFS-14090. RBF: Improved isolation for downstream name nodes. {Static}. Contributed by Fengnan Li and CR Hota.
---
.../AbstractRouterRpcFairnessPolicyController.java | 78 ++++++++
.../NoRouterRpcFairnessPolicyController.java | 49 +++++
.../fairness/RouterRpcFairnessConstants.java | 28 +++
.../RouterRpcFairnessPolicyController.java | 65 +++++++
.../StaticRouterRpcFairnessPolicyController.java | 126 ++++++++++++
.../server/federation/fairness/package-info.java | 28 +++
.../federation/metrics/FederationRPCMBean.java | 6 +
.../federation/metrics/FederationRPCMetrics.java | 12 ++
.../server/federation/router/FederationUtil.java | 53 ++++++
.../server/federation/router/RBFConfigKeys.java | 14 ++
.../server/federation/router/RouterRpcClient.java | 90 ++++++++-
.../src/main/resources/hdfs-rbf-default.xml | 25 +++
.../fairness/TestRouterHandlersFairness.java | 211 +++++++++++++++++++++
.../TestRouterRpcFairnessPolicyController.java | 160 ++++++++++++++++
.../federation/router/TestRBFConfigFields.java | 2 +
15 files changed, 940 insertions(+), 7 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java
new file mode 100644
index 0000000..629f21e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/AbstractRouterRpcFairnessPolicyController.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base fairness policy that implements
+ * {@link RouterRpcFairnessPolicyController}.
+ * Internally a map of nameservice to Semaphore is used to control permits.
+ */
+public class AbstractRouterRpcFairnessPolicyController
+    implements RouterRpcFairnessPolicyController {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractRouterRpcFairnessPolicyController.class);
+
+  /** Hash table to hold semaphore for each configured name service. */
+  private Map<String, Semaphore> permits;
+
+  /**
+   * Start from an empty permit table. Any semaphore registered before this
+   * call is discarded.
+   *
+   * @param conf Configuration; not read by this base implementation.
+   */
+  public void init(Configuration conf) {
+    this.permits = new HashMap<>();
+  }
+
+  /**
+   * Try to take one permit for the name service, waiting up to one second.
+   * The name service must have been registered through
+   * {@link #insertNameServiceWithPermits(String, int)}; an unknown id has
+   * no semaphore in the table and fails with a NullPointerException.
+   *
+   * @param nsId Name service id to take the permit for.
+   * @return true if a permit was acquired, false if the wait timed out or
+   *         the thread was interrupted while waiting.
+   */
+  @Override
+  public boolean acquirePermit(String nsId) {
+    try {
+      LOG.debug("Taking lock for nameservice {}", nsId);
+      return this.permits.get(nsId).tryAcquire(1, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that code further up the stack can
+      // still observe that this thread was asked to stop.
+      Thread.currentThread().interrupt();
+      LOG.debug("Cannot get a permit for nameservice {}", nsId);
+    }
+    return false;
+  }
+
+  /**
+   * Give one permit back to the semaphore of the name service.
+   *
+   * @param nsId Name service id to release the permit for.
+   */
+  @Override
+  public void releasePermit(String nsId) {
+    this.permits.get(nsId).release();
+  }
+
+  /**
+   * Drain every semaphore so that no further permit can be acquired.
+   */
+  @Override
+  public void shutdown() {
+    // drain all semaphores
+    for (Semaphore sema: this.permits.values()) {
+      sema.drainPermits();
+    }
+  }
+
+  /**
+   * Register a name service with a new semaphore, replacing any semaphore
+   * previously registered under the same id.
+   *
+   * @param nsId Name service id.
+   * @param maxPermits Number of permits the new semaphore starts with.
+   */
+  protected void insertNameServiceWithPermits(String nsId, int maxPermits) {
+    this.permits.put(nsId, new Semaphore(maxPermits));
+  }
+
+  /**
+   * Get the number of permits currently available for a name service.
+   *
+   * @param nsId Name service id; must already be registered.
+   * @return Permits currently available in the semaphore.
+   */
+  protected int getAvailablePermits(String nsId) {
+    return this.permits.get(nsId).availablePermits();
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java
new file mode 100644
index 0000000..b6a7df4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/NoRouterRpcFairnessPolicyController.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A pass through fairness policy that implements
+ * {@link RouterRpcFairnessPolicyController} and allows any number
+ * of handlers to connect to any specific downstream name service.
+ * No state is kept: every permit request is granted and every release
+ * is a no-op.
+ */
+public class NoRouterRpcFairnessPolicyController implements
+ RouterRpcFairnessPolicyController {
+
+ /**
+ * Create the pass through controller.
+ *
+ * @param conf Configuration; ignored by this implementation.
+ */
+ // NOTE(review): the Configuration parameter is presumably required by the
+ // reflective factory in FederationUtil - confirm before removing it.
+ public NoRouterRpcFairnessPolicyController(Configuration conf) {
+ // Dummy constructor.
+ }
+
+ /**
+ * Always grant the permit.
+ *
+ * @param nsId Name service id; ignored.
+ * @return true, unconditionally.
+ */
+ @Override
+ public boolean acquirePermit(String nsId) {
+ return true;
+ }
+
+ /**
+ * Nothing to release because no permit is ever tracked.
+ *
+ * @param nsId Name service id; ignored.
+ */
+ @Override
+ public void releasePermit(String nsId) {
+ // Dummy, pass through.
+ }
+
+ /**
+ * Nothing to clean up.
+ */
+ @Override
+ public void shutdown() {
+ // Nothing for now.
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessConstants.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessConstants.java
new file mode 100644
index 0000000..5e84317
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessConstants.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+/**
+ * Constants shared by the router RPC fairness policy controllers.
+ * The class only holds static members and is not meant to be instantiated
+ * by callers.
+ */
+public class RouterRpcFairnessConstants {
+ /** Name service keyword to identify fan-out calls. */
+ public static final String CONCURRENT_NS = "concurrent";
+
+ /* Hidden constructor */
+ // Protected rather than public so that code outside this package and
+ // outside subclasses cannot instantiate the class.
+ protected RouterRpcFairnessConstants() {
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java
new file mode 100644
index 0000000..80e7d21
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/RouterRpcFairnessPolicyController.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Interface to define handlers assignment for specific name services.
+ * This is needed to allow handlers to provide a certain QoS for all name
+ * services configured. Implementations can choose to define any algorithm
+ * which would help maintain QoS. This is different when compared to
+ * FairCallQueue semantics as fairness has a separate context in router
+ * based federation.
+ * An implementation, for example, could allocate a dedicated set of handlers
+ * per name service and allow handlers to continue making downstream name
+ * node calls if permits are available; another implementation could use
+ * preemption semantics and dynamically increase or decrease handlers
+ * assigned per name service.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface RouterRpcFairnessPolicyController {
+ /**
+ * Request permission for a specific name service to continue the call and
+ * connect to downstream name node. Controllers, based on the policies they
+ * define and the allocations they made at start-up, either grant the
+ * permit or decline it. A declined request is reported through the return
+ * value; what happens next is up to the caller.
+ *
+ * @param nsId NS id for which a permission to continue is requested.
+ * @return true if the permit is given, false otherwise.
+ */
+ boolean acquirePermit(String nsId);
+
+ /**
+ * Handler threads are expected to invoke this method to signal the
+ * controller to release the resources allocated to the thread for the
+ * particular name service. This makes the permit available again for
+ * other handlers requesting this specific name service.
+ *
+ * @param nsId Name service id for which permission release request is made.
+ */
+ void releasePermit(String nsId);
+
+ /**
+ * Shutdown steps to stop accepting new permission requests and clean-up.
+ */
+ void shutdown();
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java
new file mode 100644
index 0000000..49a9075
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/StaticRouterRpcFairnessPolicyController.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.HashSet;
+
+import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
+
+/**
+ * Static fairness policy extending
+ * {@link AbstractRouterRpcFairnessPolicyController} and fetching handlers
+ * from configuration for all available name services.
+ * The handlers count will not change for this controller.
+ */
+public class StaticRouterRpcFairnessPolicyController extends
+    AbstractRouterRpcFairnessPolicyController {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StaticRouterRpcFairnessPolicyController.class);
+
+  /**
+   * Create the controller and allocate the permits from the configuration.
+   *
+   * @param conf Configuration with the handler counts and name services.
+   * @throws IllegalArgumentException If the configured handlers cannot
+   *         satisfy the allocation, see {@link #init(Configuration)}.
+   */
+  public StaticRouterRpcFairnessPolicyController(Configuration conf) {
+    init(conf);
+  }
+
+  /**
+   * Split the router handlers between the configured name services and the
+   * concurrent (fan-out) pool. Name services with a positive dedicated
+   * handler count get exactly that many permits; the handlers that remain
+   * are divided equally between the other name services, and the remainder
+   * of that division is added to the concurrent pool.
+   *
+   * @param conf Configuration with the handler counts and name services.
+   * @throws IllegalArgumentException If the dedicated handlers add up to
+   *         more than the total handlers, or if a name service without
+   *         dedicated handlers would end up with less than one handler.
+   */
+  @Override
+  public void init(Configuration conf)
+      throws IllegalArgumentException {
+    super.init(conf);
+    // Total handlers configured to process all incoming Rpc.
+    int handlerCount = conf.getInt(
+        DFS_ROUTER_HANDLER_COUNT_KEY,
+        DFS_ROUTER_HANDLER_COUNT_DEFAULT);
+
+    LOG.info("Handlers available for fairness assignment {} ", handlerCount);
+
+    // Get all name services configured
+    Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(conf);
+
+    // Set to hold name services that are not
+    // configured with dedicated handlers.
+    Set<String> unassignedNS = new HashSet<>();
+
+    // Insert the concurrent nameservice into the set to process together
+    allConfiguredNS.add(CONCURRENT_NS);
+    for (String nsId : allConfiguredNS) {
+      int dedicatedHandlers =
+          conf.getInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + nsId, 0);
+      LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
+      if (dedicatedHandlers > 0) {
+        handlerCount -= dedicatedHandlers;
+        // Total handlers should not be less than sum of dedicated
+        // handlers.
+        validateCount(nsId, handlerCount, 0);
+        insertNameServiceWithPermits(nsId, dedicatedHandlers);
+        logAssignment(nsId, dedicatedHandlers);
+      } else {
+        unassignedNS.add(nsId);
+      }
+    }
+
+    // Assign remaining handlers equally to remaining name services and
+    // general pool if applicable.
+    if (!unassignedNS.isEmpty()) {
+      LOG.info("Unassigned ns {}", unassignedNS.toString());
+      int handlersPerNS = handlerCount / unassignedNS.size();
+      LOG.info("Handlers available per ns {}", handlersPerNS);
+      for (String nsId : unassignedNS) {
+        // Each NS should have at least one handler assigned.
+        validateCount(nsId, handlersPerNS, 1);
+        insertNameServiceWithPermits(nsId, handlersPerNS);
+        logAssignment(nsId, handlersPerNS);
+      }
+    }
+
+    // Assign remaining handlers if any to fan out calls. When every name
+    // service, the concurrent one included, has dedicated handlers there
+    // is nothing to divide by: all the handlers still unallocated are left
+    // over, and taking the modulo would fail with a division by zero.
+    int leftOverHandlers = unassignedNS.isEmpty()
+        ? handlerCount : handlerCount % unassignedNS.size();
+    int existingPermits = getAvailablePermits(CONCURRENT_NS);
+    if (leftOverHandlers > 0) {
+      LOG.info("Assigned extra {} handlers to commons pool", leftOverHandlers);
+      insertNameServiceWithPermits(CONCURRENT_NS,
+          existingPermits + leftOverHandlers);
+    }
+    LOG.info("Final permit allocation for concurrent ns: {}",
+        getAvailablePermits(CONCURRENT_NS));
+  }
+
+  /**
+   * Log the number of handlers given to a name service.
+   *
+   * @param nsId Name service id.
+   * @param count Handlers assigned to the name service.
+   */
+  private static void logAssignment(String nsId, int count) {
+    LOG.info("Assigned {} handlers to nsId {} ",
+        count, nsId);
+  }
+
+  /**
+   * Check that the handlers available for a name service reach a minimum.
+   *
+   * @param nsId Name service id, used in the error message.
+   * @param handlers Handlers available.
+   * @param min Minimum number of handlers accepted.
+   * @throws IllegalArgumentException If handlers is lower than min.
+   */
+  private static void validateCount(String nsId, int handlers, int min) throws
+      IllegalArgumentException {
+    if (handlers < min) {
+      String msg =
+          "Available handlers " + handlers +
+          " lower than min " + min +
+          " for nsId " + nsId;
+      LOG.error(msg);
+      throw new IllegalArgumentException(msg);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/package-info.java
new file mode 100644
index 0000000..6b10326
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/fairness/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Includes router handlers fairness manager and policy implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
index f57e310..3cde5e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
@@ -99,4 +99,10 @@ public interface FederationRPCMBean {
* @return JSON string representation of the async caller thread pool.
*/
String getAsyncCallerPool();
+
+ /**
+ * Get the number of operations rejected due to lack of permits.
+ * @return Number of operations rejected due to lack of permits.
+ */
+ long getProxyOpPermitRejected();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
index dd92725..887d50b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
@@ -72,6 +72,9 @@ public class FederationRPCMetrics implements FederationRPCMBean {
@Metric("Failed requests due to safe mode")
private MutableCounterLong routerFailureSafemode;
+ @Metric("Number of operations to hit permit limits")
+ private MutableCounterLong proxyOpPermitRejected;
+
public FederationRPCMetrics(Configuration conf, RouterRpcServer rpcServer) {
this.rpcServer = rpcServer;
@@ -264,4 +267,13 @@ public class FederationRPCMetrics implements FederationRPCMBean {
public long getProcessingOps() {
return processingOp.value();
}
+
+ /**
+ * Add one to the counter of operations rejected due to lack of permits.
+ */
+ public void incrProxyOpPermitRejected() {
+ proxyOpPermitRejected.incr();
+ }
+
+ /**
+ * Get the current value of the permit rejection counter.
+ *
+ * @return Number of operations rejected due to lack of permits.
+ */
+ @Override
+ public long getProxyOpPermitRejected() {
+ return proxyOpPermitRejected.value();
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
index 9af9f9a..7ff8539 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederationUtil.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
+
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -24,12 +26,16 @@ import java.io.InputStreamReader;
import java.lang.reflect.Constructor;
import java.net.URL;
import java.net.URLConnection;
+import java.util.Collection;
import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
@@ -248,4 +254,51 @@ public final class FederationUtil {
.storagePolicy(dirStatus.getStoragePolicy())
.symlink(dirStatus.getSymlinkInBytes()).flags(flags).build();
}
+
+  /**
+   * Build the RouterRpcFairnessPolicyController selected in the
+   * configuration, falling back to the default controller class when none
+   * is set.
+   *
+   * @param conf Configuration that defines the fairness controller class.
+   * @return Fairness policy controller.
+   */
+  public static RouterRpcFairnessPolicyController newFairnessPolicyController(
+      Configuration conf) {
+    final Class<? extends RouterRpcFairnessPolicyController> controllerClass =
+        conf.getClass(
+            RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+            RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS_DEFAULT,
+            RouterRpcFairnessPolicyController.class);
+    // No context object is involved, hence the two null arguments.
+    return newInstance(conf, null, null, controllerClass);
+  }
+
+  /**
+   * Collect all configured nameservices.
+   * Each monitored namenode entry is either a bare name service id or a
+   * "nameservice.namenode" pair; in the latter case only the part before
+   * the dot is kept.
+   *
+   * @param conf Configuration listing the monitored namenodes.
+   * @return Set of name services in config; a new, modifiable set.
+   * @throws IllegalArgumentException If an entry does not split on '.' into
+   *         exactly one or two parts.
+   */
+  public static Set<String> getAllConfiguredNS(Configuration conf)
+      throws IllegalArgumentException {
+    // Get all name services configured
+    Collection<String> namenodes = conf.getTrimmedStringCollection(
+        DFS_ROUTER_MONITOR_NAMENODE);
+
+    Set<String> nameservices = new HashSet<>();
+    for (String namenode : namenodes) {
+      String[] namenodeSplit = namenode.split("\\.");
+      String nsId;
+      if (namenodeSplit.length == 2) {
+        nsId = namenodeSplit[0];
+      } else if (namenodeSplit.length == 1) {
+        nsId = namenode;
+      } else {
+        String errorMsg = "Wrong name service specified : " + namenode;
+        throw new IllegalArgumentException(
+            errorMsg);
+      }
+      nameservices.add(nsId);
+    }
+    return nameservices;
+  }
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index ba3659e..8fd2e28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.server.federation.fairness.NoRouterRpcFairnessPolicyController;
+import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCPerformanceMonitor;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@@ -334,4 +336,16 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final Class<? extends AbstractDelegationTokenSecretManager>
DFS_ROUTER_DELEGATION_TOKEN_DRIVER_CLASS_DEFAULT =
ZKDelegationTokenSecretManagerImpl.class;
+
+ // HDFS Router fairness
+ public static final String FEDERATION_ROUTER_FAIRNESS_PREFIX =
+ FEDERATION_ROUTER_PREFIX + "fairness.";
+ public static final String
+ DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS =
+ FEDERATION_ROUTER_FAIRNESS_PREFIX + "policy.controller.class";
+ public static final Class<? extends RouterRpcFairnessPolicyController>
+ DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS_DEFAULT =
+ NoRouterRpcFairnessPolicyController.class;
+ public static final String DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX =
+ FEDERATION_ROUTER_FAIRNESS_PREFIX + "handler.count.";
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 8ae3731..87f2ed7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_C
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_CONTEXT_SEPARATOR_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import java.io.EOFException;
import java.io.FileNotFoundException;
@@ -61,6 +62,8 @@ import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
+import org.apache.hadoop.hdfs.server.federation.fairness.AbstractRouterRpcFairnessPolicyController;
+import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
@@ -129,6 +132,9 @@ public class RouterRpcClient {
private static final String CLIENT_IP_STR = "clientIp";
+ /** Fairness manager to control handlers assigned per NS. */
+ private RouterRpcFairnessPolicyController routerRpcFairnessPolicyController;
+
/**
* Create a router RPC client to manage remote procedure calls to NNs.
*
@@ -149,6 +155,8 @@ public class RouterRpcClient {
HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT);
this.connectionManager = new ConnectionManager(clientConf);
this.connectionManager.start();
+ this.routerRpcFairnessPolicyController =
+ FederationUtil.newFairnessPolicyController(conf);
int numThreads = conf.getInt(
RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
@@ -229,6 +237,9 @@ public class RouterRpcClient {
if (this.executorService != null) {
this.executorService.shutdownNow();
}
+ if (this.routerRpcFairnessPolicyController != null) {
+ this.routerRpcFairnessPolicyController.shutdown();
+ }
}
/**
@@ -770,13 +781,18 @@ public class RouterRpcClient {
public Object invokeSingle(final String nsId, RemoteMethod method)
throws IOException {
UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
- List<? extends FederationNamenodeContext> nns =
- getNamenodesForNameservice(nsId);
- RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
- Class<?> proto = method.getProtocol();
- Method m = method.getMethod();
- Object[] params = method.getParams(loc);
- return invokeMethod(ugi, nns, proto, m, params);
+ acquirePermit(nsId, ugi, method);
+ try {
+ List<? extends FederationNamenodeContext> nns =
+ getNamenodesForNameservice(nsId);
+ RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/");
+ Class<?> proto = method.getProtocol();
+ Method m = method.getMethod();
+ Object[] params = method.getParams(loc);
+ return invokeMethod(ugi, nns, proto, m, params);
+ } finally {
+ releasePermit(nsId, ugi, method);
+ }
}
/**
@@ -933,6 +949,7 @@ public class RouterRpcClient {
// Invoke in priority order
for (final RemoteLocationContext loc : locations) {
String ns = loc.getNameserviceId();
+ acquirePermit(ns, ugi, remoteMethod);
List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns);
try {
@@ -966,6 +983,8 @@ public class RouterRpcClient {
IOException ioe = new IOException(
"Unexpected exception proxying API " + e.getMessage(), e);
thrownExceptions.add(ioe);
+ } finally {
+ releasePermit(ns, ugi, remoteMethod);
}
}
@@ -1298,6 +1317,7 @@ public class RouterRpcClient {
// Shortcut, just one call
T location = locations.iterator().next();
String ns = location.getNameserviceId();
+ acquirePermit(ns, ugi, method);
final List<? extends FederationNamenodeContext> namenodes =
getNamenodesForNameservice(ns);
try {
@@ -1309,6 +1329,8 @@ public class RouterRpcClient {
} catch (IOException ioe) {
// Localize the exception
throw processException(ioe, location);
+ } finally {
+ releasePermit(ns, ugi, method);
}
}
@@ -1355,6 +1377,7 @@ public class RouterRpcClient {
rpcMonitor.proxyOp();
}
+ acquirePermit(CONCURRENT_NS, ugi, method);
try {
List<Future<Object>> futures = null;
if (timeOutMs > 0) {
@@ -1411,6 +1434,8 @@ public class RouterRpcClient {
LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
throw new IOException(
"Unexpected error while invoking API " + ex.getMessage(), ex);
+ } finally {
+ releasePermit(CONCURRENT_NS, ugi, method);
}
}
@@ -1484,4 +1509,55 @@ public class RouterRpcClient {
FederationNamenodeContext namenode = namenodes.get(0);
return namenode.getNameserviceId();
}
+
+ /**
+ * Acquire a permit for nsId before proxying; no-op without a controller.
+ *
+ * @param nsId Nameservice identifier (or CONCURRENT_NS) to get a permit for.
+ * @param ugi UserGroupInformation of the caller, used for logging only.
+ * @param m Remote method that needs to be invoked.
+ * @throws IOException A StandbyException if no permit is available for nsId.
+ */
+ private void acquirePermit(
+ final String nsId, final UserGroupInformation ugi, final RemoteMethod m)
+ throws IOException {
+ if (routerRpcFairnessPolicyController != null
+ && !routerRpcFairnessPolicyController.acquirePermit(nsId)) {
+ // Throw StandbyException so that
+ // clients can fail over and try another router.
+ if (rpcMonitor != null) {
+ rpcMonitor.getRPCMetrics().incrProxyOpPermitRejected();
+ }
+ LOG.debug("Permit denied for ugi: {} for method: {}",
+ ugi, m.getMethodName());
+ String msg =
+ "Router " + router.getRouterId() +
+ " is overloaded for NS: " + nsId;
+ throw new StandbyException(msg);
+ }
+ }
+
+ /**
+ * Release the permit held for nsId once the downstream call has
+ * completed; no-op when no fairness controller is set.
+ *
+ * @param nsId Nameservice identifier (or CONCURRENT_NS) to release for.
+ * @param ugi UserGroupInformation of the caller, used for logging only.
+ * @param m Remote method that was invoked; only its name is logged.
+ */
+ private void releasePermit(
+ final String nsId, final UserGroupInformation ugi, final RemoteMethod m) {
+ if (routerRpcFairnessPolicyController != null) {
+ routerRpcFairnessPolicyController.releasePermit(nsId);
+ LOG.trace("Permit released for ugi: {} for method: {}", ugi,
+ m.getMethodName());
+ }
+ }
+
+ @VisibleForTesting
+ public AbstractRouterRpcFairnessPolicyController
+ getRouterRpcFairnessPolicyController() {
+ return (AbstractRouterRpcFairnessPolicyController
+ )routerRpcFairnessPolicyController;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index aab90e4..8c17185 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -677,4 +677,29 @@
</description>
</property>
+ <property>
+ <name>dfs.federation.router.fairness.policy.controller.class</name>
+ <value>org.apache.hadoop.hdfs.server.federation.fairness.NoRouterRpcFairnessPolicyController</value>
+ <description>
+ No fairness policy handler by default; to enable fairness,
+ StaticRouterRpcFairnessPolicyController should be configured.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.fairness.handler.count.EXAMPLENAMESERVICE</name>
+ <value></value>
+ <description>
+ Dedicated handler count for nameservice EXAMPLENAMESERVICE. The handler
+ resource (configured by dfs.federation.router.handler.count) is controlled
+ internally by Semaphore permits. Two requirements have to be satisfied.
+ 1) All downstream nameservices need this config, otherwise no permit will
+ be given and thus no proxying will happen. 2) If the special *concurrent*
+ nameservice is specified, the sum of all configured values must be smaller
+ than or equal to the total number of router handlers; if the special
+ *concurrent* is not specified, the sum of all configured values must be
+ strictly smaller than the router handlers, so that the remainder is
+ allocated to the concurrent calls.
+ </description>
+ </property>
</configuration>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java
new file mode 100644
index 0000000..c3fc324
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterHandlersFairness.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the Router handlers fairness control rejects
+ * requests when the handlers are overloaded.
+ */
+public class TestRouterHandlersFairness {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestRouterHandlersFairness.class);
+
+ private StateStoreDFSCluster cluster;
+
+ @After
+ public void cleanup() {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ private void setupCluster(boolean fairnessEnable, boolean ha)
+ throws Exception {
+ // Build and start a federated cluster
+ cluster = new StateStoreDFSCluster(ha, 2);
+ Configuration routerConf = new RouterConfigBuilder()
+ .stateStore()
+ .rpc()
+ .build();
+
+ // Fairness control
+ if (fairnessEnable) {
+ routerConf.setClass(
+ RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+ StaticRouterRpcFairnessPolicyController.class,
+ RouterRpcFairnessPolicyController.class);
+ }
+
+ // With two name services configured, each nameservice has 1 permit and
+ // fan-out calls have 1 permit.
+ routerConf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 3);
+
+ // Datanodes not needed for this test.
+ cluster.setNumDatanodesPerNameservice(0);
+
+ cluster.addRouterOverrides(routerConf);
+ cluster.startCluster();
+ cluster.startRouters();
+ cluster.waitClusterUp();
+ }
+
+ @Test
+ public void testFairnessControlOff() throws Exception {
+ setupCluster(false, false);
+ startLoadTest(false);
+ }
+
+ @Test
+ public void testFairnessControlOn() throws Exception {
+ setupCluster(true, false);
+ startLoadTest(true);
+ }
+
+ /**
+ * Start a generic load test as a client against a cluster which has either
+ * fairness configured or not configured. The test issues a fixed number of
+ * RPCs through a router, first as concurrent requests and then as
+ * sequential requests. If fairness is enabled, the permits are taken up
+ * front, so at least one RPC must fail with StandbyException. If fairness
+ * is not configured, no RPC should fail with StandbyException.
+ *
+ * @param fairness Flag to indicate if fairness management is on/off.
+ * @throws Exception Throws exception.
+ */
+ private void startLoadTest(boolean fairness)
+ throws Exception {
+
+ // Concurrent requests
+ startLoadTest(true, fairness);
+
+ // Sequential requests
+ startLoadTest(false, fairness);
+ }
+
+ private void startLoadTest(final boolean isConcurrent, final boolean fairness)
+ throws Exception {
+
+ RouterContext routerContext = cluster.getRandomRouter();
+ if (fairness) {
+ if (isConcurrent) {
+ LOG.info("Taking fanout lock first");
+ // take the permit for concurrent NS to block fanout calls
+ assertTrue(routerContext.getRouter().getRpcServer()
+ .getRPCClient().getRouterRpcFairnessPolicyController()
+ .acquirePermit(RouterRpcFairnessConstants.CONCURRENT_NS));
+ } else {
+ for (String ns : cluster.getNameservices()) {
+ LOG.info("Taking lock first for ns: {}", ns);
+ assertTrue(routerContext.getRouter().getRpcServer()
+ .getRPCClient().getRouterRpcFairnessPolicyController()
+ .acquirePermit(ns));
+ }
+ }
+ }
+ URI address = routerContext.getFileSystemURI();
+ Configuration conf = new HdfsConfiguration();
+ final int numOps = 10;
+ final AtomicInteger overloadException = new AtomicInteger();
+
+ for (int i = 0; i < numOps; i++) {
+ DFSClient routerClient = null;
+ try {
+ routerClient = new DFSClient(address, conf);
+ String clientName = routerClient.getClientName();
+ ClientProtocol routerProto = routerClient.getNamenode();
+ if (isConcurrent) {
+ invokeConcurrent(routerProto, clientName);
+ } else {
+ invokeSequential(routerProto);
+ }
+ } catch (RemoteException re) {
+ IOException ioe = re.unwrapRemoteException();
+ assertTrue("Wrong exception: " + ioe,
+ ioe instanceof StandbyException);
+ assertExceptionContains("is overloaded for NS", ioe);
+ overloadException.incrementAndGet();
+ } catch (Throwable e) {
+ throw e;
+ } finally {
+ if (routerClient != null) {
+ try {
+ routerClient.close();
+ } catch (IOException e) {
+ LOG.error("Cannot close the client");
+ }
+ }
+ }
+ overloadException.get(); // NOTE(review): result is discarded; no effect
+ }
+
+ if (fairness) {
+ assertTrue(overloadException.get() > 0);
+ if (isConcurrent) {
+ LOG.info("Release fanout lock that was taken before test");
+ // release the concurrent NS permit that was taken before the test
+ routerContext.getRouter().getRpcServer()
+ .getRPCClient().getRouterRpcFairnessPolicyController()
+ .releasePermit(RouterRpcFairnessConstants.CONCURRENT_NS);
+ } else {
+ for (String ns : cluster.getNameservices()) {
+ routerContext.getRouter().getRpcServer()
+ .getRPCClient().getRouterRpcFairnessPolicyController()
+ .releasePermit(ns);
+ }
+ }
+ } else {
+ assertEquals("Number of failed RPCs without fairness configured",
+ 0, overloadException.get());
+ }
+ }
+
+ private void invokeSequential(ClientProtocol routerProto) throws IOException {
+ routerProto.getFileInfo("/test.txt");
+ }
+
+ private void invokeConcurrent(ClientProtocol routerProto, String clientName)
+ throws IOException {
+ routerProto.renewLease(clientName);
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java
new file mode 100644
index 0000000..c0c3074
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRpcFairnessPolicyController.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.router.FederationUtil;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test functionality of {@link RouterRpcFairnessPolicyController}.
+ */
+public class TestRouterRpcFairnessPolicyController {
+
+ private static String nameServices =
+ "ns1.nn1, ns1.nn2, ns2.nn1, ns2.nn2";
+
+ @Test
+ public void testHandlerAllocationEqualAssignment() {
+ RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
+ = getFairnessPolicyController(30);
+ verifyHandlerAllocation(routerRpcFairnessPolicyController);
+ }
+
+ @Test
+ public void testHandlerAllocationWithLeftOverHandler() {
+ RouterRpcFairnessPolicyController routerRpcFairnessPolicyController
+ = getFairnessPolicyController(31);
+ // One extra handler should be allocated to the concurrent NS.
+ assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+ verifyHandlerAllocation(routerRpcFairnessPolicyController);
+ }
+
+ @Test
+ public void testHandlerAllocationPreconfigured() {
+ Configuration conf = createConf(40);
+ conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 30);
+ RouterRpcFairnessPolicyController routerRpcFairnessPolicyController =
+ FederationUtil.newFairnessPolicyController(conf);
+
+ // ns1 should have 30 permits allocated
+ for (int i=0; i<30; i++) {
+ assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+ }
+
+ // ns2 should have 5 permits.
+ // concurrent should have 5 permits.
+ for (int i=0; i<5; i++) {
+ assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+ assertTrue(
+ routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+ }
+
+ assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+ assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+ assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+ }
+
+ @Test
+ public void testAllocationErrorWithZeroHandlers() {
+ Configuration conf = createConf(0);
+ verifyInstantiationError(conf);
+ }
+
+ @Test
+ public void testAllocationErrorForLowDefaultHandlers() {
+ Configuration conf = createConf(1);
+ verifyInstantiationError(conf);
+ }
+
+ @Test
+ public void testAllocationErrorForLowDefaultHandlersPerNS() {
+ Configuration conf = createConf(1);
+ conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "concurrent", 1);
+ verifyInstantiationError(conf);
+ }
+
+ @Test
+ public void testAllocationErrorForLowPreconfiguredHandlers() {
+ Configuration conf = createConf(1);
+ conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns1", 2);
+ verifyInstantiationError(conf);
+ }
+
+ private void verifyInstantiationError(Configuration conf) {
+ GenericTestUtils.LogCapturer logs = GenericTestUtils.LogCapturer
+ .captureLogs(LoggerFactory.getLogger(
+ StaticRouterRpcFairnessPolicyController.class));
+ try {
+ FederationUtil.newFairnessPolicyController(conf);
+ } catch (IllegalArgumentException e) {
+ // Ignore the exception as it is expected here.
+ }
+ assertTrue("Should contain error message",
+ logs.getOutput().contains("lower than min"));
+ }
+
+ private RouterRpcFairnessPolicyController getFairnessPolicyController(
+ int handlers) {
+ return FederationUtil.newFairnessPolicyController(createConf(handlers));
+ }
+
+ private void verifyHandlerAllocation(
+ RouterRpcFairnessPolicyController routerRpcFairnessPolicyController) {
+ for (int i=0; i<10; i++) {
+ assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+ assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+ assertTrue(
+ routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+ }
+ assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+ assertFalse(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+ assertFalse(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+
+ routerRpcFairnessPolicyController.releasePermit("ns1");
+ routerRpcFairnessPolicyController.releasePermit("ns2");
+ routerRpcFairnessPolicyController.releasePermit(CONCURRENT_NS);
+
+ assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns1"));
+ assertTrue(routerRpcFairnessPolicyController.acquirePermit("ns2"));
+ assertTrue(routerRpcFairnessPolicyController.acquirePermit(CONCURRENT_NS));
+ }
+
+ private Configuration createConf(int handlers) {
+ Configuration conf = new HdfsConfiguration();
+ conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, handlers);
+ conf.set(DFS_ROUTER_MONITOR_NAMENODE, nameServices);
+ conf.setClass(
+ RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+ StaticRouterRpcFairnessPolicyController.class,
+ RouterRpcFairnessPolicyController.class);
+ return conf;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java
index e130b7b..50527b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRBFConfigFields.java
@@ -47,5 +47,7 @@ public class TestRBFConfigFields extends TestConfigurationFieldsBase {
// Allocate
xmlPropsToSkipCompare = new HashSet<String>();
xmlPrefixToSkipCompare = new HashSet<String>();
+ xmlPrefixToSkipCompare.add(
+ RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org