Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/05/04 19:27:53 UTC
[25/50] [abbrv] hadoop git commit: HDFS-13488. RBF: Reject requests when a Router is overloaded. Contributed by Inigo Goiri.
HDFS-13488. RBF: Reject requests when a Router is overloaded. Contributed by Inigo Goiri.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/37269261
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/37269261
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/37269261
Branch: refs/heads/HDFS-12943
Commit: 37269261d1232bc71708f30c76193188258ef4bd
Parents: 8f42daf
Author: Yiqun Lin <yq...@apache.org>
Authored: Wed May 2 14:49:39 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Wed May 2 14:49:39 2018 +0800
----------------------------------------------------------------------
.../federation/metrics/FederationRPCMBean.java | 2 +
.../metrics/FederationRPCMetrics.java | 10 +
.../FederationRPCPerformanceMonitor.java | 5 +
.../server/federation/router/RBFConfigKeys.java | 3 +
.../federation/router/RouterRpcClient.java | 31 ++-
.../federation/router/RouterRpcMonitor.java | 6 +
.../federation/router/RouterRpcServer.java | 11 +-
.../router/RouterSafeModeException.java | 53 ----
.../src/main/resources/hdfs-rbf-default.xml | 9 +
.../server/federation/FederationTestUtils.java | 2 +-
.../server/federation/StateStoreDFSCluster.java | 28 +++
.../router/TestRouterClientRejectOverload.java | 243 +++++++++++++++++++
.../router/TestRouterRPCClientRetries.java | 51 +---
.../federation/router/TestRouterSafemode.java | 3 +-
14 files changed, 349 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
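The heart of this change is in RouterRpcClient: when
dfs.federation.router.client.reject.overload is enabled, the executor that
proxies client calls to the Namenodes is built on a bounded queue, so a
saturated Router rejects new work immediately instead of queueing it without
limit. A minimal standalone sketch of that pattern (this is not the committed
code; the pool size of 4 and the sleeping task are illustrative only):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPoolSketch {
  public static void main(String[] args) {
    // Fixed pool of 4 threads with a bounded queue of 4: once every thread
    // is busy and the queue is full, submit() throws
    // RejectedExecutionException instead of accepting more work.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        4, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(4));
    for (int i = 0; i < 20; i++) {
      final int op = i;
      try {
        pool.submit(new Runnable() {
          @Override
          public void run() {
            try {
              Thread.sleep(1000); // simulate a slow downstream Namenode
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
          }
        });
      } catch (RejectedExecutionException e) {
        // This is the saturation signal RouterRpcClient translates into a
        // StandbyException so that HA clients fail over to another Router.
        System.out.println("Rejected op " + op + ": pool is saturated");
      }
    }
    pool.shutdown();
  }
}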
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37269261/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
index 3e031fe..973c398 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
@@ -40,6 +40,8 @@ public interface FederationRPCMBean {
long getProxyOpFailureStandby();
+ long getProxyOpFailureClientOverloaded();
+
long getProxyOpNotImplemented();
long getProxyOpRetries();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37269261/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
index 94d3383..9ab4e5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
@@ -54,6 +54,8 @@ public class FederationRPCMetrics implements FederationRPCMBean {
private MutableCounterLong proxyOpFailureStandby;
@Metric("Number of operations to hit a standby NN")
private MutableCounterLong proxyOpFailureCommunicate;
+ @Metric("Number of operations to hit a client overloaded Router")
+ private MutableCounterLong proxyOpFailureClientOverloaded;
@Metric("Number of operations not implemented")
private MutableCounterLong proxyOpNotImplemented;
@Metric("Number of operation retries")
@@ -118,6 +120,14 @@ public class FederationRPCMetrics implements FederationRPCMBean {
return proxyOpFailureCommunicate.value();
}
+ public void incrProxyOpFailureClientOverloaded() {
+ proxyOpFailureClientOverloaded.incr();
+ }
+
+ @Override
+ public long getProxyOpFailureClientOverloaded() {
+ return proxyOpFailureClientOverloaded.value();
+ }
public void incrProxyOpNotImplemented() {
proxyOpNotImplemented.incr();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37269261/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
index 547ebb5..2c2741e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
@@ -154,6 +154,11 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
}
@Override
+ public void proxyOpFailureClientOverloaded() {
+ metrics.incrProxyOpFailureClientOverloaded();
+ }
+
+ @Override
public void proxyOpNotImplemented() {
metrics.incrProxyOpNotImplemented();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37269261/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 170b876..363db20 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -113,6 +113,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final String DFS_ROUTER_CLIENT_MAX_ATTEMPTS =
FEDERATION_ROUTER_PREFIX + "client.retry.max.attempts";
public static final int DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT = 3;
+ public static final String DFS_ROUTER_CLIENT_REJECT_OVERLOAD =
+ FEDERATION_ROUTER_PREFIX + "client.reject.overload";
+ public static final boolean DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT = false;
// HDFS Router State Store connection
public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37269261/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 513e867..3eb7241 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -35,13 +35,16 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -98,7 +101,7 @@ public class RouterRpcClient {
/** Connection pool to the Namenodes per user for performance. */
private final ConnectionManager connectionManager;
/** Service to run asynchronous calls. */
- private final ExecutorService executorService;
+ private final ThreadPoolExecutor executorService;
/** Retry policy for router -> NN communication. */
private final RetryPolicy retryPolicy;
/** Optional perf monitor. */
@@ -131,8 +134,16 @@ public class RouterRpcClient {
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("RPC Router Client-%d")
.build();
- this.executorService = Executors.newFixedThreadPool(
- numThreads, threadFactory);
+ BlockingQueue<Runnable> workQueue;
+ if (conf.getBoolean(
+ RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD,
+ RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT)) {
+ workQueue = new ArrayBlockingQueue<>(numThreads);
+ } else {
+ workQueue = new LinkedBlockingQueue<>();
+ }
+ this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
+ 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
this.rpcMonitor = monitor;
@@ -1106,6 +1117,16 @@ public class RouterRpcClient {
}
return results;
+ } catch (RejectedExecutionException e) {
+ if (rpcMonitor != null) {
+ rpcMonitor.proxyOpFailureClientOverloaded();
+ }
+ int active = executorService.getActiveCount();
+ int total = executorService.getMaximumPoolSize();
+ String msg = "Not enough client threads " + active + "/" + total;
+ LOG.error(msg);
+ throw new StandbyException(
+ "Router " + routerId + " is overloaded: " + msg);
} catch (InterruptedException ex) {
LOG.error("Unexpected error while invoking API: {}", ex.getMessage());
throw new IOException(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37269261/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
index df9aa11..7af71af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
@@ -76,6 +76,12 @@ public interface RouterRpcMonitor {
void proxyOpFailureCommunicate();
/**
+ * Failed to proxy an operation to a Namenode because the client was
+ * overloaded.
+ */
+ void proxyOpFailureClientOverloaded();
+
+ /**
* Failed to proxy an operation because it is not implemented.
*/
void proxyOpNotImplemented();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37269261/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 21f26d0..6b466b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -289,7 +289,6 @@ public class RouterRpcServer extends AbstractService
// We don't want the server to log the full stack trace for some exceptions
this.rpcServer.addTerseExceptions(
RemoteException.class,
- StandbyException.class,
SafeModeException.class,
FileNotFoundException.class,
FileAlreadyExistsException.class,
@@ -298,6 +297,9 @@ public class RouterRpcServer extends AbstractService
NotReplicatedYetException.class,
IOException.class);
+ this.rpcServer.addSuppressedLoggingExceptions(
+ StandbyException.class);
+
// The RPC-server port can be ephemeral... ensure we have the correct info
InetSocketAddress listenAddress = this.rpcServer.getListenerAddress();
this.rpcAddress = new InetSocketAddress(
@@ -413,7 +415,7 @@ public class RouterRpcServer extends AbstractService
* @throws UnsupportedOperationException If the operation is not supported.
*/
protected void checkOperation(OperationCategory op, boolean supported)
- throws RouterSafeModeException, UnsupportedOperationException {
+ throws StandbyException, UnsupportedOperationException {
checkOperation(op);
if (!supported) {
@@ -435,7 +437,7 @@ public class RouterRpcServer extends AbstractService
* client requests.
*/
protected void checkOperation(OperationCategory op)
- throws RouterSafeModeException {
+ throws StandbyException {
// Log the function we are currently calling.
if (rpcMonitor != null) {
rpcMonitor.startOp();
@@ -459,7 +461,8 @@ public class RouterRpcServer extends AbstractService
if (rpcMonitor != null) {
rpcMonitor.routerFailureSafemode();
}
- throw new RouterSafeModeException(router.getRouterId(), op);
+ throw new StandbyException("Router " + router.getRouterId() +
+ " is in safe mode and cannot handle " + op + " requests");
}
}
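Dropping RouterSafeModeException in favor of a plain StandbyException (see the
deletion below) works because the client-side failover retry policy already
treats StandbyException as a signal to try the next proxy, here the next
Router. A small sketch of that behavior against the standard Hadoop retry API
(the maxFailovers value of 3 is illustrative, not from the patch):

import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.StandbyException;

public class FailoverSketch {
  public static void main(String[] args) throws Exception {
    // failoverOnNetworkException answers FAILOVER_AND_RETRY for a
    // StandbyException, which is what lets a client configured with two
    // Routers skip one that is overloaded or in safe mode.
    RetryPolicy policy = RetryPolicies.failoverOnNetworkException(
        RetryPolicies.TRY_ONCE_THEN_FAIL, 3 /* maxFailovers */);
    RetryPolicy.RetryAction action = policy.shouldRetry(
        new StandbyException("Router r0 is overloaded"),
        0 /* retries */, 0 /* failovers */, true /* idempotent op */);
    System.out.println(action.action); // FAILOVER_AND_RETRY
  }
}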
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37269261/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
deleted file mode 100644
index 7a78b5b..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterSafeModeException.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.federation.router;
-
-import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.ipc.StandbyException;
-
-/**
- * Exception that the Router throws when it is in safe mode. This extends
- * {@link StandbyException} for the client to try another Router when it gets
- * this exception.
- */
-public class RouterSafeModeException extends StandbyException {
-
- private static final long serialVersionUID = 453568188334993493L;
-
- /** Identifier of the Router that generated this exception. */
- private final String routerId;
-
- /**
- * Build a new Router safe mode exception.
- * @param router Identifier of the Router.
- * @param op Category of the operation (READ/WRITE).
- */
- public RouterSafeModeException(String router, OperationCategory op) {
- super("Router " + router + " is in safe mode and cannot handle " + op
- + " requests.");
- this.routerId = router;
- }
-
- /**
- * Get the id of the Router that generated this exception.
- * @return Id of the Router that generated this exception.
- */
- public String getRouterId() {
- return this.routerId;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37269261/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 92f899d..8806cb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -431,4 +431,13 @@
</description>
</property>
+ <property>
+ <name>dfs.federation.router.client.reject.overload</name>
+ <value>false</value>
+ <description>
+ Set to true to reject client requests when we run out of RPC client
+ threads.
+ </description>
+ </property>
+
</configuration>
\ No newline at end of file
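The new property can also be set programmatically; the literal key below is
the value of RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD added above (a
minimal sketch, not taken from the patch):

import org.apache.hadoop.conf.Configuration;

public class EnableRejectOverload {
  public static void main(String[] args) {
    // Equivalent to setting the property in hdfs-rbf-site.xml.
    Configuration conf = new Configuration();
    conf.setBoolean("dfs.federation.router.client.reject.overload", true);
    System.out.println(conf.getBoolean(
        "dfs.federation.router.client.reject.overload", false)); // true
  }
}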
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37269261/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
index ed1428a..ce320f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java
@@ -59,7 +59,7 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.Whitebox;
+import org.mockito.internal.util.reflection.Whitebox;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37269261/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
index bf63b18..9d56f13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/StateStoreDFSCluster.java
@@ -28,6 +28,10 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@@ -37,6 +41,7 @@ import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.util.StringUtils;
/**
* Test utility to mimic a federated HDFS cluster with a router and a state
@@ -145,4 +150,27 @@ public class StateStoreDFSCluster extends MiniRouterDFSCluster {
entries.add(entry);
return entries;
}
+
+ /**
+ * Get the client configuration which targets all the Routers. It uses the HA
+ * setup to fail over between them.
+ * @return Configuration for the client which uses two routers.
+ */
+ public Configuration getRouterClientConf() {
+ List<RouterContext> routers = getRouters();
+ Configuration clientConf = DFSTestUtil.newHAConfiguration("fed");
+ int i = 0;
+ List<String> names = new ArrayList<>(routers.size());
+ for (RouterContext routerContext : routers) {
+ String name = "r" + i++;
+ clientConf.set(
+ DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + ".fed." + name,
+ "localhost:" + routerContext.getRpcPort());
+ names.add(name);
+ }
+ clientConf.set(DFSUtil.addKeySuffixes(
+ HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX, "fed"),
+ StringUtils.join(",", names));
+ return clientConf;
+ }
}
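A usage sketch for the new getRouterClientConf() helper, assuming a started
StateStoreDFSCluster with two Routers as in the test below; "fed" is the HA
nameservice the helper registers the Routers under:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;

public class RouterClientConfSketch {
  // `cluster` is assumed to be a started StateStoreDFSCluster with two
  // Routers, as in the tests below.
  static FileSystem clientForAllRouters(StateStoreDFSCluster cluster)
      throws Exception {
    Configuration clientConf = cluster.getRouterClientConf();
    // The client fails over between the Routers on StandbyException.
    return FileSystem.get(new URI("hdfs://fed"), clientConf);
  }
}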
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37269261/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
new file mode 100644
index 0000000..3c51e13
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterClientRejectOverload.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test the Router overload control which rejects requests when the RPC client
+ * is overloaded. This feature is managed by
+ * {@link RBFConfigKeys#DFS_ROUTER_CLIENT_REJECT_OVERLOAD}.
+ */
+public class TestRouterClientRejectOverload {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestRouterClientRejectOverload.class);
+
+ private StateStoreDFSCluster cluster;
+
+ @After
+ public void cleanup() {
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ private void setupCluster(boolean overloadControl) throws Exception {
+ // Build and start a federated cluster
+ cluster = new StateStoreDFSCluster(false, 2);
+ Configuration routerConf = new RouterConfigBuilder()
+ .stateStore()
+ .metrics()
+ .admin()
+ .rpc()
+ .build();
+
+ // Reduce the number of RPC client threads to make the Router easy to overload
+ routerConf.setInt(RBFConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE, 4);
+ // Overload control
+ routerConf.setBoolean(
+ RBFConfigKeys.DFS_ROUTER_CLIENT_REJECT_OVERLOAD, overloadControl);
+
+ // No need for datanodes as we use renewLease() for testing
+ cluster.setNumDatanodesPerNameservice(0);
+
+ cluster.addRouterOverrides(routerConf);
+ cluster.startCluster();
+ cluster.startRouters();
+ cluster.waitClusterUp();
+ }
+
+ @Test
+ public void testWithoutOverloadControl() throws Exception {
+ setupCluster(false);
+
+ // Nobody should get overloaded
+ testOverloaded(0);
+
+ // Set subcluster 0 as slow
+ MiniDFSCluster dfsCluster = cluster.getCluster();
+ NameNode nn0 = dfsCluster.getNameNode(0);
+ simulateSlowNamenode(nn0, 1);
+
+ // Nobody should get overloaded, but it will be really slow
+ testOverloaded(0);
+
+ // No rejected requests expected
+ for (RouterContext router : cluster.getRouters()) {
+ FederationRPCMetrics rpcMetrics =
+ router.getRouter().getRpcServer().getRPCMetrics();
+ assertEquals(0, rpcMetrics.getProxyOpFailureClientOverloaded());
+ }
+ }
+
+ @Test
+ public void testOverloadControl() throws Exception {
+ setupCluster(true);
+
+ List<RouterContext> routers = cluster.getRouters();
+ FederationRPCMetrics rpcMetrics0 =
+ routers.get(0).getRouter().getRpcServer().getRPCMetrics();
+ FederationRPCMetrics rpcMetrics1 =
+ routers.get(1).getRouter().getRpcServer().getRPCMetrics();
+
+ // Nobody should get overloaded
+ testOverloaded(0);
+ assertEquals(0, rpcMetrics0.getProxyOpFailureClientOverloaded());
+ assertEquals(0, rpcMetrics1.getProxyOpFailureClientOverloaded());
+
+ // Set subcluster 0 as slow
+ MiniDFSCluster dfsCluster = cluster.getCluster();
+ NameNode nn0 = dfsCluster.getNameNode(0);
+ simulateSlowNamenode(nn0, 1);
+
+ // The subcluster should be overloaded now and reject 4 to 6 requests
+ testOverloaded(4, 6);
+ assertTrue(rpcMetrics0.getProxyOpFailureClientOverloaded()
+ + rpcMetrics1.getProxyOpFailureClientOverloaded() >= 4);
+
+ // Client using HA with 2 Routers
+ // A single Router gets overloaded, but 2 will handle it
+ Configuration clientConf = cluster.getRouterClientConf();
+
+ // Each Router should get a similar number of ops (>=8) out of 2*10
+ long iniProxyOps0 = rpcMetrics0.getProxyOps();
+ long iniProxyOps1 = rpcMetrics1.getProxyOps();
+ testOverloaded(0, 0, new URI("hdfs://fed/"), clientConf, 10);
+ long proxyOps0 = rpcMetrics0.getProxyOps() - iniProxyOps0;
+ long proxyOps1 = rpcMetrics1.getProxyOps() - iniProxyOps1;
+ assertEquals(2 * 10, proxyOps0 + proxyOps1);
+ assertTrue(proxyOps0 + " operations: not distributed", proxyOps0 >= 8);
+ assertTrue(proxyOps1 + " operations: not distributed", proxyOps1 >= 8);
+ }
+
+ private void testOverloaded(int expOverload) throws Exception {
+ testOverloaded(expOverload, expOverload);
+ }
+
+ private void testOverloaded(int expOverloadMin, int expOverloadMax)
+ throws Exception {
+ RouterContext routerContext = cluster.getRandomRouter();
+ URI address = routerContext.getFileSystemURI();
+ Configuration conf = new HdfsConfiguration();
+ testOverloaded(expOverloadMin, expOverloadMax, address, conf, 10);
+ }
+
+ /**
+ * Test if the Router gets overloaded by submitting requests in parallel.
+ * We check how many requests got rejected at the end.
+ * @param expOverloadMin Min number of requests expected as overloaded.
+ * @param expOverloadMax Max number of requests expected as overloaded.
+ * @param address Destination address.
+ * @param conf Configuration of the client.
+ * @param numOps Number of operations to submit.
+ * @throws Exception If it cannot perform the test.
+ */
+ private void testOverloaded(int expOverloadMin, int expOverloadMax,
+ final URI address, final Configuration conf, final int numOps)
+ throws Exception {
+
+ // Submit renewLease() ops which go to all subclusters
+ final AtomicInteger overloadException = new AtomicInteger();
+ ExecutorService exec = Executors.newFixedThreadPool(numOps);
+ List<Future<?>> futures = new ArrayList<>();
+ for (int i = 0; i < numOps; i++) {
+ // Stagger the operations a little (50ms)
+ final int sleepTime = i * 50;
+ Future<?> future = exec.submit(new Runnable() {
+ @Override
+ public void run() {
+ DFSClient routerClient = null;
+ try {
+ Thread.sleep(sleepTime);
+ routerClient = new DFSClient(address, conf);
+ String clientName = routerClient.getClientName();
+ ClientProtocol routerProto = routerClient.getNamenode();
+ routerProto.renewLease(clientName);
+ } catch (RemoteException re) {
+ IOException ioe = re.unwrapRemoteException();
+ assertTrue("Wrong exception: " + ioe,
+ ioe instanceof StandbyException);
+ assertExceptionContains("is overloaded", ioe);
+ overloadException.incrementAndGet();
+ } catch (IOException e) {
+ fail("Unexpected exception: " + e);
+ } catch (InterruptedException e) {
+ fail("Cannot sleep: " + e);
+ } finally {
+ if (routerClient != null) {
+ try {
+ routerClient.close();
+ } catch (IOException e) {
+ LOG.error("Cannot close the client");
+ }
+ }
+ }
+ }
+ });
+ futures.add(future);
+ }
+ // Wait until all the requests are done
+ while (!futures.isEmpty()) {
+ futures.remove(0).get();
+ }
+ exec.shutdown();
+
+ int num = overloadException.get();
+ if (expOverloadMin == expOverloadMax) {
+ assertEquals(expOverloadMin, num);
+ } else {
+ assertTrue("Expected >=" + expOverloadMin + " but was " + num,
+ num >= expOverloadMin);
+ assertTrue("Expected <=" + expOverloadMax + " but was " + num,
+ num <= expOverloadMax);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37269261/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
index 372dd3b..e5ab3ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
@@ -17,13 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.federation.router;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.simulateSlowNamenode;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+import static org.apache.hadoop.test.GenericTestUtils.waitFor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.List;
@@ -44,13 +44,8 @@ import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
-import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.test.Whitebox;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.After;
@@ -58,10 +53,6 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.base.Supplier;
@@ -70,9 +61,6 @@ import com.google.common.base.Supplier;
*/
public class TestRouterRPCClientRetries {
- private static final Logger LOG =
- LoggerFactory.getLogger(TestRouterRPCClientRetries.class);
-
private static StateStoreDFSCluster cluster;
private static NamenodeContext nnContext1;
private static RouterContext routerContext;
@@ -144,7 +132,7 @@ public class TestRouterRPCClientRetries {
fail("Should have thrown RemoteException error.");
} catch (RemoteException e) {
String ns0 = cluster.getNameservices().get(0);
- GenericTestUtils.assertExceptionContains(
+ assertExceptionContains(
"No namenode available under nameservice " + ns0, e);
}
@@ -212,14 +200,14 @@ public class TestRouterRPCClientRetries {
// Making subcluster0 slow to reply, should only get DNs from nn1
MiniDFSCluster dfsCluster = cluster.getCluster();
NameNode nn0 = dfsCluster.getNameNode(0);
- simulateNNSlow(nn0);
+ simulateSlowNamenode(nn0, 3);
waitUpdateLiveNodes(jsonString2, metrics);
final String jsonString3 = metrics.getLiveNodes();
assertEquals(2, getNumDatanodes(jsonString3));
// Making subcluster1 slow to reply, shouldn't get any DNs
NameNode nn1 = dfsCluster.getNameNode(1);
- simulateNNSlow(nn1);
+ simulateSlowNamenode(nn1, 3);
waitUpdateLiveNodes(jsonString3, metrics);
final String jsonString4 = metrics.getLiveNodes();
assertEquals(0, getNumDatanodes(jsonString4));
@@ -249,36 +237,11 @@ public class TestRouterRPCClientRetries {
private static void waitUpdateLiveNodes(
final String oldValue, final NamenodeBeanMetrics metrics)
throws Exception {
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return !oldValue.equals(metrics.getLiveNodes());
}
}, 500, 5 * 1000);
}
-
- /**
- * Simulate that a Namenode is slow by adding a sleep to the check operation
- * in the NN.
- * @param nn Namenode to simulate slow.
- * @throws Exception If we cannot add the sleep time.
- */
- private static void simulateNNSlow(final NameNode nn) throws Exception {
- FSNamesystem namesystem = nn.getNamesystem();
- HAContext haContext = namesystem.getHAContext();
- HAContext spyHAContext = spy(haContext);
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- LOG.info("Simulating slow namenode {}", invocation.getMock());
- try {
- Thread.sleep(3 * 1000);
- } catch(InterruptedException e) {
- LOG.error("Simulating a slow namenode aborted");
- }
- return null;
- }
- }).when(spyHAContext).checkOperation(any(OperationCategory.class));
- Whitebox.setInternalState(namesystem, "haContext", spyHAContext);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/37269261/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
index e5d8348..f16ceb5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterSafemode.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.Time;
import org.junit.After;
@@ -187,7 +188,7 @@ public class TestRouterSafemode {
try {
router.getRpcServer().delete("/testfile.txt", true);
fail("We should have thrown a safe mode exception");
- } catch (RouterSafeModeException sme) {
+ } catch (StandbyException sme) {
exception = true;
}
assertTrue("We should have thrown a safe mode exception", exception);