You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2018/02/20 01:37:31 UTC
hadoop git commit: HDFS-13119. RBF: Manage unavailable clusters.
Contributed by Yiqun Lin.
Repository: hadoop
Updated Branches:
refs/heads/trunk 1d37cf675 -> 8896d20b9
HDFS-13119. RBF: Manage unavailable clusters. Contributed by Yiqun Lin.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8896d20b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8896d20b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8896d20b
Branch: refs/heads/trunk
Commit: 8896d20b91520053a6bbfb680adb345cd24f4142
Parents: 1d37cf6
Author: Yiqun Lin <yq...@apache.org>
Authored: Tue Feb 20 09:37:08 2018 +0800
Committer: Yiqun Lin <yq...@apache.org>
Committed: Tue Feb 20 09:37:08 2018 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 8 +
.../federation/metrics/FederationRPCMBean.java | 2 +
.../metrics/FederationRPCMetrics.java | 11 ++
.../FederationRPCPerformanceMonitor.java | 10 ++
.../resolver/NamenodeStatusReport.java | 8 +
.../federation/router/RouterRpcClient.java | 71 +++++++--
.../federation/router/RouterRpcMonitor.java | 13 ++
.../federation/router/RouterRpcServer.java | 9 ++
.../src/main/resources/hdfs-default.xml | 17 +++
.../router/TestRouterRPCClientRetries.java | 151 +++++++++++++++++++
10 files changed, 289 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0828957..bea38d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1246,6 +1246,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(10);
+ // HDFS Router RPC client
+ // Size of the fixed thread pool RouterRpcClient creates for its executor.
+ public static final String DFS_ROUTER_CLIENT_THREADS_SIZE =
+ FEDERATION_ROUTER_PREFIX + "client.thread-size";
+ public static final int DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT = 32;
+ // Max retry attempts RouterRpcClient uses for calls to the NameNodes;
+ // read instead of HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY.
+ public static final String DFS_ROUTER_CLIENT_MAX_ATTEMPTS =
+ FEDERATION_ROUTER_PREFIX + "client.retry.max.attempts";
+ public static final int DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT = 3;
+
// HDFS Router State Store connection
public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
FEDERATION_ROUTER_PREFIX + "file.resolver.client.class";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
index 00209e9..3e031fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java
@@ -42,6 +42,8 @@ public interface FederationRPCMBean {
long getProxyOpNotImplemented();
+ /** Number of times the Router retried proxying an operation. */
+ long getProxyOpRetries();
+
long getRouterFailureStateStoreOps();
long getRouterFailureReadOnlyOps();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
index 8995689..94d3383 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java
@@ -56,6 +56,8 @@ public class FederationRPCMetrics implements FederationRPCMBean {
private MutableCounterLong proxyOpFailureCommunicate;
@Metric("Number of operations not implemented")
private MutableCounterLong proxyOpNotImplemented;
+ // Incremented by incrProxyOpRetries() for every retry of a proxied call.
+ @Metric("Number of operation retries")
+ private MutableCounterLong proxyOpRetries;
@Metric("Failed requests due to State Store unavailable")
private MutableCounterLong routerFailureStateStore;
@@ -126,6 +128,15 @@ public class FederationRPCMetrics implements FederationRPCMBean {
return proxyOpNotImplemented.value();
}
+ /**
+ * Increment the counter of proxied operations that were retried.
+ */
+ public void incrProxyOpRetries() {
+ proxyOpRetries.incr();
+ }
+
+ @Override
+ public long getProxyOpRetries() {
+ return proxyOpRetries.value();
+ }
+
public void incrRouterFailureStateStore() {
routerFailureStateStore.incr();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
index e3a16b5..547ebb5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java
@@ -159,6 +159,11 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
}
@Override
+ public void proxyOpRetries() {
+ // Record one retry of a proxied operation in the RPC metrics.
+ metrics.incrProxyOpRetries();
+ }
+
+ @Override
public void routerFailureStateStore() {
metrics.incrRouterFailureStateStore();
}
@@ -208,4 +213,9 @@ public class FederationRPCPerformanceMonitor implements RouterRpcMonitor {
}
return -1;
}
+
+ /**
+ * Get the FederationRPCMetrics instance this monitor updates.
+ * @return The metrics instance backing this monitor.
+ */
+ @Override
+ public FederationRPCMetrics getRPCMetrics() {
+ return this.metrics;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
index d3c6d87..c3c6fa8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/NamenodeStatusReport.java
@@ -390,6 +390,14 @@ public class NamenodeStatusReport {
return this.numOfBlocksPendingDeletion;
}
+ /**
+ * Set whether this namenode registration is valid.
+ * @param isValid True to mark the registration as valid, false to mark it
+ * as invalid (TestRouterRPCClientRetries, for example, registers an
+ * invalid report to simulate an unavailable namenode).
+ */
+ public void setRegistrationValid(boolean isValid) {
+ this.registrationValid = isValid;
+ }
+
@Override
public String toString() {
return String.format("%s-%s:%s",
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 4209a49..d3b7947 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -46,12 +46,14 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
@@ -123,10 +125,14 @@ public class RouterRpcClient {
this.connectionManager = new ConnectionManager(conf);
this.connectionManager.start();
+ int numThreads = conf.getInt(
+ DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE,
+ DFSConfigKeys.DFS_ROUTER_CLIENT_THREADS_SIZE_DEFAULT);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("RPC Router Client-%d")
.build();
- this.executorService = Executors.newCachedThreadPool(threadFactory);
+ this.executorService = Executors.newFixedThreadPool(
+ numThreads, threadFactory);
this.rpcMonitor = monitor;
@@ -134,8 +140,8 @@ public class RouterRpcClient {
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_KEY,
HdfsClientConfigKeys.Failover.MAX_ATTEMPTS_DEFAULT);
int maxRetryAttempts = conf.getInt(
- HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_KEY,
- HdfsClientConfigKeys.Retry.MAX_ATTEMPTS_DEFAULT);
+ DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS,
+ DFSConfigKeys.DFS_ROUTER_CLIENT_MAX_ATTEMPTS_DEFAULT);
int failoverSleepBaseMillis = conf.getInt(
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_KEY,
HdfsClientConfigKeys.Failover.SLEEPTIME_BASE_DEFAULT);
@@ -274,11 +280,24 @@ public class RouterRpcClient {
*
* @param ioe IOException reported.
* @param retryCount Number of retries.
+ * @param nsId Nameservice ID.
* @return Retry decision.
- * @throws IOException Original exception if the retry policy generates one.
+ * @throws IOException Original exception if the retry policy generates one
+ * or IOException for no available namenodes.
*/
- private RetryDecision shouldRetry(final IOException ioe, final int retryCount)
- throws IOException {
+ private RetryDecision shouldRetry(final IOException ioe, final int retryCount,
+ final String nsId) throws IOException {
+ // check for the case of cluster unavailable state
+ if (isClusterUnAvailable(nsId)) {
+ // we allow to retry once if cluster is unavailable
+ if (retryCount == 0) {
+ return RetryDecision.RETRY;
+ } else {
+ throw new IOException("No namenode available under nameservice " + nsId,
+ ioe);
+ }
+ }
+
try {
final RetryPolicy.RetryAction a =
this.retryPolicy.shouldRetry(ioe, retryCount, 0, true);
@@ -329,7 +348,7 @@ public class RouterRpcClient {
connection = this.getConnection(ugi, nsId, rpcAddress);
ProxyAndInfo<ClientProtocol> client = connection.getClient();
ClientProtocol proxy = client.getProxy();
- ret = invoke(0, method, proxy, params);
+ ret = invoke(nsId, 0, method, proxy, params);
if (failover) {
// Success on alternate server, update
InetSocketAddress address = client.getAddress();
@@ -400,6 +419,8 @@ public class RouterRpcClient {
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
+ * @param nsId Identifier for the namespace
+ * @param retryCount Current retry times
* @param method Method to invoke
* @param obj Target object for the method
* @param params Variable parameters
@@ -407,8 +428,8 @@ public class RouterRpcClient {
* @throws IOException
* @throws InterruptedException
*/
- private Object invoke(int retryCount, final Method method, final Object obj,
- final Object... params) throws IOException {
+ private Object invoke(String nsId, int retryCount, final Method method,
+ final Object obj, final Object... params) throws IOException {
try {
return method.invoke(obj, params);
} catch (IllegalAccessException e) {
@@ -421,11 +442,16 @@ public class RouterRpcClient {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
IOException ioe = (IOException) cause;
+
// Check if we should retry.
- RetryDecision decision = shouldRetry(ioe, retryCount);
+ RetryDecision decision = shouldRetry(ioe, retryCount, nsId);
if (decision == RetryDecision.RETRY) {
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpRetries();
+ }
+
// retry
- return invoke(++retryCount, method, obj, params);
+ return invoke(nsId, ++retryCount, method, obj, params);
} else if (decision == RetryDecision.FAILOVER_AND_RETRY) {
// failover, invoker looks for standby exceptions for failover.
if (ioe instanceof StandbyException) {
@@ -448,6 +474,29 @@ public class RouterRpcClient {
}
/**
+ * Check if the cluster of the given nameservice id is unavailable, i.e. none
+ * of its namenodes is reported in the ACTIVE state.
+ * @param nsId nameservice ID.
+ * @return true if no namenode of the nameservice is ACTIVE (also when the
+ * resolver returns no namenode list for it), false otherwise.
+ * @throws IOException If the namenode resolver lookup fails.
+ */
+ private boolean isClusterUnAvailable(String nsId) throws IOException {
+ List<? extends FederationNamenodeContext> nnState = this.namenodeResolver
+ .getNamenodesForNameserviceId(nsId);
+
+ if (nnState != null) {
+ for (FederationNamenodeContext nnContext : nnState) {
+ // Once we find one NN is in active state, we assume this
+ // cluster is available.
+ if (nnContext.getState() == FederationNamenodeServiceState.ACTIVE) {
+ return false;
+ }
+ }
+ }
+
+ // No ACTIVE namenode found (or nothing registered): treat as unavailable.
+ return true;
+ }
+
+ /**
* Get a clean copy of the exception. Sometimes the exceptions returned by the
* server contain the full stack trace in the message.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
index d889a56..df9aa11 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.federation.router;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
/**
@@ -36,6 +37,12 @@ public interface RouterRpcMonitor {
Configuration conf, RouterRpcServer server, StateStoreService store);
/**
+ * Get the metrics instance this monitor records Router RPC activity into.
+ * @return The instance of FederationRPCMetrics.
+ */
+ FederationRPCMetrics getRPCMetrics();
+
+ /**
* Close the monitor.
*/
void close();
@@ -74,6 +81,12 @@ public interface RouterRpcMonitor {
void proxyOpNotImplemented();
/**
+ * If the Router retries proxying an operation to a Namenode, e.g. because
+ * the previous attempt failed with an IOException.
+ */
+ void proxyOpRetries();
+
+ /**
* If the Router cannot contact the State Store in an operation.
*/
void routerFailureStateStore();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 57125ca..e0dfeb4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -105,6 +105,7 @@ import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
@@ -2177,4 +2178,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol {
public Quota getQuotaModule() {
return this.quotaCall;
}
+
+ /**
+ * Get the RPC metrics recorded by this server's RPC monitor.
+ * @return The instance of FederationRPCMetrics.
+ */
+ public FederationRPCMetrics getRPCMetrics() {
+ // NOTE(review): RouterRpcClient null-checks its rpcMonitor before use;
+ // if rpcMonitor can also be null here this throws NullPointerException.
+ // Confirm rpcMonitor is always initialized in RouterRpcServer.
+ return this.rpcMonitor.getRPCMetrics();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index b61c418..d037b2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5234,4 +5234,21 @@
is assumed.
</description>
</property>
+
+ <property>
+ <name>dfs.federation.router.client.thread-size</name>
+ <value>32</value>
+ <description>
+ Maximum number of threads the RouterRpcClient uses to execute
+ concurrent requests.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.client.retry.max.attempts</name>
+ <value>3</value>
+ <description>
+ Max retry attempts for the RouterRpcClient talking to the NameNodes.
+ </description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8896d20b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
new file mode 100644
index 0000000..dddcb5a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test retry behavior of the Router RPC Client when the namenodes of a
+ * nameservice become unavailable.
+ */
+public class TestRouterRPCClientRetries {
+
+ // NOTE(review): these are static but reassigned in @Before for every test;
+ // instance fields would suffice.
+ private static StateStoreDFSCluster cluster;
+ private static NamenodeContext nnContext1;
+ private static RouterContext routerContext;
+ private static MembershipNamenodeResolver resolver;
+ private static ClientProtocol routerProtocol;
+
+ @Before
+ public void setUp() throws Exception {
+ // Build and start a federated cluster
+ cluster = new StateStoreDFSCluster(false, 2);
+ Configuration routerConf = new RouterConfigBuilder()
+ .stateStore()
+ .admin()
+ .rpc()
+ .build();
+
+ cluster.addRouterOverrides(routerConf);
+ cluster.startCluster();
+ cluster.startRouters();
+ cluster.waitClusterUp();
+
+ nnContext1 = cluster.getNamenode(cluster.getNameservices().get(0), null);
+ routerContext = cluster.getRandomRouter();
+ resolver = (MembershipNamenodeResolver) routerContext.getRouter()
+ .getNamenodeResolver();
+ routerProtocol = routerContext.getClient().getNamenode();
+ }
+
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.stopRouter(routerContext);
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ @Test
+ public void testRetryWhenAllNameServiceDown() throws Exception {
+ // shutdown the whole dfs cluster (all namenodes of all nameservices)
+ MiniDFSCluster dfsCluster = cluster.getCluster();
+ dfsCluster.shutdown();
+
+ // register an invalid namenode report
+ registerInvalidNameReport();
+
+ // Create a directory via the router
+ String dirPath = "/testRetryWhenClusterisDown";
+ FsPermission permission = new FsPermission("705");
+ try {
+ routerProtocol.mkdirs(dirPath, permission, false);
+ fail("Should have thrown RemoteException error.");
+ } catch (RemoteException e) {
+ String ns0 = cluster.getNameservices().get(0);
+ GenericTestUtils.assertExceptionContains(
+ "No namenode available under nameservice " + ns0, e);
+ }
+
+ // Verify the retry times, it should only retry one time: the client
+ // allows a single retry when a nameservice has no ACTIVE namenode.
+ FederationRPCMetrics rpcMetrics = routerContext.getRouter()
+ .getRpcServer().getRPCMetrics();
+ assertEquals(1, rpcMetrics.getProxyOpRetries());
+ }
+
+ @Test
+ public void testRetryWhenOneNameServiceDown() throws Exception {
+ // shutdown only the first namenode; the other nameservice stays up
+ MiniDFSCluster dfsCluster = cluster.getCluster();
+ dfsCluster.shutdownNameNode(0);
+
+ // register an invalid namenode report
+ registerInvalidNameReport();
+
+ DFSClient client = nnContext1.getClient();
+ // Renew lease for the DFS client, it will succeed (presumably because
+ // renewLease is proxied to every nameservice and one is still up).
+ routerProtocol.renewLease(client.getClientName());
+
+ // Verify the retry times, it will retry one time for ns0.
+ FederationRPCMetrics rpcMetrics = routerContext.getRouter()
+ .getRpcServer().getRPCMetrics();
+ assertEquals(1, rpcMetrics.getProxyOpRetries());
+ }
+
+ /**
+ * Register an invalid namenode report for the first namenode of the first
+ * nameservice and reload the resolver cache, presumably so the router no
+ * longer sees that namenode as ACTIVE.
+ * @throws IOException If a namenode resolver call fails.
+ */
+ private void registerInvalidNameReport() throws IOException {
+ String ns0 = cluster.getNameservices().get(0);
+ List<? extends FederationNamenodeContext> origin = resolver
+ .getNamenodesForNameserviceId(ns0);
+ FederationNamenodeContext nnInfo = origin.get(0);
+ NamenodeStatusReport report = new NamenodeStatusReport(ns0,
+ nnInfo.getNamenodeId(), nnInfo.getRpcAddress(),
+ nnInfo.getServiceAddress(), nnInfo.getLifelineAddress(),
+ nnInfo.getWebAddress());
+ report.setRegistrationValid(false);
+ assertTrue(resolver.registerNamenode(report));
+ // Reload the cache (force=true, presumably) so the new report is visible.
+ resolver.loadCache(true);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org