Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2019/03/30 01:52:17 UTC
[hadoop] branch HDFS-13891 updated: HDFS-14316. RBF: Support unavailable subclusters for mount points with multiple destinations. Contributed by Inigo Goiri.
This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch HDFS-13891
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/HDFS-13891 by this push:
new dea3798 HDFS-14316. RBF: Support unavailable subclusters for mount points with multiple destinations. Contributed by Inigo Goiri.
dea3798 is described below
commit dea3798e174063b641c7a8dc023d2b5ceba090b2
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Sat Mar 30 07:15:41 2019 +0530
HDFS-14316. RBF: Support unavailable subclusters for mount points with multiple destinations. Contributed by Inigo Goiri.
---
.../federation/metrics/FederationMetrics.java | 1 +
.../resolver/order/DestinationOrder.java | 10 +-
.../server/federation/router/RBFConfigKeys.java | 12 +-
.../server/federation/router/RemoteMethod.java | 10 +-
.../hdfs/server/federation/router/RemoteParam.java | 9 +
.../federation/router/RouterClientProtocol.java | 133 ++++-
.../server/federation/router/RouterRpcClient.java | 79 ++-
.../server/federation/router/RouterRpcServer.java | 48 +-
.../federation/store/impl/MountTableStoreImpl.java | 2 +
.../federation/store/records/MountTable.java | 56 +-
.../store/records/impl/pb/MountTablePBImpl.java | 14 +
.../hadoop/hdfs/tools/federation/RouterAdmin.java | 31 +-
.../src/main/proto/FederationProtocol.proto | 2 +
.../src/main/resources/hdfs-rbf-default.xml | 30 +
.../src/main/webapps/router/federationhealth.html | 2 +
.../src/main/webapps/router/federationhealth.js | 12 +
.../src/main/webapps/static/rbf.css | 5 +
.../src/site/markdown/HDFSRouterFederation.md | 8 +
.../hdfs/server/federation/MockNamenode.java | 229 +++++++-
.../federation/router/TestRouterAdminCLI.java | 65 +-
.../federation/router/TestRouterFaultTolerant.java | 654 +++++++++++++++++++++
.../router/TestRouterNamenodeMonitoring.java | 2 +-
.../store/FederationStateStoreTestUtils.java | 6 +-
.../federation/store/TestStateStoreMountTable.java | 6 +-
.../federation/store/records/TestMountTable.java | 19 +
.../hadoop-hdfs/src/site/markdown/HDFSCommands.md | 4 +-
26 files changed, 1371 insertions(+), 78 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
index 5ab978d..a39f17d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
@@ -270,6 +270,7 @@ public class FederationMetrics implements FederationMBean {
innerInfo.put("order", "");
}
innerInfo.put("readonly", entry.isReadOnly());
+ innerInfo.put("faulttolerant", entry.isFaultTolerant());
info.add(Collections.unmodifiableMap(innerInfo));
}
} catch (IOException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
index 99c5e22..6a637d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation.resolver.order;
+import java.util.EnumSet;
+
/**
* Order of the destinations when we have multiple of them. When the resolver
* of files to subclusters (FileSubclusterResolver) has multiple destinations,
@@ -27,5 +29,11 @@ public enum DestinationOrder {
LOCAL, // Local first
RANDOM, // Random order
HASH_ALL, // Follow consistent hashing
- SPACE // Available space based order
+ SPACE; // Available space based order
+
+ /** Approaches that write folders in all subclusters. */
+ public static final EnumSet<DestinationOrder> FOLDER_ALL = EnumSet.of(
+ HASH_ALL,
+ RANDOM,
+ SPACE);
}
\ No newline at end of file
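The new FOLDER_ALL set is what isAll() in MountTable.java (further below in this diff) now delegates to. A minimal sketch of the check, assuming the enum above is on the classpath:

    import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;

    // HASH_ALL, RANDOM and SPACE create folders in every subcluster.
    boolean all = DestinationOrder.FOLDER_ALL.contains(DestinationOrder.RANDOM);  // true
    boolean local = DestinationOrder.FOLDER_ALL.contains(DestinationOrder.LOCAL); // false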
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 657b6cf..153cd64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -135,7 +135,17 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final String DFS_ROUTER_ALLOW_PARTIAL_LIST =
FEDERATION_ROUTER_PREFIX + "client.allow-partial-listing";
public static final boolean DFS_ROUTER_ALLOW_PARTIAL_LIST_DEFAULT = true;
-
+ public static final String DFS_ROUTER_CLIENT_MOUNT_TIME_OUT =
+ FEDERATION_ROUTER_PREFIX + "client.mount-status.time-out";
+ public static final long DFS_ROUTER_CLIENT_MOUNT_TIME_OUT_DEFAULT =
+ TimeUnit.SECONDS.toMillis(1);
+ public static final String DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT =
+ FEDERATION_ROUTER_PREFIX + "connect.max.retries.on.timeouts";
+ public static final int DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT_DEFAULT = 0;
+ public static final String DFS_ROUTER_CLIENT_CONNECT_TIMEOUT =
+ FEDERATION_ROUTER_PREFIX + "connect.timeout";
+ public static final long DFS_ROUTER_CLIENT_CONNECT_TIMEOUT_DEFAULT =
+ TimeUnit.SECONDS.toMillis(2);
// HDFS Router State Store connection
public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
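These keys surface in hdfs-rbf-default.xml further below. A hedged sketch of overriding them programmatically (the property names come from the XML defaults in this commit; the HdfsConfiguration usage is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    Configuration conf = new HdfsConfiguration();
    // Fail fast on unavailable subclusters: no IPC retries on socket
    // timeouts and a short connect timeout (these are the new defaults).
    conf.setInt("dfs.federation.router.connect.max.retries.on.timeouts", 0);
    conf.set("dfs.federation.router.connect.timeout", "2s");
    // Give up on per-mount status checks after one second.
    conf.set("dfs.federation.router.client.mount-status.time-out", "1s");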
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
index 6ff2b01..f7ba812 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
@@ -210,7 +210,13 @@ public class RemoteMethod {
@Override
public String toString() {
- return this.protocol.getSimpleName() + "#" + this.methodName + " " +
- Arrays.toString(this.params);
+ return new StringBuilder()
+ .append(this.protocol.getSimpleName())
+ .append("#")
+ .append(this.methodName)
+ .append("(")
+ .append(Arrays.deepToString(this.params))
+ .append(")")
+ .toString();
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
index 8816ff6..8b216d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
@@ -68,4 +68,13 @@ public class RemoteParam {
return context.getDest();
}
}
+
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("RemoteParam(")
+ .append(this.paramMap)
+ .append(")")
+ .toString();
+ }
}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index d1b2269..50d9267 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
@@ -93,6 +94,8 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.ConnectException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
@@ -103,6 +106,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
/**
* Module that implements all the RPC calls in {@link ClientProtocol} in the
@@ -119,6 +123,8 @@ public class RouterClientProtocol implements ClientProtocol {
/** If it requires response from all subclusters. */
private final boolean allowPartialList;
+ /** Time out when getting the mount statistics. */
+ private long mountStatusTimeOut;
/** Identifier for the super user. */
private final String superUser;
@@ -140,6 +146,10 @@ public class RouterClientProtocol implements ClientProtocol {
this.allowPartialList = conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST,
RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST_DEFAULT);
+ this.mountStatusTimeOut = conf.getTimeDuration(
+ RBFConfigKeys.DFS_ROUTER_CLIENT_MOUNT_TIME_OUT,
+ RBFConfigKeys.DFS_ROUTER_CLIENT_MOUNT_TIME_OUT_DEFAULT,
+ TimeUnit.SECONDS);
// User and group for reporting
this.superUser = System.getProperty("user.name");
@@ -228,15 +238,92 @@ public class RouterClientProtocol implements ClientProtocol {
}
}
- RemoteLocation createLocation = rpcServer.getCreateLocation(src);
RemoteMethod method = new RemoteMethod("create",
new Class<?>[] {String.class, FsPermission.class, String.class,
EnumSetWritable.class, boolean.class, short.class,
long.class, CryptoProtocolVersion[].class,
String.class, String.class},
- createLocation.getDest(), masked, clientName, flag, createParent,
+ new RemoteParam(), masked, clientName, flag, createParent,
replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
- return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
+ final List<RemoteLocation> locations =
+ rpcServer.getLocationsForPath(src, true);
+ RemoteLocation createLocation = null;
+ try {
+ createLocation = rpcServer.getCreateLocation(src);
+ return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
+ } catch (IOException ioe) {
+ final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
+ method, src, ioe, createLocation, locations);
+ return rpcClient.invokeSequential(
+ newLocations, method, HdfsFileStatus.class, null);
+ }
+ }
+
+ /**
+ * Check if an exception is caused by an unavailable subcluster or not. It
+ * also checks the causes.
+ * @param ioe IOException to check.
+ * @return If caused by an unavailable subcluster. False if it should not be
+ * retried (e.g., NSQuotaExceededException).
+ */
+ private static boolean isUnavailableSubclusterException(
+ final IOException ioe) {
+ if (ioe instanceof ConnectException ||
+ ioe instanceof ConnectTimeoutException ||
+ ioe instanceof NoNamenodesAvailableException) {
+ return true;
+ }
+ if (ioe.getCause() instanceof IOException) {
+ IOException cause = (IOException)ioe.getCause();
+ return isUnavailableSubclusterException(cause);
+ }
+ return false;
+ }
+
+ /**
+ * Check if a remote method can be retried in other subclusters when it
+ * failed in the original destination. This method returns the list of
+ * locations to retry in. This is used by fault tolerant mount points.
+ * @param method Method that failed and might be retried.
+ * @param src Path where the method was invoked.
+ * @param ioe Exception that was triggered.
+ * @param excludeLoc Location that failed and should be excluded.
+ * @param locations All the locations to retry.
+ * @return The locations where we should retry (excluding the failed ones).
+ * @throws IOException If this path is not fault tolerant or the exception
+ * should not be retried (e.g., NSQuotaExceededException).
+ */
+ private List<RemoteLocation> checkFaultTolerantRetry(
+ final RemoteMethod method, final String src, final IOException ioe,
+ final RemoteLocation excludeLoc, final List<RemoteLocation> locations)
+ throws IOException {
+
+ if (!isUnavailableSubclusterException(ioe)) {
+ LOG.debug("{} exception cannot be retried",
+ ioe.getClass().getSimpleName());
+ throw ioe;
+ }
+ if (!rpcServer.isPathFaultTolerant(src)) {
+ LOG.debug("{} does not allow retrying a failed subcluster", src);
+ throw ioe;
+ }
+
+ final List<RemoteLocation> newLocations;
+ if (excludeLoc == null) {
+ LOG.error("Cannot invoke {} for {}: {}", method, src, ioe.getMessage());
+ newLocations = locations;
+ } else {
+ LOG.error("Cannot invoke {} for {} in {}: {}",
+ method, src, excludeLoc, ioe.getMessage());
+ newLocations = new ArrayList<>();
+ for (final RemoteLocation loc : locations) {
+ if (!loc.equals(excludeLoc)) {
+ newLocations.add(loc);
+ }
+ }
+ }
+ LOG.info("{} allows retrying failed subclusters in {}", src, newLocations);
+ return newLocations;
}
@Override
@@ -598,13 +685,20 @@ public class RouterClientProtocol implements ClientProtocol {
}
} catch (IOException ioe) {
// Can't query if this file exists or not.
- LOG.error("Error requesting file info for path {} while proxing mkdirs",
- src, ioe);
+ LOG.error("Error getting file info for {} while proxying mkdirs: {}",
+ src, ioe.getMessage());
}
}
- RemoteLocation firstLocation = locations.get(0);
- return (boolean) rpcClient.invokeSingle(firstLocation, method);
+ final RemoteLocation firstLocation = locations.get(0);
+ try {
+ return (boolean) rpcClient.invokeSingle(firstLocation, method);
+ } catch (IOException ioe) {
+ final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
+ method, src, ioe, firstLocation, locations);
+ return rpcClient.invokeSequential(
+ newLocations, method, Boolean.class, Boolean.TRUE);
+ }
}
@Override
@@ -1696,10 +1790,26 @@ public class RouterClientProtocol implements ClientProtocol {
*/
private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
final RemoteMethod method) throws IOException {
+ return getFileInfoAll(locations, method, -1);
+ }
+
+ /**
+ * Get the file info from all the locations.
+ *
+ * @param locations Locations to check.
+ * @param method The file information method to run.
+ * @param timeOutMs Time out for the operation in milliseconds.
+ * @return The first file info if it's a file, or the directory status if
+ * it exists in every location.
+ * @throws IOException If all the locations throw an exception.
+ */
+ private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
+ final RemoteMethod method, long timeOutMs) throws IOException {
// Get the file info from everybody
Map<RemoteLocation, HdfsFileStatus> results =
- rpcClient.invokeConcurrent(locations, method, HdfsFileStatus.class);
+ rpcClient.invokeConcurrent(locations, method, false, false, timeOutMs,
+ HdfsFileStatus.class);
int children = 0;
// We return the first file
HdfsFileStatus dirStatus = null;
@@ -1756,9 +1866,10 @@ public class RouterClientProtocol implements ClientProtocol {
MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
MountTable entry = mountTable.getMountPoint(mName);
if (entry != null) {
- HdfsFileStatus fInfo = getFileInfoAll(entry.getDestinations(),
- new RemoteMethod("getFileInfo", new Class<?>[] {String.class},
- new RemoteParam()));
+ RemoteMethod method = new RemoteMethod("getFileInfo",
+ new Class<?>[] {String.class}, new RemoteParam());
+ HdfsFileStatus fInfo = getFileInfoAll(
+ entry.getDestinations(), method, mountStatusTimeOut);
if (fInfo != null) {
permission = fInfo.getPermission();
owner = fInfo.getOwner();
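Condensing the create() hunk above, the fault-tolerant write path boils down to: try the preferred location, and only when the failure is classified as an unavailable subcluster and the mount entry is fault tolerant, retry the remaining locations sequentially. A sketch of the pattern (method names as in the diff):

    try {
      return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
    } catch (IOException ioe) {
      // Rethrows ioe unless it (or one of its causes) is ConnectException,
      // ConnectTimeoutException or NoNamenodesAvailableException and
      // rpcServer.isPathFaultTolerant(src) is true.
      List<RemoteLocation> retryLocs =
          checkFaultTolerantRetry(method, src, ioe, createLocation, locations);
      return rpcClient.invokeSequential(
          retryLocs, method, HdfsFileStatus.class, null);
    }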
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 3d80c41..730952b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -18,11 +18,15 @@
package org.apache.hadoop.hdfs.server.federation.router;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
@@ -62,6 +66,7 @@ import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,7 +131,8 @@ public class RouterRpcClient {
this.namenodeResolver = resolver;
- this.connectionManager = new ConnectionManager(conf);
+ Configuration clientConf = getClientConfiguration(conf);
+ this.connectionManager = new ConnectionManager(clientConf);
this.connectionManager.start();
int numThreads = conf.getInt(
@@ -166,6 +172,31 @@ public class RouterRpcClient {
}
/**
+ * Get the configuration for the RPC client. It takes the Router
+ * configuration and transforms it into regular RPC Client configuration.
+ * @param conf Input configuration.
+ * @return Configuration for the RPC client.
+ */
+ private Configuration getClientConfiguration(final Configuration conf) {
+ Configuration clientConf = new Configuration(conf);
+ int maxRetries = conf.getInt(
+ RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT,
+ RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT_DEFAULT);
+ if (maxRetries >= 0) {
+ clientConf.setInt(
+ IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, maxRetries);
+ }
+ long connectTimeOut = conf.getTimeDuration(
+ RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT,
+ RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ if (connectTimeOut >= 0) {
+ clientConf.setLong(IPC_CLIENT_CONNECT_TIMEOUT_KEY, connectTimeOut);
+ }
+ return clientConf;
+ }
+
+ /**
* Get the active namenode resolver used by this client.
* @return Active namenode resolver.
*/
@@ -341,17 +372,19 @@ public class RouterRpcClient {
* @param method Remote ClientProtcol method to invoke.
* @param params Variable list of parameters matching the method.
* @return The result of invoking the method.
- * @throws IOException
+ * @throws ConnectException If it cannot connect to any Namenode.
+ * @throws StandbyException If all Namenodes are in Standby.
+ * @throws IOException If it cannot invoke the method.
*/
private Object invokeMethod(
final UserGroupInformation ugi,
final List<? extends FederationNamenodeContext> namenodes,
final Class<?> protocol, final Method method, final Object... params)
- throws IOException {
+ throws ConnectException, StandbyException, IOException {
if (namenodes == null || namenodes.isEmpty()) {
throw new IOException("No namenodes to invoke " + method.getName() +
- " with params " + Arrays.toString(params) + " from "
+ " with params " + Arrays.deepToString(params) + " from "
+ router.getRouterId());
}
@@ -388,6 +421,12 @@ public class RouterRpcClient {
this.rpcMonitor.proxyOpFailureStandby();
}
failover = true;
+ } else if (ioe instanceof ConnectException ||
+ ioe instanceof ConnectTimeoutException) {
+ if (this.rpcMonitor != null) {
+ this.rpcMonitor.proxyOpFailureCommunicate();
+ }
+ failover = true;
} else if (ioe instanceof RemoteException) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpComplete(true);
@@ -408,7 +447,7 @@ public class RouterRpcClient {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpNoNamenodes();
}
- LOG.error("Can not get available namenode for {} {} error: {}",
+ LOG.error("Cannot get available namenode for {} {} error: {}",
nsId, rpcAddress, ioe.getMessage());
// Throw RetriableException so that client can retry
throw new RetriableException(ioe);
@@ -433,24 +472,33 @@ public class RouterRpcClient {
// All namenodes were unavailable or in standby
String msg = "No namenode available to invoke " + method.getName() + " " +
- Arrays.toString(params);
+ Arrays.deepToString(params) + " in " + namenodes + " from " +
+ router.getRouterId();
LOG.error(msg);
+ int exConnect = 0;
for (Entry<FederationNamenodeContext, IOException> entry :
ioes.entrySet()) {
FederationNamenodeContext namenode = entry.getKey();
- String nsId = namenode.getNameserviceId();
- String nnId = namenode.getNamenodeId();
+ String nnKey = namenode.getNamenodeKey();
String addr = namenode.getRpcAddress();
IOException ioe = entry.getValue();
if (ioe instanceof StandbyException) {
- LOG.error("{} {} at {} is in Standby: {}", nsId, nnId, addr,
- ioe.getMessage());
+ LOG.error("{} at {} is in Standby: {}",
+ nnKey, addr, ioe.getMessage());
+ } else if (ioe instanceof ConnectException ||
+ ioe instanceof ConnectTimeoutException) {
+ exConnect++;
+ LOG.error("{} at {} cannot be reached: {}",
+ nnKey, addr, ioe.getMessage());
} else {
- LOG.error("{} {} at {} error: \"{}\"",
- nsId, nnId, addr, ioe.getMessage());
+ LOG.error("{} at {} error: \"{}\"", nnKey, addr, ioe.getMessage());
}
}
- throw new StandbyException(msg);
+ if (exConnect == ioes.size()) {
+ throw new ConnectException(msg);
+ } else {
+ throw new StandbyException(msg);
+ }
}
/**
@@ -497,6 +545,9 @@ public class RouterRpcClient {
// failover, invoker looks for standby exceptions for failover.
if (ioe instanceof StandbyException) {
throw ioe;
+ } else if (ioe instanceof ConnectException ||
+ ioe instanceof ConnectTimeoutException) {
+ throw ioe;
} else {
throw new StandbyException(ioe.getMessage());
}
@@ -1043,7 +1094,7 @@ public class RouterRpcClient {
if (locations.isEmpty()) {
throw new IOException("No remote locations available");
- } else if (locations.size() == 1) {
+ } else if (locations.size() == 1 && timeOutMs <= 0) {
// Shortcut, just one call
T location = locations.iterator().next();
String ns = location.getNameserviceId();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 739a2ff..b934355 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -30,6 +30,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Array;
+import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
@@ -133,6 +134,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.AccessControlException;
@@ -294,7 +296,9 @@ public class RouterRpcServer extends AbstractService
AccessControlException.class,
LeaseExpiredException.class,
NotReplicatedYetException.class,
- IOException.class);
+ IOException.class,
+ ConnectException.class,
+ RetriableException.class);
this.rpcServer.addSuppressedLoggingExceptions(
StandbyException.class);
@@ -520,7 +524,7 @@ public class RouterRpcServer extends AbstractService
// If default Ns is not present return result from first namespace.
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
if (nss.isEmpty()) {
- throw new IOException("No namespace availaible.");
+ throw new IOException("No namespace available.");
}
nsId = nss.iterator().next().getNameserviceId();
return rpcClient.invokeSingle(nsId, method, clazz);
@@ -566,6 +570,7 @@ public class RouterRpcServer extends AbstractService
replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
}
+
/**
* Get the location to create a file. It checks if the file already existed
* in one of the locations.
@@ -574,10 +579,24 @@ public class RouterRpcServer extends AbstractService
* @return The remote location for this file.
* @throws IOException If the file has no creation location.
*/
- RemoteLocation getCreateLocation(final String src)
+ RemoteLocation getCreateLocation(final String src) throws IOException {
+ final List<RemoteLocation> locations = getLocationsForPath(src, true);
+ return getCreateLocation(src, locations);
+ }
+
+ /**
+ * Get the location to create a file. It checks if the file already existed
+ * in one of the locations.
+ *
+ * @param src Path of the file to check.
+ * @param locations Prefetched locations for the file.
+ * @return The remote location for this file.
+ * @throws IOException If the file has no creation location.
+ */
+ RemoteLocation getCreateLocation(
+ final String src, final List<RemoteLocation> locations)
throws IOException {
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
if (locations == null || locations.isEmpty()) {
throw new IOException("Cannot get locations to create " + src);
}
@@ -1569,6 +1588,27 @@ public class RouterRpcServer extends AbstractService
}
/**
+ * Check if a path supports failed subclusters.
+ *
+ * @param path Path to check.
+ * @return If a path should support failed subclusters.
+ */
+ boolean isPathFaultTolerant(final String path) {
+ if (subclusterResolver instanceof MountTableResolver) {
+ try {
+ MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
+ MountTable entry = mountTable.getMountPoint(path);
+ if (entry != null) {
+ return entry.isFaultTolerant();
+ }
+ } catch (IOException e) {
+ LOG.error("Cannot get mount point", e);
+ }
+ }
+ return false;
+ }
+
+ /**
* Check if call needs to be invoked to all the locations. The call is
* supposed to be invoked in all the locations in case the order of the mount
* entry is amongst HASH_ALL, RANDOM or SPACE or if the source is itself a
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
index d5e1857..8761038 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
@@ -66,6 +66,7 @@ public class MountTableStoreImpl extends MountTableStore {
if (pc != null) {
pc.checkPermission(mountTable, FsAction.WRITE);
}
+ mountTable.validate();
}
boolean status = getDriver().put(mountTable, false, true);
@@ -85,6 +86,7 @@ public class MountTableStoreImpl extends MountTableStore {
if (pc != null) {
pc.checkPermission(mountTable, FsAction.WRITE);
}
+ mountTable.validate();
}
boolean status = getDriver().put(mountTable, true, true);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
index c1585b0..d1351a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
+import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -59,6 +60,10 @@ public abstract class MountTable extends BaseRecord {
"Invalid entry, invalid destination path ";
public static final String ERROR_MSG_ALL_DEST_MUST_START_WITH_BACK_SLASH =
"Invalid entry, all destination must start with / ";
+ private static final String ERROR_MSG_FAULT_TOLERANT_MULTI_DEST =
+ "Invalid entry, fault tolerance requires multiple destinations ";
+ private static final String ERROR_MSG_FAULT_TOLERANT_ALL =
+ "Invalid entry, fault tolerance only supported for ALL order ";
/** Comparator for paths which considers the /. */
public static final Comparator<String> PATH_COMPARATOR =
@@ -229,6 +234,20 @@ public abstract class MountTable extends BaseRecord {
public abstract void setDestOrder(DestinationOrder order);
/**
+ * Check if the mount point supports a failed destination.
+ *
+ * @return If it supports failures.
+ */
+ public abstract boolean isFaultTolerant();
+
+ /**
+ * Set if the mount point supports failed destinations.
+ *
+ * @param faultTolerant If it supports failures.
+ */
+ public abstract void setFaultTolerant(boolean faultTolerant);
+
+ /**
* Get owner name of this mount table entry.
*
* @return Owner name
@@ -321,11 +340,14 @@ public abstract class MountTable extends BaseRecord {
List<RemoteLocation> destinations = this.getDestinations();
sb.append(destinations);
if (destinations != null && destinations.size() > 1) {
- sb.append("[" + this.getDestOrder() + "]");
+ sb.append("[").append(this.getDestOrder()).append("]");
}
if (this.isReadOnly()) {
sb.append("[RO]");
}
+ if (this.isFaultTolerant()) {
+ sb.append("[FT]");
+ }
if (this.getOwnerName() != null) {
sb.append("[owner:").append(this.getOwnerName()).append("]");
@@ -383,6 +405,16 @@ public abstract class MountTable extends BaseRecord {
ERROR_MSG_ALL_DEST_MUST_START_WITH_BACK_SLASH + this);
}
}
+ if (isFaultTolerant()) {
+ if (getDestinations().size() < 2) {
+ throw new IllegalArgumentException(
+ ERROR_MSG_FAULT_TOLERANT_MULTI_DEST + this);
+ }
+ if (!isAll()) {
+ throw new IllegalArgumentException(
+ ERROR_MSG_FAULT_TOLERANT_ALL + this);
+ }
+ }
}
@Override
@@ -397,6 +429,7 @@ public abstract class MountTable extends BaseRecord {
.append(this.getDestinations())
.append(this.isReadOnly())
.append(this.getDestOrder())
+ .append(this.isFaultTolerant())
.toHashCode();
}
@@ -404,16 +437,13 @@ public abstract class MountTable extends BaseRecord {
public boolean equals(Object obj) {
if (obj instanceof MountTable) {
MountTable other = (MountTable)obj;
- if (!this.getSourcePath().equals(other.getSourcePath())) {
- return false;
- } else if (!this.getDestinations().equals(other.getDestinations())) {
- return false;
- } else if (this.isReadOnly() != other.isReadOnly()) {
- return false;
- } else if (!this.getDestOrder().equals(other.getDestOrder())) {
- return false;
- }
- return true;
+ return new EqualsBuilder()
+ .append(this.getSourcePath(), other.getSourcePath())
+ .append(this.getDestinations(), other.getDestinations())
+ .append(this.isReadOnly(), other.isReadOnly())
+ .append(this.getDestOrder(), other.getDestOrder())
+ .append(this.isFaultTolerant(), other.isFaultTolerant())
+ .isEquals();
}
return false;
}
@@ -424,9 +454,7 @@ public abstract class MountTable extends BaseRecord {
*/
public boolean isAll() {
DestinationOrder order = getDestOrder();
- return order == DestinationOrder.HASH_ALL ||
- order == DestinationOrder.RANDOM ||
- order == DestinationOrder.SPACE;
+ return DestinationOrder.FOLDER_ALL.contains(order);
}
/**
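With the validation added above, a fault-tolerant entry must have at least two destinations and an ALL-type order. A hypothetical sketch of what validate() accepts and rejects (the newInstance usage mirrors RouterAdmin.java below):

    import java.util.LinkedHashMap;
    import java.util.Map;

    Map<String, String> destMap = new LinkedHashMap<>();
    destMap.put("ns0", "/data");
    destMap.put("ns1", "/data");
    MountTable entry = MountTable.newInstance("/data", destMap);
    entry.setDestOrder(DestinationOrder.HASH_ALL);
    entry.setFaultTolerant(true);
    entry.validate(); // passes: two destinations and an ALL-type order

    entry.setDestOrder(DestinationOrder.LOCAL);
    entry.validate(); // throws IllegalArgumentException: fault tolerance
                      // is only supported for HASH_ALL, RANDOM and SPACE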
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
index 4c7622c..62cdc72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
@@ -196,6 +196,20 @@ public class MountTablePBImpl extends MountTable implements PBRecord {
}
@Override
+ public boolean isFaultTolerant() {
+ MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
+ if (!proto.hasFaultTolerant()) {
+ return false;
+ }
+ return proto.getFaultTolerant();
+ }
+
+ @Override
+ public void setFaultTolerant(boolean faultTolerant) {
+ this.translator.getBuilder().setFaultTolerant(faultTolerant);
+ }
+
+ @Override
public String getOwnerName() {
MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
if (!proto.hasOwnerName()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
index b04b069..61da7e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
@@ -135,12 +135,12 @@ public class RouterAdmin extends Configured implements Tool {
}
if (cmd.equals("-add")) {
return "\t[-add <source> <nameservice1, nameservice2, ...> <destination> "
- + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "-owner <owner> -group <group> -mode <mode>]";
} else if (cmd.equals("-update")) {
return "\t[-update <source> <nameservice1, nameservice2, ...> "
+ "<destination> "
- + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "-owner <owner> -group <group> -mode <mode>]";
} else if (cmd.equals("-rm")) {
return "\t[-rm <source>]";
@@ -415,6 +415,7 @@ public class RouterAdmin extends Configured implements Tool {
// Optional parameters
boolean readOnly = false;
+ boolean faultTolerant = false;
String owner = null;
String group = null;
FsPermission mode = null;
@@ -422,6 +423,8 @@ public class RouterAdmin extends Configured implements Tool {
while (i < parameters.length) {
if (parameters[i].equals("-readonly")) {
readOnly = true;
+ } else if (parameters[i].equals("-faulttolerant")) {
+ faultTolerant = true;
} else if (parameters[i].equals("-order")) {
i++;
try {
@@ -447,7 +450,7 @@ public class RouterAdmin extends Configured implements Tool {
i++;
}
- return addMount(mount, nss, dest, readOnly, order,
+ return addMount(mount, nss, dest, readOnly, faultTolerant, order,
new ACLEntity(owner, group, mode));
}
@@ -464,7 +467,8 @@ public class RouterAdmin extends Configured implements Tool {
* @throws IOException Error adding the mount point.
*/
public boolean addMount(String mount, String[] nss, String dest,
- boolean readonly, DestinationOrder order, ACLEntity aclInfo)
+ boolean readonly, boolean faultTolerant, DestinationOrder order,
+ ACLEntity aclInfo)
throws IOException {
mount = normalizeFileSystemPath(mount);
// Get the existing entry
@@ -491,6 +495,9 @@ public class RouterAdmin extends Configured implements Tool {
if (readonly) {
newEntry.setReadOnly(true);
}
+ if (faultTolerant) {
+ newEntry.setFaultTolerant(true);
+ }
if (order != null) {
newEntry.setDestOrder(order);
}
@@ -508,6 +515,8 @@ public class RouterAdmin extends Configured implements Tool {
newEntry.setMode(aclInfo.getMode());
}
+ newEntry.validate();
+
AddMountTableEntryRequest request =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
@@ -527,6 +536,9 @@ public class RouterAdmin extends Configured implements Tool {
if (readonly) {
existingEntry.setReadOnly(true);
}
+ if (faultTolerant) {
+ existingEntry.setFaultTolerant(true);
+ }
if (order != null) {
existingEntry.setDestOrder(order);
}
@@ -544,6 +556,8 @@ public class RouterAdmin extends Configured implements Tool {
existingEntry.setMode(aclInfo.getMode());
}
+ existingEntry.validate();
+
UpdateMountTableEntryRequest updateRequest =
UpdateMountTableEntryRequest.newInstance(existingEntry);
UpdateMountTableEntryResponse updateResponse =
@@ -572,6 +586,7 @@ public class RouterAdmin extends Configured implements Tool {
// Optional parameters
boolean readOnly = false;
+ boolean faultTolerant = false;
String owner = null;
String group = null;
FsPermission mode = null;
@@ -579,6 +594,8 @@ public class RouterAdmin extends Configured implements Tool {
while (i < parameters.length) {
if (parameters[i].equals("-readonly")) {
readOnly = true;
+ } else if (parameters[i].equals("-faulttolerant")) {
+ faultTolerant = true;
} else if (parameters[i].equals("-order")) {
i++;
try {
@@ -604,7 +621,7 @@ public class RouterAdmin extends Configured implements Tool {
i++;
}
- return updateMount(mount, nss, dest, readOnly, order,
+ return updateMount(mount, nss, dest, readOnly, faultTolerant, order,
new ACLEntity(owner, group, mode));
}
@@ -621,7 +638,8 @@ public class RouterAdmin extends Configured implements Tool {
* @throws IOException Error updating the mount point.
*/
public boolean updateMount(String mount, String[] nss, String dest,
- boolean readonly, DestinationOrder order, ACLEntity aclInfo)
+ boolean readonly, boolean faultTolerant,
+ DestinationOrder order, ACLEntity aclInfo)
throws IOException {
mount = normalizeFileSystemPath(mount);
MountTableManager mountTable = client.getMountTableManager();
@@ -634,6 +652,7 @@ public class RouterAdmin extends Configured implements Tool {
MountTable newEntry = MountTable.newInstance(mount, destMap);
newEntry.setReadOnly(readonly);
+ newEntry.setFaultTolerant(faultTolerant);
if (order != null) {
newEntry.setDestOrder(order);
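Both -add and -update now accept the flag; mirroring TestRouterAdminCLI below, a fault-tolerant mount point can be created through ToolRunner (a sketch, assuming an initialized RouterAdmin named admin):

    String[] argv = new String[] {"-add", "/data", "ns0,ns1", "/data",
        "-order", "HASH_ALL", "-faulttolerant"};
    int ret = ToolRunner.run(admin, argv); // 0 on success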
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
index a55be73..6a60e4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
@@ -143,6 +143,8 @@ message MountTableRecordProto {
optional int32 mode = 12;
optional QuotaUsageProto quota = 13;
+
+ optional bool faultTolerant = 14 [default = false];
}
message AddMountTableEntryRequestProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 1034c87..e23f863 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -504,6 +504,36 @@
</property>
<property>
+ <name>dfs.federation.router.client.mount-status.time-out</name>
+ <value>1s</value>
+ <description>
+ Set a timeout for the Router when listing folders containing mount
+ points. In this process, the Router checks the mount table and then it
+ checks permissions in the subcluster. If the time out expires, the
+ default values are returned.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.connect.max.retries.on.timeouts</name>
+ <value>0</value>
+ <description>
+ Maximum number of retries for the IPC Client when connecting to the
+ subclusters. By default, the IPC client does not retry and the Router
+ handles the failure.
+ </description>
+ </property>
+
+ <property>
+ <name>dfs.federation.router.connect.timeout</name>
+ <value>2s</value>
+ <description>
+ Time out for the IPC client connecting to the subclusters. This should be
+ short as the Router has knowledge of the state of the Namenodes.
+ </description>
+ </property>
+
+ <property>
<name>dfs.federation.router.keytab.file</name>
<value></value>
<description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.html b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.html
index c591698..cf8653b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.html
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.html
@@ -393,6 +393,7 @@
<th>Target path</th>
<th>Order</th>
<th>Read only</th>
+ <th>Fault tolerant</th>
<th>Owner</th>
<th>Group</th>
<th>Permission</th>
@@ -409,6 +410,7 @@
<td>{path}</td>
<td>{order}</td>
<td align="center" class="mount-table-icon mount-table-read-only-{readonly}" title="{status}"/>
+ <td align="center" class="mount-table-icon mount-table-fault-tolerant-{faulttolerant}" title="{ftStatus}"></td>
<td>{ownerName}</td>
<td>{groupName}</td>
<td>{mode}</td>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.js b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.js
index 5da7b07..e655e60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.js
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.js
@@ -324,8 +324,20 @@
}
}
+ function augment_fault_tolerant(mountTable) {
+ for (var i = 0, e = mountTable.length; i < e; ++i) {
+ if (mountTable[i].faulttolerant == true) {
+ mountTable[i].faulttolerant = "true"
+ mountTable[i].ftStatus = "Fault tolerant"
+ } else {
+ mountTable[i].faulttolerant = "false"
+ }
+ }
+ }
+
resource.MountTable = JSON.parse(resource.MountTable)
augment_read_only(resource.MountTable)
+ augment_fault_tolerant(resource.MountTable)
return resource;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/static/rbf.css b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/static/rbf.css
index 5cdd826..b2eef6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/static/rbf.css
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/static/rbf.css
@@ -135,3 +135,8 @@
color: #5fa341;
content: "\e033";
}
+
+.mount-table-fault-tolerant-true:before {
+ color: #5fa341;
+ content: "\e033";
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
index f24ff12..83cecda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
@@ -266,6 +266,14 @@ To determine which subcluster contains a file:
[hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -getDestination /user/user1/file.txt
Note that consistency of the data across subclusters is not guaranteed by the Router.
+By default, if one subcluster is unavailable, writes may fail if they target that subcluster.
+To allow writing in another subcluster, one can make the mount point fault tolerant:
+
+ [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -add /data ns1,ns2 /data -order HASH_ALL -faulttolerant
+
Note that this can lead to a file being written in multiple subclusters, or to a folder missing from one of them.
One needs to be aware of these possible inconsistencies and reserve the `faulttolerant` approach for resilient paths.
An example is the `/app-logs` folder, which is mostly written once into a subfolder.
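An existing multi-destination entry can be made fault tolerant the same way with `-update` (a sketch, following the usage string in RouterAdmin.java above):

    [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -update /data ns1,ns2 /data -order HASH_ALL -faulttolerant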
### Disabling nameservices
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
index 9b58fff..d8dffee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
@@ -18,19 +18,44 @@
package org.apache.hadoop.hdfs.server.federation;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyShort;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.net.ConnectException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
@@ -40,15 +65,29 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DataChecksum.Type;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.protobuf.BlockingService;
@@ -59,9 +98,15 @@ import com.google.protobuf.BlockingService;
*/
public class MockNamenode {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MockNamenode.class);
+
+
/** Mock implementation of the Namenode. */
private final NamenodeProtocols mockNn;
+ /** Name service identifier (subcluster). */
+ private String nsId;
/** HA state of the Namenode. */
private HAServiceState haState = HAServiceState.STANDBY;
@@ -71,9 +116,13 @@ public class MockNamenode {
private HttpServer2 httpServer;
- public MockNamenode() throws Exception {
- Configuration conf = new Configuration();
+ public MockNamenode(final String nsIdentifier) throws IOException {
+ this(nsIdentifier, new HdfsConfiguration());
+ }
+ public MockNamenode(final String nsIdentifier, final Configuration conf)
+ throws IOException {
+ this.nsId = nsIdentifier;
this.mockNn = mock(NamenodeProtocols.class);
setupMock();
setupRPCServer(conf);
@@ -86,7 +135,7 @@ public class MockNamenode {
* @throws IOException If the mock cannot be setup.
*/
protected void setupMock() throws IOException {
- NamespaceInfo nsInfo = new NamespaceInfo(1, "clusterId", "bpId", 1);
+ NamespaceInfo nsInfo = new NamespaceInfo(1, this.nsId, this.nsId, 1);
when(mockNn.versionRequest()).thenReturn(nsInfo);
when(mockNn.getServiceStatus()).thenAnswer(new Answer<HAServiceStatus>() {
@@ -115,11 +164,16 @@ public class MockNamenode {
ClientNamenodeProtocol.newReflectiveBlockingService(
clientNNProtoXlator);
+ int numHandlers = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY,
+ DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
+
rpcServer = new RPC.Builder(conf)
.setProtocol(ClientNamenodeProtocolPB.class)
.setInstance(clientNNPbService)
.setBindAddress("0.0.0.0")
.setPort(0)
+ .setNumHandlers(numHandlers)
.build();
NamenodeProtocolServerSideTranslatorPB nnProtoXlator =
@@ -146,6 +200,18 @@ public class MockNamenode {
DFSUtil.addPBProtocol(
conf, HAServiceProtocolPB.class, haProtoPbService, rpcServer);
+ this.rpcServer.addTerseExceptions(
+ RemoteException.class,
+ SafeModeException.class,
+ FileNotFoundException.class,
+ FileAlreadyExistsException.class,
+ AccessControlException.class,
+ LeaseExpiredException.class,
+ NotReplicatedYetException.class,
+ IOException.class,
+ ConnectException.class,
+ StandbyException.class);
+
rpcServer.start();
}
@@ -189,6 +255,14 @@ public class MockNamenode {
}
/**
+ * Get the name service id (subcluster) of the Mock Namenode.
+ * @return Name service identifier.
+ */
+ public String getNameserviceId() {
+ return nsId;
+ }
+
+ /**
* Get the HA state of the Mock Namenode.
* @return HA state (ACTIVE or STANDBY).
*/
@@ -217,9 +291,158 @@ public class MockNamenode {
public void stop() throws Exception {
if (rpcServer != null) {
rpcServer.stop();
+ rpcServer = null;
}
if (httpServer != null) {
httpServer.stop();
+ httpServer = null;
}
}
+
+ /**
+ * Add the mock for the FileSystem calls in ClientProtocol.
+ * @throws IOException If it cannot be setup.
+ */
+ public void addFileSystemMock() throws IOException {
+ final SortedMap<String, String> fs =
+ new ConcurrentSkipListMap<String, String>();
+
+ DirectoryListing l = mockNn.getListing(anyString(), any(), anyBoolean());
+ when(l).thenAnswer(invocation -> {
+ String src = getSrc(invocation);
+ LOG.info("{} getListing({})", nsId, src);
+ if (!src.endsWith("/")) {
+ src += "/";
+ }
+ Map<String, String> files =
+ fs.subMap(src, src + Character.MAX_VALUE);
+ List<HdfsFileStatus> list = new ArrayList<>();
+ for (String file : files.keySet()) {
+ if (file.substring(src.length()).indexOf('/') < 0) {
+ HdfsFileStatus fileStatus =
+ getMockHdfsFileStatus(file, fs.get(file));
+ list.add(fileStatus);
+ }
+ }
+ HdfsFileStatus[] array = list.toArray(
+ new HdfsFileStatus[list.size()]);
+ return new DirectoryListing(array, 0);
+ });
+ when(mockNn.getFileInfo(anyString())).thenAnswer(invocation -> {
+ String src = getSrc(invocation);
+ LOG.info("{} getFileInfo({})", nsId, src);
+ return getMockHdfsFileStatus(src, fs.get(src));
+ });
+ HdfsFileStatus c = mockNn.create(anyString(), any(), anyString(), any(),
+ anyBoolean(), anyShort(), anyLong(), any(), any(), any());
+ when(c).thenAnswer(invocation -> {
+ String src = getSrc(invocation);
+ LOG.info("{} create({})", nsId, src);
+ fs.put(src, "FILE");
+ return getMockHdfsFileStatus(src, "FILE");
+ });
+ LocatedBlocks b = mockNn.getBlockLocations(
+ anyString(), anyLong(), anyLong());
+ when(b).thenAnswer(invocation -> {
+ String src = getSrc(invocation);
+ LOG.info("{} getBlockLocations({})", nsId, src);
+ if (!fs.containsKey(src)) {
+ LOG.error("{} cannot find {} for getBlockLocations", nsId, src);
+ throw new FileNotFoundException("File does not exist " + src);
+ }
+ return mock(LocatedBlocks.class);
+ });
+ boolean f = mockNn.complete(anyString(), anyString(), any(), anyLong());
+ when(f).thenAnswer(invocation -> {
+ String src = getSrc(invocation);
+ if (!fs.containsKey(src)) {
+ LOG.error("{} cannot find {} for complete", nsId, src);
+ throw new FileNotFoundException("File does not exist " + src);
+ }
+ return true;
+ });
+ LocatedBlock a = mockNn.addBlock(
+ anyString(), anyString(), any(), any(), anyLong(), any(), any());
+ when(a).thenAnswer(invocation -> {
+ String src = getSrc(invocation);
+ if (!fs.containsKey(src)) {
+ LOG.error("{} cannot find {} for addBlock", nsId, src);
+ throw new FileNotFoundException("File does not exist " + src);
+ }
+ return getMockLocatedBlock(nsId);
+ });
+ boolean m = mockNn.mkdirs(anyString(), any(), anyBoolean());
+ when(m).thenAnswer(invocation -> {
+ String src = getSrc(invocation);
+ LOG.info("{} mkdirs({})", nsId, src);
+ fs.put(src, "DIRECTORY");
+ return true;
+ });
+ when(mockNn.getServerDefaults()).thenAnswer(invocation -> {
+ LOG.info("{} getServerDefaults", nsId);
+ FsServerDefaults defaults = mock(FsServerDefaults.class);
+ when(defaults.getChecksumType()).thenReturn(
+ Type.valueOf(DataChecksum.CHECKSUM_CRC32));
+ when(defaults.getKeyProviderUri()).thenReturn(nsId);
+ return defaults;
+ });
+ }
+
+ private static String getSrc(InvocationOnMock invocation) {
+ return (String) invocation.getArguments()[0];
+ }
+
+ /**
+ * Get a mock HDFS file status.
+ * @param filename Name of the file.
+ * @param type Type of the file (FILE, DIRECTORY, or null).
+ * @return HDFS file status
+ */
+ private static HdfsFileStatus getMockHdfsFileStatus(
+ final String filename, final String type) {
+ if (type == null) {
+ return null;
+ }
+ HdfsFileStatus fileStatus = mock(HdfsFileStatus.class);
+ when(fileStatus.getLocalNameInBytes()).thenReturn(filename.getBytes());
+ when(fileStatus.getPermission()).thenReturn(mock(FsPermission.class));
+ when(fileStatus.getOwner()).thenReturn("owner");
+ when(fileStatus.getGroup()).thenReturn("group");
+ if (type.equals("FILE")) {
+ when(fileStatus.getLen()).thenReturn(100L);
+ when(fileStatus.getReplication()).thenReturn((short) 1);
+ when(fileStatus.getBlockSize()).thenReturn(
+ HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+ } else if (type.equals("DIRECTORY")) {
+ when(fileStatus.isDir()).thenReturn(true);
+ when(fileStatus.isDirectory()).thenReturn(true);
+ }
+ return fileStatus;
+ }
+
+ /**
+ * Get a mock located block pointing to one of the subclusters. It is
+ * allocated in a fake Datanode.
+ * @param nsId Name service identifier (subcluster).
+ * @return Mock located block.
+ */
+ private static LocatedBlock getMockLocatedBlock(final String nsId) {
+ LocatedBlock lb = mock(LocatedBlock.class);
+ when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[0]);
+ DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0",
+ 1111, 1112, 1113, 1114);
+ DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
+ when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
+ ExtendedBlock eb = mock(ExtendedBlock.class);
+ when(eb.getBlockPoolId()).thenReturn(nsId);
+ when(lb.getBlock()).thenReturn(eb);
+ @SuppressWarnings("unchecked")
+ Token<BlockTokenIdentifier> tok = mock(Token.class);
+ when(tok.getIdentifier()).thenReturn(nsId.getBytes());
+ when(tok.getPassword()).thenReturn(nsId.getBytes());
+ when(tok.getKind()).thenReturn(new Text(nsId));
+ when(tok.getService()).thenReturn(new Text(nsId));
+ when(lb.getBlockToken()).thenReturn(tok);
+ return lb;
+ }
}
\ No newline at end of file
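The extended mock underpins the new TestRouterFaultTolerant suite listed in the diffstat. A hedged sketch of how a test can simulate an unavailable subcluster (only methods shown in the diff are used; the Router wiring is elided):

    MockNamenode nn0 = new MockNamenode("ns0");
    MockNamenode nn1 = new MockNamenode("ns1");
    nn0.addFileSystemMock();
    nn1.addFileSystemMock();
    // ... register both subclusters with a Router and add a mount point
    // for /data on ns0,ns1 with -faulttolerant ...
    nn1.stop(); // take ns1 down: writes under /data should fall back to ns0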
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
index 486d4a0..381203b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
@@ -152,7 +152,7 @@ public class TestRouterAdminCLI {
@Test
public void testAddMountTable() throws Exception {
- String nsId = "ns0";
+ String nsId = "ns0,ns1";
String src = "/test-addmounttable";
String dest = "/addmounttable";
String[] argv = new String[] {"-add", src, nsId, dest};
@@ -166,26 +166,35 @@ public class TestRouterAdminCLI {
MountTable mountTable = getResponse.getEntries().get(0);
List<RemoteLocation> destinations = mountTable.getDestinations();
- assertEquals(1, destinations.size());
+ assertEquals(2, destinations.size());
assertEquals(src, mountTable.getSourcePath());
- assertEquals(nsId, destinations.get(0).getNameserviceId());
+ assertEquals("ns0", destinations.get(0).getNameserviceId());
assertEquals(dest, destinations.get(0).getDest());
+ assertEquals("ns1", destinations.get(1).getNameserviceId());
+ assertEquals(dest, destinations.get(1).getDest());
assertFalse(mountTable.isReadOnly());
+ assertFalse(mountTable.isFaultTolerant());
// test mount table update behavior
dest = dest + "-new";
- argv = new String[] {"-add", src, nsId, dest, "-readonly"};
+ argv = new String[] {"-add", src, nsId, dest, "-readonly",
+ "-faulttolerant", "-order", "HASH_ALL"};
assertEquals(0, ToolRunner.run(admin, argv));
stateStore.loadCache(MountTableStoreImpl.class, true);
getResponse = client.getMountTableManager()
.getMountTableEntries(getRequest);
mountTable = getResponse.getEntries().get(0);
- assertEquals(2, mountTable.getDestinations().size());
- assertEquals(nsId, mountTable.getDestinations().get(1).getNameserviceId());
- assertEquals(dest, mountTable.getDestinations().get(1).getDest());
+ assertEquals(4, mountTable.getDestinations().size());
+ RemoteLocation loc2 = mountTable.getDestinations().get(2);
+ assertEquals("ns0", loc2.getNameserviceId());
+ assertEquals(dest, loc2.getDest());
+ RemoteLocation loc3 = mountTable.getDestinations().get(3);
+ assertEquals("ns1", loc3.getNameserviceId());
+ assertEquals(dest, loc3.getDest());
assertTrue(mountTable.isReadOnly());
+ assertTrue(mountTable.isFaultTolerant());
}
@Test
@@ -211,6 +220,7 @@ public class TestRouterAdminCLI {
assertEquals(nsId, destinations.get(0).getNameserviceId());
assertEquals(dest, destinations.get(0).getDest());
assertFalse(mountTable.isReadOnly());
+ assertFalse(mountTable.isFaultTolerant());
// test mount table update behavior
dest = dest + "-new";
@@ -516,17 +526,19 @@ public class TestRouterAdminCLI {
System.setOut(new PrintStream(out));
String[] argv = new String[] {"-add", src, nsId};
assertEquals(-1, ToolRunner.run(admin, argv));
- assertTrue(out.toString().contains(
+ assertTrue("Wrong message: " + out, out.toString().contains(
"\t[-add <source> <nameservice1, nameservice2, ...> <destination> "
- + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ + "[-readonly] [-faulttolerant] "
+ + "[-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "-owner <owner> -group <group> -mode <mode>]"));
out.reset();
argv = new String[] {"-update", src, nsId};
assertEquals(-1, ToolRunner.run(admin, argv));
- assertTrue(out.toString().contains(
+ assertTrue("Wrong message: " + out, out.toString().contains(
"\t[-update <source> <nameservice1, nameservice2, ...> <destination> "
- + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ + "[-readonly] [-faulttolerant] "
+ + "[-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "-owner <owner> -group <group> -mode <mode>]"));
out.reset();
@@ -567,10 +579,11 @@ public class TestRouterAdminCLI {
assertEquals(-1, ToolRunner.run(admin, argv));
String expected = "Usage: hdfs dfsrouteradmin :\n"
+ "\t[-add <source> <nameservice1, nameservice2, ...> <destination> "
- + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "-owner <owner> -group <group> -mode <mode>]\n"
+ "\t[-update <source> <nameservice1, nameservice2, ...> "
- + "<destination> " + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ + "<destination> "
+ + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+ "-owner <owner> -group <group> -mode <mode>]\n" + "\t[-rm <source>]\n"
+ "\t[-ls <path>]\n"
+ "\t[-getDestination <path>]\n"
@@ -579,7 +592,7 @@ public class TestRouterAdminCLI {
+ "\t[-safemode enter | leave | get]\n"
+ "\t[-nameservice enable | disable <nameservice>]\n"
+ "\t[-getDisabledNameservices]";
- assertTrue(out.toString(), out.toString().contains(expected));
+ assertTrue("Wrong message: " + out, out.toString().contains(expected));
out.reset();
}
@@ -1159,4 +1172,28 @@ public class TestRouterAdminCLI {
argv = new String[] {"-getDestination /file1.txt /file2.txt"};
assertEquals(-1, ToolRunner.run(admin, argv));
}
+
+ @Test
+ public void testErrorFaultTolerant() throws Exception {
+
+ System.setErr(new PrintStream(err));
+ String[] argv = new String[] {"-add", "/mntft", "ns01", "/tmp",
+ "-faulttolerant"};
+ assertEquals(-1, ToolRunner.run(admin, argv));
+ assertTrue(err.toString(), err.toString().contains(
+ "Invalid entry, fault tolerance requires multiple destinations"));
+ err.reset();
+
+ System.setErr(new PrintStream(err));
+ argv = new String[] {"-add", "/mntft", "ns0,ns1", "/tmp",
+ "-order", "HASH", "-faulttolerant"};
+ assertEquals(-1, ToolRunner.run(admin, argv));
+ assertTrue(err.toString(), err.toString().contains(
+ "Invalid entry, fault tolerance only supported for ALL order"));
+ err.reset();
+
+ argv = new String[] {"-add", "/mntft", "ns0,ns1", "/tmp",
+ "-order", "HASH_ALL", "-faulttolerant"};
+ assertEquals(0, ToolRunner.run(admin, argv));
+ }
}
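
A minimal sketch, not part of this patch, of the record-level API that the
CLI tested above drives, using only methods visible in this diff
(MountTable.newInstance, setDestOrder, setFaultTolerant); the mount point,
nameservices, and destination path are illustrative:

    // Two destinations are required before fault tolerance is accepted.
    Map<String, String> destMap = new LinkedHashMap<>();
    destMap.put("ns0", "/tmp");
    destMap.put("ns1", "/tmp");
    MountTable entry = MountTable.newInstance("/mntft", destMap);
    // Fault tolerance is only supported for ALL orders such as HASH_ALL.
    entry.setDestOrder(DestinationOrder.HASH_ALL);
    entry.setFaultTolerant(true);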
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
new file mode 100644
index 0000000..c8f96c6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
@@ -0,0 +1,654 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static java.util.Arrays.asList;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.MockNamenode;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test the handling of fault-tolerant mount points in the Router.
+ */
+public class TestRouterFaultTolerant {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestRouterFaultTolerant.class);
+
+ /** Number of files to create for testing. */
+ private static final int NUM_FILES = 10;
+ /** Number of Routers for the test. */
+ private static final int NUM_ROUTERS = 2;
+
+
+ /** Namenodes for the test per name service id (subcluster). */
+ private Map<String, MockNamenode> namenodes = new HashMap<>();
+ /** Routers for the test. */
+ private List<Router> routers = new ArrayList<>();
+
+ /** Run test tasks in parallel. */
+ private ExecutorService service;
+
+
+ @Before
+ public void setup() throws Exception {
+ LOG.info("Start the Namenodes");
+ Configuration nnConf = new HdfsConfiguration();
+ nnConf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 10);
+ for (final String nsId : asList("ns0", "ns1")) {
+ MockNamenode nn = new MockNamenode(nsId, nnConf);
+ nn.transitionToActive();
+ nn.addFileSystemMock();
+ namenodes.put(nsId, nn);
+ }
+
+ LOG.info("Start the Routers");
+ Configuration routerConf = new RouterConfigBuilder()
+ .stateStore()
+ .admin()
+ .rpc()
+ .build();
+ routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
+ routerConf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+ routerConf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "0.0.0.0:0");
+ // Speedup time outs
+ routerConf.setTimeDuration(
+ RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT,
+ 500, TimeUnit.MILLISECONDS);
+
+ Configuration stateStoreConf = getStateStoreConfiguration();
+ stateStoreConf.setClass(
+ RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+ MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
+ stateStoreConf.setClass(
+ RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+ MultipleDestinationMountTableResolver.class,
+ FileSubclusterResolver.class);
+ routerConf.addResource(stateStoreConf);
+
+ for (int i = 0; i < NUM_ROUTERS; i++) {
+ // router0 doesn't allow partial listing
+ routerConf.setBoolean(
+ RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST, i != 0);
+
+ final Router router = new Router();
+ router.init(routerConf);
+ router.start();
+ routers.add(router);
+ }
+
+ LOG.info("Registering the subclusters in the Routers");
+ registerSubclusters(Collections.singleton("ns1"));
+
+ LOG.info("Stop ns1 to simulate an unavailable subcluster");
+ namenodes.get("ns1").stop();
+
+ service = Executors.newFixedThreadPool(10);
+ }
+
+ /**
+ * Register the subclusters in all Routers.
+ * @param unavailableSubclusters Set of unavailable subclusters.
+ * @throws IOException If it cannot register a subcluster.
+ */
+ private void registerSubclusters(Set<String> unavailableSubclusters)
+ throws IOException {
+ for (final Router router : routers) {
+ MembershipNamenodeResolver resolver =
+ (MembershipNamenodeResolver) router.getNamenodeResolver();
+ for (final MockNamenode nn : namenodes.values()) {
+ String nsId = nn.getNameserviceId();
+ String rpcAddress = "localhost:" + nn.getRPCPort();
+ String httpAddress = "localhost:" + nn.getHTTPPort();
+ NamenodeStatusReport report = new NamenodeStatusReport(
+ nsId, null, rpcAddress, rpcAddress, rpcAddress, httpAddress);
+ if (unavailableSubclusters.contains(nsId)) {
+ LOG.info("Register {} as UNAVAILABLE", nsId);
+ report.setRegistrationValid(false);
+ } else {
+ LOG.info("Register {} as ACTIVE", nsId);
+ report.setRegistrationValid(true);
+ }
+ report.setNamespaceInfo(new NamespaceInfo(0, nsId, nsId, 0));
+ resolver.registerNamenode(report);
+ }
+ resolver.loadCache(true);
+ }
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ LOG.info("Stopping the cluster");
+ for (final MockNamenode nn : namenodes.values()) {
+ nn.stop();
+ }
+ namenodes.clear();
+
+ routers.forEach(router -> router.stop());
+ routers.clear();
+
+ if (service != null) {
+ service.shutdown();
+ service = null;
+ }
+ }
+
+ /**
+ * Add a mount table entry spanning the given name services and wait
+ * until it is available.
+ * @param mountPoint Name of the mount point.
+ * @param order Order of the mount table entry.
+ * @param nsIds Name service identifiers.
+ * @throws Exception If the entry could not be created.
+ */
+ private void createMountTableEntry(
+ final String mountPoint, final DestinationOrder order,
+ Collection<String> nsIds) throws Exception {
+ Router router = getRandomRouter();
+ RouterClient admin = getAdminClient(router);
+ MountTableManager mountTable = admin.getMountTableManager();
+ Map<String, String> destMap = new HashMap<>();
+ for (String nsId : nsIds) {
+ destMap.put(nsId, mountPoint);
+ }
+ MountTable newEntry = MountTable.newInstance(mountPoint, destMap);
+ newEntry.setDestOrder(order);
+ AddMountTableEntryRequest addRequest =
+ AddMountTableEntryRequest.newInstance(newEntry);
+ AddMountTableEntryResponse addResponse =
+ mountTable.addMountTableEntry(addRequest);
+ boolean created = addResponse.getStatus();
+ assertTrue(created);
+
+ refreshRoutersCaches();
+
+ // Check for the path
+ GetMountTableEntriesRequest getRequest =
+ GetMountTableEntriesRequest.newInstance(mountPoint);
+ GetMountTableEntriesResponse getResponse =
+ mountTable.getMountTableEntries(getRequest);
+ List<MountTable> entries = getResponse.getEntries();
+ assertEquals("Too many entries: " + entries, 1, entries.size());
+ assertEquals(mountPoint, entries.get(0).getSourcePath());
+ }
+
+ /**
+ * Update a mount table entry to be fault tolerant.
+ * @param mountPoint Mount point to update.
+ * @throws IOException If it cannot update the mount point.
+ */
+ private void updateMountPointFaultTolerant(final String mountPoint)
+ throws IOException {
+ Router router = getRandomRouter();
+ RouterClient admin = getAdminClient(router);
+ MountTableManager mountTable = admin.getMountTableManager();
+ GetMountTableEntriesRequest getRequest =
+ GetMountTableEntriesRequest.newInstance(mountPoint);
+ GetMountTableEntriesResponse entries =
+ mountTable.getMountTableEntries(getRequest);
+ MountTable updateEntry = entries.getEntries().get(0);
+ updateEntry.setFaultTolerant(true);
+ UpdateMountTableEntryRequest updateRequest =
+ UpdateMountTableEntryRequest.newInstance(updateEntry);
+ UpdateMountTableEntryResponse updateResponse =
+ mountTable.updateMountTableEntry(updateRequest);
+ assertTrue(updateResponse.getStatus());
+
+ refreshRoutersCaches();
+ }
+
+ /**
+ * Refresh the caches of all Routers (to get the mount table).
+ */
+ private void refreshRoutersCaches() {
+ for (final Router router : routers) {
+ StateStoreService stateStore = router.getStateStore();
+ stateStore.refreshCaches(true);
+ }
+ }
+
+ /**
+ * Test the behavior of the Router when one of the subclusters in a mount
+ * point fails. In particular, it checks whether files can still be written.
+ * Related to {@link TestRouterRpcMultiDestination#testSubclusterDown()}.
+ */
+ @Test
+ public void testWriteWithFailedSubcluster() throws Exception {
+
+ // Run the actual tests with each approach
+ final List<Callable<Boolean>> tasks = new ArrayList<>();
+ final List<DestinationOrder> orders = asList(
+ DestinationOrder.HASH_ALL,
+ DestinationOrder.SPACE,
+ DestinationOrder.RANDOM,
+ DestinationOrder.HASH);
+ for (DestinationOrder order : orders) {
+ tasks.add(() -> {
+ testWriteWithFailedSubcluster(order);
+ return true;
+ });
+ }
+ TaskResults results = collectResults("Full tests", tasks);
+ assertEquals(orders.size(), results.getSuccess());
+ }
+
+ /**
+ * Test the behavior of the Router when one of the subclusters in a mount
+ * point fails. It assumes that ns1 is already down.
+ * @param order Destination order of the mount point.
+ * @throws Exception If we cannot run the test.
+ */
+ private void testWriteWithFailedSubcluster(final DestinationOrder order)
+ throws Exception {
+
+ final FileSystem router0Fs = getFileSystem(routers.get(0));
+ final FileSystem router1Fs = getFileSystem(routers.get(1));
+ final FileSystem ns0Fs = getFileSystem(namenodes.get("ns0").getRPCPort());
+
+ final String mountPoint = "/" + order + "-failsubcluster";
+ final Path mountPath = new Path(mountPoint);
+ LOG.info("Setup {} with order {}", mountPoint, order);
+ createMountTableEntry(mountPoint, order, namenodes.keySet());
+
+
+ LOG.info("Write in {} should succeed writing in ns0 and fail for ns1",
+ mountPath);
+ checkDirectoriesFaultTolerant(
+ mountPath, order, router0Fs, router1Fs, ns0Fs, false);
+ checkFilesFaultTolerant(
+ mountPath, order, router0Fs, router1Fs, ns0Fs, false);
+
+ LOG.info("Make {} fault tolerant and everything succeeds", mountPath);
+ IOException ioe = null;
+ try {
+ updateMountPointFaultTolerant(mountPoint);
+ } catch (IOException e) {
+ ioe = e;
+ }
+ if (DestinationOrder.FOLDER_ALL.contains(order)) {
+ assertNull(ioe);
+ checkDirectoriesFaultTolerant(
+ mountPath, order, router0Fs, router1Fs, ns0Fs, true);
+ checkFilesFaultTolerant(
+ mountPath, order, router0Fs, router1Fs, ns0Fs, true);
+ } else {
+ assertTrue(ioe.getMessage().startsWith(
+ "Invalid entry, fault tolerance only supported for ALL order"));
+ }
+ }
+
+ /**
+ * Check directory creation on a mount point.
+ * If it is fault tolerant, it should be able to write everything.
+ * If it is not fault tolerant, it should fail to create some of the
+ * directories.
+ */
+ private void checkDirectoriesFaultTolerant(
+ Path mountPoint, DestinationOrder order,
+ FileSystem router0Fs, FileSystem router1Fs, FileSystem ns0Fs,
+ boolean faultTolerant) throws Exception {
+
+ final FileStatus[] dirs0 = listStatus(router1Fs, mountPoint);
+
+ LOG.info("Create directories in {}", mountPoint);
+ final List<Callable<Boolean>> tasks = new ArrayList<>();
+ for (int i = 0; i < NUM_FILES; i++) {
+ final Path dir = new Path(mountPoint,
+ String.format("dir-%s-%03d", faultTolerant, i));
+ FileSystem fs = getRandomRouterFileSystem();
+ tasks.add(getDirCreateTask(fs, dir));
+ }
+ TaskResults results = collectResults("Create dir " + mountPoint, tasks);
+
+ LOG.info("Check directories results for {}: {}", mountPoint, results);
+ if (faultTolerant || DestinationOrder.FOLDER_ALL.contains(order)) {
+ assertEquals(NUM_FILES, results.getSuccess());
+ assertEquals(0, results.getFailure());
+ } else {
+ assertBothResults("check dir " + mountPoint, NUM_FILES, results);
+ }
+
+ LOG.info("Check directories listing for {}", mountPoint);
+ tasks.add(getListFailTask(router0Fs, mountPoint));
+ int filesExpected = dirs0.length + results.getSuccess();
+ tasks.add(getListSuccessTask(router1Fs, mountPoint, filesExpected));
+ assertEquals(2, collectResults("List " + mountPoint, tasks).getSuccess());
+ }
+
+ /**
+ * Check file creation on a mount point.
+ * If it is fault tolerant, it should be able to write everything.
+ * If it is not fault tolerant, it should fail to write some of the files.
+ */
+ private void checkFilesFaultTolerant(
+ Path mountPoint, DestinationOrder order,
+ FileSystem router0Fs, FileSystem router1Fs, FileSystem ns0Fs,
+ boolean faultTolerant) throws Exception {
+
+ // Get one of the existing sub directories
+ final FileStatus[] dirs0 = listStatus(router1Fs, mountPoint);
+ final Path dir0 = Path.getPathWithoutSchemeAndAuthority(
+ dirs0[0].getPath());
+
+ LOG.info("Create files in {}", dir0);
+ final List<Callable<Boolean>> tasks = new ArrayList<>();
+ for (int i = 0; i < NUM_FILES; i++) {
+ final String newFile = String.format("%s/file-%03d.txt", dir0, i);
+ FileSystem fs = getRandomRouterFileSystem();
+ tasks.add(getFileCreateTask(fs, newFile, ns0Fs));
+ }
+ TaskResults results = collectResults("Create file " + dir0, tasks);
+
+ LOG.info("Check files results for {}: {}", dir0, results);
+ if (faultTolerant || !DestinationOrder.FOLDER_ALL.contains(order)) {
+ assertEquals(NUM_FILES, results.getSuccess());
+ assertEquals(0, results.getFailure());
+ } else {
+ assertBothResults("check files " + dir0, NUM_FILES, results);
+ }
+
+ LOG.info("Check files listing for {}", dir0);
+ tasks.add(getListFailTask(router0Fs, dir0));
+ tasks.add(getListSuccessTask(router1Fs, dir0, results.getSuccess()));
+ assertEquals(2, collectResults("List " + dir0, tasks).getSuccess());
+ }
+
+ /**
+ * Get the string representation for the files.
+ * @param files Files to check.
+ * @return String representation.
+ */
+ private static String toString(final FileStatus[] files) {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ for (final FileStatus file : files) {
+ if (sb.length() > 1) {
+ sb.append(", ");
+ }
+ sb.append(Path.getPathWithoutSchemeAndAuthority(file.getPath()));
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+
+ /**
+ * List the files in a path.
+ * @param fs File system to check.
+ * @param path Path to list.
+ * @return List of files.
+ * @throws IOException If we cannot list.
+ */
+ private FileStatus[] listStatus(final FileSystem fs, final Path path)
+ throws IOException {
+ FileStatus[] files = new FileStatus[] {};
+ try {
+ files = fs.listStatus(path);
+ } catch (FileNotFoundException fnfe) {
+ LOG.debug("File not found: {}", fnfe.getMessage());
+ }
+ return files;
+ }
+
+ /**
+ * Task that creates a file and checks if it is available.
+ * @param fs File system to use to create the file.
+ * @param file File to create.
+ * @param checkFs File system for checking if the file is properly created.
+ * @return Result of creating the file.
+ */
+ private static Callable<Boolean> getFileCreateTask(
+ final FileSystem fs, final String file, FileSystem checkFs) {
+ return () -> {
+ try {
+ Path path = new Path(file);
+ FSDataOutputStream os = fs.create(path);
+ // We don't write because we have no mock Datanodes
+ os.close();
+ FileStatus fileStatus = checkFs.getFileStatus(path);
+ assertTrue("File not created properly: " + fileStatus,
+ fileStatus.getLen() > 0);
+ return true;
+ } catch (RemoteException re) {
+ return false;
+ }
+ };
+ }
+
+ /**
+ * Task that creates a directory.
+ * @param fs File system to use to create the directory.
+ * @param dir Directory to create.
+ * @return Result of creating the directory.
+ */
+ private static Callable<Boolean> getDirCreateTask(
+ final FileSystem fs, final Path dir) {
+ return () -> {
+ try {
+ fs.mkdirs(dir);
+ return true;
+ } catch (RemoteException re) {
+ return false;
+ }
+ };
+ }
+
+ /**
+ * Task that lists a directory and expects to fail.
+ * @param fs File system to check.
+ * @param path Path to try to list.
+ * @return If the listing failed as expected.
+ */
+ private static Callable<Boolean> getListFailTask(FileSystem fs, Path path) {
+ return () -> {
+ try {
+ fs.listStatus(path);
+ return false;
+ } catch (RemoteException re) {
+ return true;
+ }
+ };
+ }
+
+ /**
+ * Task that lists a directory and succeeds.
+ * @param fs File system to check.
+ * @param path Path to list.
+ * @param expected Number of files to expect to find.
+ * @return If the listing succeeds.
+ */
+ private static Callable<Boolean> getListSuccessTask(
+ FileSystem fs, Path path, int expected) {
+ return () -> {
+ final FileStatus[] dirs = fs.listStatus(path);
+ assertEquals(toString(dirs), expected, dirs.length);
+ return true;
+ };
+ }
+
+ /**
+ * Invoke a set of tasks and collect their outputs.
+ * The tasks should do assertions.
+ *
+ * @param tag Tag to identify the tasks in the logs.
+ * @param tasks Tasks to run.
+ * @return Results of running the tasks.
+ * @throws Exception If it cannot collect the results.
+ */
+ private TaskResults collectResults(final String tag,
+ final Collection<Callable<Boolean>> tasks) throws Exception {
+ final TaskResults results = new TaskResults();
+ service.invokeAll(tasks).forEach(task -> {
+ try {
+ boolean succeeded = task.get();
+ if (succeeded) {
+ LOG.info("Got success for {}", tag);
+ results.incrSuccess();
+ } else {
+ LOG.info("Got failure for {}", tag);
+ results.incrFailure();
+ }
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ });
+ tasks.clear();
+ return results;
+ }
+
+ /**
+ * Class to summarize the results of running a task.
+ */
+ static class TaskResults {
+ private final AtomicInteger success = new AtomicInteger(0);
+ private final AtomicInteger failure = new AtomicInteger(0);
+ public void incrSuccess() {
+ success.incrementAndGet();
+ }
+ public void incrFailure() {
+ failure.incrementAndGet();
+ }
+ public int getSuccess() {
+ return success.get();
+ }
+ public int getFailure() {
+ return failure.get();
+ }
+ public int getTotal() {
+ return success.get() + failure.get();
+ }
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("Success=").append(getSuccess())
+ .append(" Failure=").append(getFailure())
+ .toString();
+ }
+ }
+
+ /**
+ * Assert that the total number of results matches the expected amount and
+ * that there are both successes and failures.
+ * @param msg Message to show when the assertion fails.
+ * @param expected Expected number of results.
+ * @param actual Actual results.
+ */
+ private static void assertBothResults(String msg,
+ int expected, TaskResults actual) {
+ assertEquals(msg, expected, actual.getTotal());
+ assertTrue("Expected some success for " + msg, actual.getSuccess() > 0);
+ assertTrue("Expected some failure for " + msg, actual.getFailure() > 0);
+ }
+
+ /**
+ * Get a random Router from the cluster.
+ * @return Random Router.
+ */
+ private Router getRandomRouter() {
+ Random rnd = new Random();
+ int index = rnd.nextInt(routers.size());
+ return routers.get(index);
+ }
+
+ /**
+ * Get a file system from one of the Routers as a random user to allow better
+ * concurrency in the Router.
+ * @return File system from a random user.
+ * @throws Exception If we cannot create the file system.
+ */
+ private FileSystem getRandomRouterFileSystem() throws Exception {
+ final UserGroupInformation userUgi =
+ UserGroupInformation.createUserForTesting(
+ "user-" + UUID.randomUUID(), new String[]{"group"});
+ Router router = getRandomRouter();
+ return userUgi.doAs(
+ (PrivilegedExceptionAction<FileSystem>) () -> getFileSystem(router));
+ }
+
+ private static FileSystem getFileSystem(int rpcPort) throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ URI uri = URI.create("hdfs://localhost:" + rpcPort);
+ return DistributedFileSystem.get(uri, conf);
+ }
+
+ private static FileSystem getFileSystem(final Router router)
+ throws IOException {
+ InetSocketAddress rpcAddress = router.getRpcServerAddress();
+ int rpcPort = rpcAddress.getPort();
+ return getFileSystem(rpcPort);
+ }
+
+ private static RouterClient getAdminClient(
+ final Router router) throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ InetSocketAddress routerSocket = router.getAdminServerAddress();
+ return new RouterClient(routerSocket, conf);
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
index 1224fa2..8fa3506 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
@@ -74,7 +74,7 @@ public class TestRouterNamenodeMonitoring {
for (String nsId : nsIds) {
nns.put(nsId, new HashMap<>());
for (String nnId : asList("nn0", "nn1")) {
- nns.get(nsId).put(nnId, new MockNamenode());
+ nns.get(nsId).put(nnId, new MockNamenode(nsId));
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
index 2ec5d62..98f9ebc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
/**
@@ -93,7 +94,7 @@ public final class FederationStateStoreTestUtils {
conf.setClass(FEDERATION_STORE_DRIVER_CLASS, clazz, StateStoreDriver.class);
- if (clazz.isAssignableFrom(StateStoreFileBaseImpl.class)) {
+ if (StateStoreFileBaseImpl.class.isAssignableFrom(clazz)) {
setFileConfiguration(conf);
}
return conf;
@@ -178,8 +179,7 @@ public final class FederationStateStoreTestUtils {
* @param conf Configuration to extend.
*/
public static void setFileConfiguration(Configuration conf) {
- String workingPath = System.getProperty("user.dir");
- String stateStorePath = workingPath + "/statestore";
+ String stateStorePath = GenericTestUtils.getRandomizedTempPath();
conf.set(FEDERATION_STORE_FILE_DIRECTORY, stateStorePath);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java
index d30d6ba..6e5bd9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java
@@ -78,15 +78,17 @@ public class TestStateStoreMountTable extends TestStateStoreBase {
assertFalse(getStateStore().isDriverReady());
// Test APIs that access the store to check they throw the correct exception
+ MountTable entry = MountTable.newInstance(
+ "/mnt", Collections.singletonMap("ns0", "/tmp"));
AddMountTableEntryRequest addRequest =
- AddMountTableEntryRequest.newInstance();
+ AddMountTableEntryRequest.newInstance(entry);
verifyException(mountStore, "addMountTableEntry",
StateStoreUnavailableException.class,
new Class[] {AddMountTableEntryRequest.class},
new Object[] {addRequest});
UpdateMountTableEntryRequest updateRequest =
- UpdateMountTableEntryRequest.newInstance();
+ UpdateMountTableEntryRequest.newInstance(entry);
verifyException(mountStore, "updateMountTableEntry",
StateStoreUnavailableException.class,
new Class[] {UpdateMountTableEntryRequest.class},
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
index 05552738..339a977 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.federation.store.records;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -165,6 +166,24 @@ public class TestMountTable {
}
@Test
+ public void testFaultTolerant() throws IOException {
+
+ Map<String, String> dest = new LinkedHashMap<>();
+ dest.put(DST_NS_0, DST_PATH_0);
+ dest.put(DST_NS_1, DST_PATH_1);
+ MountTable record0 = MountTable.newInstance(SRC, dest);
+ assertFalse(record0.isFaultTolerant());
+
+ MountTable record1 = MountTable.newInstance(SRC, dest);
+ assertFalse(record1.isFaultTolerant());
+ assertEquals(record0, record1);
+
+ record1.setFaultTolerant(true);
+ assertTrue(record1.isFaultTolerant());
+ assertNotEquals(record0, record1);
+ }
+
+ @Test
public void testOrder() throws IOException {
testOrder(DestinationOrder.HASH);
testOrder(DestinationOrder.LOCAL);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index 32e88a2..6336281 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -428,8 +428,8 @@ Runs the DFS router. See [Router](../hadoop-hdfs-rbf/HDFSRouterFederation.html#R
Usage:
hdfs dfsrouteradmin
- [-add <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
- [-update <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
+ [-add <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
+ [-update <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
[-rm <source>]
[-ls <path>]
[-getDestination <path>]
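
A usage sketch of the new flag, mirroring the invocation exercised in
TestRouterAdminCLI#testErrorFaultTolerant (mount point, nameservices, and
destination are illustrative); fault tolerance is only accepted together
with multiple destinations and an ALL order:

    hdfs dfsrouteradmin -add /mntft ns0,ns1 /tmp -order HASH_ALL -faulttolerant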