Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2019/03/30 01:52:17 UTC

[hadoop] branch HDFS-13891 updated: HDFS-14316. RBF: Support unavailable subclusters for mount points with multiple destinations. Contributed by Inigo Goiri.

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch HDFS-13891
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/HDFS-13891 by this push:
     new dea3798  HDFS-14316. RBF: Support unavailable subclusters for mount points with multiple destinations. Contributed by Inigo Goiri.
dea3798 is described below

commit dea3798e174063b641c7a8dc023d2b5ceba090b2
Author: Ayush Saxena <ay...@apache.org>
AuthorDate: Sat Mar 30 07:15:41 2019 +0530

    HDFS-14316. RBF: Support unavailable subclusters for mount points with multiple destinations. Contributed by Inigo Goiri.
---
 .../federation/metrics/FederationMetrics.java      |   1 +
 .../resolver/order/DestinationOrder.java           |  10 +-
 .../server/federation/router/RBFConfigKeys.java    |  12 +-
 .../server/federation/router/RemoteMethod.java     |  10 +-
 .../hdfs/server/federation/router/RemoteParam.java |   9 +
 .../federation/router/RouterClientProtocol.java    | 133 ++++-
 .../server/federation/router/RouterRpcClient.java  |  79 ++-
 .../server/federation/router/RouterRpcServer.java  |  48 +-
 .../federation/store/impl/MountTableStoreImpl.java |   2 +
 .../federation/store/records/MountTable.java       |  56 +-
 .../store/records/impl/pb/MountTablePBImpl.java    |  14 +
 .../hadoop/hdfs/tools/federation/RouterAdmin.java  |  31 +-
 .../src/main/proto/FederationProtocol.proto        |   2 +
 .../src/main/resources/hdfs-rbf-default.xml        |  30 +
 .../src/main/webapps/router/federationhealth.html  |   2 +
 .../src/main/webapps/router/federationhealth.js    |  12 +
 .../src/main/webapps/static/rbf.css                |   5 +
 .../src/site/markdown/HDFSRouterFederation.md      |   8 +
 .../hdfs/server/federation/MockNamenode.java       | 229 +++++++-
 .../federation/router/TestRouterAdminCLI.java      |  65 +-
 .../federation/router/TestRouterFaultTolerant.java | 654 +++++++++++++++++++++
 .../router/TestRouterNamenodeMonitoring.java       |   2 +-
 .../store/FederationStateStoreTestUtils.java       |   6 +-
 .../federation/store/TestStateStoreMountTable.java |   6 +-
 .../federation/store/records/TestMountTable.java   |  19 +
 .../hadoop-hdfs/src/site/markdown/HDFSCommands.md  |   4 +-
 26 files changed, 1371 insertions(+), 78 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
index 5ab978d..a39f17d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationMetrics.java
@@ -270,6 +270,7 @@ public class FederationMetrics implements FederationMBean {
           innerInfo.put("order", "");
         }
         innerInfo.put("readonly", entry.isReadOnly());
+        innerInfo.put("faulttolerant", entry.isFaultTolerant());
         info.add(Collections.unmodifiableMap(innerInfo));
       }
     } catch (IOException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
index 99c5e22..6a637d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/order/DestinationOrder.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.federation.resolver.order;
 
+import java.util.EnumSet;
+
 /**
  * Order of the destinations when we have multiple of them. When the resolver
  * of files to subclusters (FileSubclusterResolver) has multiple destinations,
@@ -27,5 +29,11 @@ public enum DestinationOrder {
   LOCAL, // Local first
   RANDOM, // Random order
   HASH_ALL, // Follow consistent hashing
-  SPACE // Available space based order
+  SPACE; // Available space based order
+
+  /** Approaches that write folders in all subclusters. */
+  public static final EnumSet<DestinationOrder> FOLDER_ALL = EnumSet.of(
+      HASH_ALL,
+      RANDOM,
+      SPACE);
 }
\ No newline at end of file
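
For reference, the new EnumSet is meant to replace one-by-one comparisons against the "write everywhere" orders; a minimal sketch of the membership check, mirroring the MountTable#isAll() change later in this commit (the `entry` variable is assumed to be a MountTable):

    // Sketch: classify a destination order with the new EnumSet.
    DestinationOrder order = entry.getDestOrder();
    if (DestinationOrder.FOLDER_ALL.contains(order)) {
      // HASH_ALL, RANDOM and SPACE create folders in all subclusters.
    }
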
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 657b6cf..153cd64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -135,7 +135,17 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final String DFS_ROUTER_ALLOW_PARTIAL_LIST =
       FEDERATION_ROUTER_PREFIX + "client.allow-partial-listing";
   public static final boolean DFS_ROUTER_ALLOW_PARTIAL_LIST_DEFAULT = true;
-
+  public static final String DFS_ROUTER_CLIENT_MOUNT_TIME_OUT =
+      FEDERATION_ROUTER_PREFIX + "client.mount-status.time-out";
+  public static final long DFS_ROUTER_CLIENT_MOUNT_TIME_OUT_DEFAULT =
+      TimeUnit.SECONDS.toMillis(1);
+  public static final String DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT =
+      FEDERATION_ROUTER_PREFIX + "connect.max.retries.on.timeouts";
+  public static final int DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT_DEFAULT = 0;
+  public static final String DFS_ROUTER_CLIENT_CONNECT_TIMEOUT =
+      FEDERATION_ROUTER_PREFIX + "connect.timeout";
+  public static final long DFS_ROUTER_CLIENT_CONNECT_TIMEOUT_DEFAULT =
+      TimeUnit.SECONDS.toMillis(2);
 
   // HDFS Router State Store connection
   public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
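
The new keys are resolved with Configuration#getTimeDuration, as the RouterRpcClient change below does; a minimal sketch (constants taken from this commit):

    // Sketch: resolve the new connect timeout in milliseconds.
    long connectTimeOut = conf.getTimeDuration(
        RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT,
        RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT_DEFAULT,
        TimeUnit.MILLISECONDS);
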
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
index 6ff2b01..f7ba812 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteMethod.java
@@ -210,7 +210,13 @@ public class RemoteMethod {
 
   @Override
   public String toString() {
-    return this.protocol.getSimpleName() + "#" + this.methodName + " " +
-        Arrays.toString(this.params);
+    return new StringBuilder()
+        .append(this.protocol.getSimpleName())
+        .append("#")
+        .append(this.methodName)
+        .append("(")
+        .append(Arrays.deepToString(this.params))
+        .append(")")
+        .toString();
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
index 8816ff6..8b216d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteParam.java
@@ -68,4 +68,13 @@ public class RemoteParam {
       return context.getDest();
     }
   }
+
+  @Override
+  public String toString() {
+    return new StringBuilder()
+        .append("RemoteParam(")
+        .append(this.paramMap)
+        .append(")")
+        .toString();
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index d1b2269..50d9267 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.slf4j.Logger;
@@ -93,6 +94,8 @@ import com.google.common.annotations.VisibleForTesting;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.ConnectException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -103,6 +106,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Module that implements all the RPC calls in {@link ClientProtocol} in the
@@ -119,6 +123,8 @@ public class RouterClientProtocol implements ClientProtocol {
 
   /** If it requires response from all subclusters. */
   private final boolean allowPartialList;
+  /** Time out when getting the mount statistics. */
+  private long mountStatusTimeOut;
 
   /** Identifier for the super user. */
   private final String superUser;
@@ -140,6 +146,10 @@ public class RouterClientProtocol implements ClientProtocol {
     this.allowPartialList = conf.getBoolean(
         RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST,
         RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST_DEFAULT);
+    this.mountStatusTimeOut = conf.getTimeDuration(
+        RBFConfigKeys.DFS_ROUTER_CLIENT_MOUNT_TIME_OUT,
+        RBFConfigKeys.DFS_ROUTER_CLIENT_MOUNT_TIME_OUT_DEFAULT,
+        TimeUnit.SECONDS);
 
     // User and group for reporting
     this.superUser = System.getProperty("user.name");
@@ -228,15 +238,92 @@ public class RouterClientProtocol implements ClientProtocol {
       }
     }
 
-    RemoteLocation createLocation = rpcServer.getCreateLocation(src);
     RemoteMethod method = new RemoteMethod("create",
         new Class<?>[] {String.class, FsPermission.class, String.class,
             EnumSetWritable.class, boolean.class, short.class,
             long.class, CryptoProtocolVersion[].class,
             String.class, String.class},
-        createLocation.getDest(), masked, clientName, flag, createParent,
+        new RemoteParam(), masked, clientName, flag, createParent,
         replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
-    return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
+    final List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(src, true);
+    RemoteLocation createLocation = null;
+    try {
+      createLocation = rpcServer.getCreateLocation(src);
+      return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
+    } catch (IOException ioe) {
+      final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
+          method, src, ioe, createLocation, locations);
+      return rpcClient.invokeSequential(
+          newLocations, method, HdfsFileStatus.class, null);
+    }
+  }
+
+  /**
+   * Check if an exception is caused by an unavailable subcluster. It also
+   * checks the nested causes.
+   * @param ioe IOException to check.
+   * @return If caused by an unavailable subcluster. False if it should not
+   *         be retried (e.g., NSQuotaExceededException).
+   */
+  private static boolean isUnavailableSubclusterException(
+      final IOException ioe) {
+    if (ioe instanceof ConnectException ||
+        ioe instanceof ConnectTimeoutException ||
+        ioe instanceof NoNamenodesAvailableException) {
+      return true;
+    }
+    if (ioe.getCause() instanceof IOException) {
+      IOException cause = (IOException)ioe.getCause();
+      return isUnavailableSubclusterException(cause);
+    }
+    return false;
+  }
+
+  /**
+   * Check if a remote method can be retried in other subclusters when it
+   * failed in the original destination. This method returns the list of
+   * locations to retry in. This is used by fault tolerant mount points.
+   * @param method Method that failed and might be retried.
+   * @param src Path where the method was invoked.
+   * @param ioe IOException that was triggered.
+   * @param excludeLoc Location that failed and should be excluded.
+   * @param locations All the locations to retry.
+   * @return The locations where we should retry (excluding the failed ones).
+   * @throws IOException If this path is not fault tolerant or the exception
+   *                     should not be retried (e.g., NSQuotaExceededException).
+   */
+  private List<RemoteLocation> checkFaultTolerantRetry(
+      final RemoteMethod method, final String src, final IOException ioe,
+      final RemoteLocation excludeLoc, final List<RemoteLocation> locations)
+          throws IOException {
+
+    if (!isUnavailableSubclusterException(ioe)) {
+      LOG.debug("{} exception cannot be retried",
+          ioe.getClass().getSimpleName());
+      throw ioe;
+    }
+    if (!rpcServer.isPathFaultTolerant(src)) {
+      LOG.debug("{} does not allow retrying a failed subcluster", src);
+      throw ioe;
+    }
+
+    final List<RemoteLocation> newLocations;
+    if (excludeLoc == null) {
+      LOG.error("Cannot invoke {} for {}: {}", method, src, ioe.getMessage());
+      newLocations = locations;
+    } else {
+      LOG.error("Cannot invoke {} for {} in {}: {}",
+          method, src, excludeLoc, ioe.getMessage());
+      newLocations = new ArrayList<>();
+      for (final RemoteLocation loc : locations) {
+        if (!loc.equals(excludeLoc)) {
+          newLocations.add(loc);
+        }
+      }
+    }
+    LOG.info("{} allows retrying failed subclusters in {}", src, newLocations);
+    return newLocations;
   }
 
   @Override
@@ -598,13 +685,20 @@ public class RouterClientProtocol implements ClientProtocol {
         }
       } catch (IOException ioe) {
         // Can't query if this file exists or not.
-        LOG.error("Error requesting file info for path {} while proxing mkdirs",
-            src, ioe);
+        LOG.error("Error getting file info for {} while proxying mkdirs: {}",
+            src, ioe.getMessage());
       }
     }
 
-    RemoteLocation firstLocation = locations.get(0);
-    return (boolean) rpcClient.invokeSingle(firstLocation, method);
+    final RemoteLocation firstLocation = locations.get(0);
+    try {
+      return (boolean) rpcClient.invokeSingle(firstLocation, method);
+    } catch (IOException ioe) {
+      final List<RemoteLocation> newLocations = checkFaultTolerantRetry(
+          method, src, ioe, firstLocation, locations);
+      return rpcClient.invokeSequential(
+          newLocations, method, Boolean.class, Boolean.TRUE);
+    }
   }
 
   @Override
@@ -1696,10 +1790,26 @@ public class RouterClientProtocol implements ClientProtocol {
    */
   private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
       final RemoteMethod method) throws IOException {
+    return getFileInfoAll(locations, method, -1);
+  }
+
+  /**
+   * Get the file info from all the locations.
+   *
+   * @param locations Locations to check.
+   * @param method The file information method to run.
+   * @param timeOutMs Time out for the operation in milliseconds.
+   * @return The first file info if it's a file; the directory status if it
+   *         is a directory everywhere.
+   * @throws IOException If all the locations throw an exception.
+   */
+  private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
+      final RemoteMethod method, long timeOutMs) throws IOException {
 
     // Get the file info from everybody
     Map<RemoteLocation, HdfsFileStatus> results =
-        rpcClient.invokeConcurrent(locations, method, HdfsFileStatus.class);
+        rpcClient.invokeConcurrent(locations, method, false, false, timeOutMs,
+            HdfsFileStatus.class);
     int children = 0;
     // We return the first file
     HdfsFileStatus dirStatus = null;
@@ -1756,9 +1866,10 @@ public class RouterClientProtocol implements ClientProtocol {
         MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
         MountTable entry = mountTable.getMountPoint(mName);
         if (entry != null) {
-          HdfsFileStatus fInfo = getFileInfoAll(entry.getDestinations(),
-              new RemoteMethod("getFileInfo", new Class<?>[] {String.class},
-                  new RemoteParam()));
+          RemoteMethod method = new RemoteMethod("getFileInfo",
+              new Class<?>[] {String.class}, new RemoteParam());
+          HdfsFileStatus fInfo = getFileInfoAll(
+              entry.getDestinations(), method, mountStatusTimeOut);
           if (fInfo != null) {
             permission = fInfo.getPermission();
             owner = fInfo.getOwner();
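
A point worth calling out in the hunk above: isUnavailableSubclusterException() unwraps nested causes, so connection failures wrapped by the IPC layer still qualify for a fault-tolerant retry. A minimal illustration (the exception values are invented for the example):

    // Sketch: a ConnectException wrapped in a generic IOException is
    // still treated as an unavailable subcluster; exceptions such as
    // NSQuotaExceededException are not, so they are rethrown.
    IOException wrapped = new IOException("Failed on local exception",
        new ConnectException("Connection refused"));
    assert isUnavailableSubclusterException(wrapped);
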
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 3d80c41..730952b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -18,11 +18,15 @@
 
 package org.apache.hadoop.hdfs.server.federation.router;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -62,6 +66,7 @@ import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -126,7 +131,8 @@ public class RouterRpcClient {
 
     this.namenodeResolver = resolver;
 
-    this.connectionManager = new ConnectionManager(conf);
+    Configuration clientConf = getClientConfiguration(conf);
+    this.connectionManager = new ConnectionManager(clientConf);
     this.connectionManager.start();
 
     int numThreads = conf.getInt(
@@ -166,6 +172,31 @@ public class RouterRpcClient {
   }
 
   /**
+   * Get the configuration for the RPC client. It takes the Router
+   * configuration and transforms it into regular RPC Client configuration.
+   * @param conf Input configuration.
+   * @return Configuration for the RPC client.
+   */
+  private Configuration getClientConfiguration(final Configuration conf) {
+    Configuration clientConf = new Configuration(conf);
+    int maxRetries = conf.getInt(
+        RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT,
+        RBFConfigKeys.DFS_ROUTER_CLIENT_MAX_RETRIES_TIME_OUT_DEFAULT);
+    if (maxRetries >= 0) {
+      clientConf.setInt(
+          IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY, maxRetries);
+    }
+    long connectTimeOut = conf.getTimeDuration(
+        RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT,
+        RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    if (connectTimeOut >= 0) {
+      clientConf.setLong(IPC_CLIENT_CONNECT_TIMEOUT_KEY, connectTimeOut);
+    }
+    return clientConf;
+  }
+
+  /**
    * Get the active namenode resolver used by this client.
    * @return Active namenode resolver.
    */
@@ -341,17 +372,19 @@ public class RouterRpcClient {
    * @param method Remote ClientProtcol method to invoke.
    * @param params Variable list of parameters matching the method.
    * @return The result of invoking the method.
-   * @throws IOException
+   * @throws ConnectException If it cannot connect to any Namenode.
+   * @throws StandbyException If all Namenodes are in Standby.
+   * @throws IOException If it cannot invoke the method.
    */
   private Object invokeMethod(
       final UserGroupInformation ugi,
       final List<? extends FederationNamenodeContext> namenodes,
       final Class<?> protocol, final Method method, final Object... params)
-          throws IOException {
+          throws ConnectException, StandbyException, IOException {
 
     if (namenodes == null || namenodes.isEmpty()) {
       throw new IOException("No namenodes to invoke " + method.getName() +
-          " with params " + Arrays.toString(params) + " from "
+          " with params " + Arrays.deepToString(params) + " from "
           + router.getRouterId());
     }
 
@@ -388,6 +421,12 @@ public class RouterRpcClient {
             this.rpcMonitor.proxyOpFailureStandby();
           }
           failover = true;
+        } else if (ioe instanceof ConnectException ||
+            ioe instanceof ConnectTimeoutException) {
+          if (this.rpcMonitor != null) {
+            this.rpcMonitor.proxyOpFailureCommunicate();
+          }
+          failover = true;
         } else if (ioe instanceof RemoteException) {
           if (this.rpcMonitor != null) {
             this.rpcMonitor.proxyOpComplete(true);
@@ -408,7 +447,7 @@ public class RouterRpcClient {
           if (this.rpcMonitor != null) {
             this.rpcMonitor.proxyOpNoNamenodes();
           }
-          LOG.error("Can not get available namenode for {} {} error: {}",
+          LOG.error("Cannot get available namenode for {} {} error: {}",
               nsId, rpcAddress, ioe.getMessage());
           // Throw RetriableException so that client can retry
           throw new RetriableException(ioe);
@@ -433,24 +472,33 @@ public class RouterRpcClient {
 
     // All namenodes were unavailable or in standby
     String msg = "No namenode available to invoke " + method.getName() + " " +
-        Arrays.toString(params);
+        Arrays.deepToString(params) + " in " + namenodes + " from " +
+        router.getRouterId();
     LOG.error(msg);
+    int exConnect = 0;
     for (Entry<FederationNamenodeContext, IOException> entry :
         ioes.entrySet()) {
       FederationNamenodeContext namenode = entry.getKey();
-      String nsId = namenode.getNameserviceId();
-      String nnId = namenode.getNamenodeId();
+      String nnKey = namenode.getNamenodeKey();
       String addr = namenode.getRpcAddress();
       IOException ioe = entry.getValue();
       if (ioe instanceof StandbyException) {
-        LOG.error("{} {} at {} is in Standby: {}", nsId, nnId, addr,
-            ioe.getMessage());
+        LOG.error("{} at {} is in Standby: {}",
+            nnKey, addr, ioe.getMessage());
+      } else if (ioe instanceof ConnectException ||
+          ioe instanceof ConnectTimeoutException) {
+        exConnect++;
+        LOG.error("{} at {} cannot be reached: {}",
+            nnKey, addr, ioe.getMessage());
       } else {
-        LOG.error("{} {} at {} error: \"{}\"",
-            nsId, nnId, addr, ioe.getMessage());
+        LOG.error("{} at {} error: \"{}\"", nnKey, addr, ioe.getMessage());
       }
     }
-    throw new StandbyException(msg);
+    if (exConnect == ioes.size()) {
+      throw new ConnectException(msg);
+    } else {
+      throw new StandbyException(msg);
+    }
   }
 
   /**
@@ -497,6 +545,9 @@ public class RouterRpcClient {
           // failover, invoker looks for standby exceptions for failover.
           if (ioe instanceof StandbyException) {
             throw ioe;
+          } else if (ioe instanceof ConnectException ||
+              ioe instanceof ConnectTimeoutException) {
+            throw ioe;
           } else {
             throw new StandbyException(ioe.getMessage());
           }
@@ -1043,7 +1094,7 @@ public class RouterRpcClient {
 
     if (locations.isEmpty()) {
       throw new IOException("No remote locations available");
-    } else if (locations.size() == 1) {
+    } else if (locations.size() == 1 && timeOutMs <= 0) {
       // Shortcut, just one call
       T location = locations.iterator().next();
       String ns = location.getNameserviceId();
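
The effect of getClientConfiguration() is to translate the Router-level keys into the standard IPC client keys before the ConnectionManager is created; a minimal sketch of what the defaults from this commit produce (key strings from CommonConfigurationKeysPublic):

    // Sketch: fail-fast IPC settings derived from the Router defaults.
    Configuration clientConf = getClientConfiguration(new HdfsConfiguration());
    assert clientConf.getInt(
        "ipc.client.connect.max.retries.on.timeouts", -1) == 0;
    assert clientConf.getLong("ipc.client.connect.timeout", -1) == 2000L;
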
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 739a2ff..b934355 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -30,6 +30,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Array;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -133,6 +134,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
@@ -294,7 +296,9 @@ public class RouterRpcServer extends AbstractService
         AccessControlException.class,
         LeaseExpiredException.class,
         NotReplicatedYetException.class,
-        IOException.class);
+        IOException.class,
+        ConnectException.class,
+        RetriableException.class);
 
     this.rpcServer.addSuppressedLoggingExceptions(
         StandbyException.class);
@@ -520,7 +524,7 @@ public class RouterRpcServer extends AbstractService
     // If default Ns is not present return result from first namespace.
     Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
     if (nss.isEmpty()) {
-      throw new IOException("No namespace availaible.");
+      throw new IOException("No namespace available.");
     }
     nsId = nss.iterator().next().getNameserviceId();
     return rpcClient.invokeSingle(nsId, method, clazz);
@@ -566,6 +570,7 @@ public class RouterRpcServer extends AbstractService
         replication, blockSize, supportedVersions, ecPolicyName, storagePolicy);
   }
 
+
   /**
    * Get the location to create a file. It checks if the file already existed
    * in one of the locations.
@@ -574,10 +579,24 @@ public class RouterRpcServer extends AbstractService
    * @return The remote location for this file.
    * @throws IOException If the file has no creation location.
    */
-  RemoteLocation getCreateLocation(final String src)
+  RemoteLocation getCreateLocation(final String src) throws IOException {
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    return getCreateLocation(src, locations);
+  }
+
+  /**
+   * Get the location to create a file. It checks if the file already existed
+   * in one of the locations.
+   *
+   * @param src Path of the file to check.
+   * @param locations Prefetched locations for the file.
+   * @return The remote location for this file.
+   * @throws IOException If the file has no creation location.
+   */
+  RemoteLocation getCreateLocation(
+      final String src, final List<RemoteLocation> locations)
       throws IOException {
 
-    final List<RemoteLocation> locations = getLocationsForPath(src, true);
     if (locations == null || locations.isEmpty()) {
       throw new IOException("Cannot get locations to create " + src);
     }
@@ -1569,6 +1588,27 @@ public class RouterRpcServer extends AbstractService
   }
 
   /**
+   * Check if a path supports failed subclusters.
+   *
+   * @param path Path to check.
+   * @return If a path should support failed subclusters.
+   */
+  boolean isPathFaultTolerant(final String path) {
+    if (subclusterResolver instanceof MountTableResolver) {
+      try {
+        MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
+        MountTable entry = mountTable.getMountPoint(path);
+        if (entry != null) {
+          return entry.isFaultTolerant();
+        }
+      } catch (IOException e) {
+        LOG.error("Cannot get mount point", e);
+      }
+    }
+    return false;
+  }
+
+  /**
    * Check if call needs to be invoked to all the locations. The call is
    * supposed to be invoked in all the locations in case the order of the mount
    * entry is amongst HASH_ALL, RANDOM or SPACE or if the source is itself a
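
The getCreateLocation() split lets create() resolve the locations once and reuse them for the fault-tolerant retry; a minimal sketch of the intended call pattern (taken from the RouterClientProtocol hunk above):

    // Sketch: resolve once, then pick the create location from the
    // prefetched list.
    List<RemoteLocation> locations = getLocationsForPath(src, true);
    RemoteLocation createLocation = getCreateLocation(src, locations);
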
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
index d5e1857..8761038 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/impl/MountTableStoreImpl.java
@@ -66,6 +66,7 @@ public class MountTableStoreImpl extends MountTableStore {
       if (pc != null) {
         pc.checkPermission(mountTable, FsAction.WRITE);
       }
+      mountTable.validate();
     }
 
     boolean status = getDriver().put(mountTable, false, true);
@@ -85,6 +86,7 @@ public class MountTableStoreImpl extends MountTableStore {
       if (pc != null) {
         pc.checkPermission(mountTable, FsAction.WRITE);
       }
+      mountTable.validate();
     }
 
     boolean status = getDriver().put(mountTable, true, true);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
index c1585b0..d1351a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/MountTable.java
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -59,6 +60,10 @@ public abstract class MountTable extends BaseRecord {
       "Invalid entry, invalid destination path ";
   public static final String ERROR_MSG_ALL_DEST_MUST_START_WITH_BACK_SLASH =
       "Invalid entry, all destination must start with / ";
+  private static final String ERROR_MSG_FAULT_TOLERANT_MULTI_DEST =
+      "Invalid entry, fault tolerance requires multiple destinations ";
+  private static final String ERROR_MSG_FAULT_TOLERANT_ALL =
+      "Invalid entry, fault tolerance only supported for ALL order ";
 
   /** Comparator for paths which considers the /. */
   public static final Comparator<String> PATH_COMPARATOR =
@@ -229,6 +234,20 @@ public abstract class MountTable extends BaseRecord {
   public abstract void setDestOrder(DestinationOrder order);
 
   /**
+   * Check if the mount point supports a failed destination.
+   *
+   * @return If it supports failures.
+   */
+  public abstract boolean isFaultTolerant();
+
+  /**
+   * Set if the mount point supports failed destinations.
+   *
+   * @param faultTolerant If it supports failures.
+   */
+  public abstract void setFaultTolerant(boolean faultTolerant);
+
+  /**
    * Get owner name of this mount table entry.
    *
    * @return Owner name
@@ -321,11 +340,14 @@ public abstract class MountTable extends BaseRecord {
     List<RemoteLocation> destinations = this.getDestinations();
     sb.append(destinations);
     if (destinations != null && destinations.size() > 1) {
-      sb.append("[" + this.getDestOrder() + "]");
+      sb.append("[").append(this.getDestOrder()).append("]");
     }
     if (this.isReadOnly()) {
       sb.append("[RO]");
     }
+    if (this.isFaultTolerant()) {
+      sb.append("[FT]");
+    }
 
     if (this.getOwnerName() != null) {
       sb.append("[owner:").append(this.getOwnerName()).append("]");
@@ -383,6 +405,16 @@ public abstract class MountTable extends BaseRecord {
             ERROR_MSG_ALL_DEST_MUST_START_WITH_BACK_SLASH + this);
       }
     }
+    if (isFaultTolerant()) {
+      if (getDestinations().size() < 2) {
+        throw new IllegalArgumentException(
+            ERROR_MSG_FAULT_TOLERANT_MULTI_DEST + this);
+      }
+      if (!isAll()) {
+        throw new IllegalArgumentException(
+            ERROR_MSG_FAULT_TOLERANT_ALL + this);
+      }
+    }
   }
 
   @Override
@@ -397,6 +429,7 @@ public abstract class MountTable extends BaseRecord {
         .append(this.getDestinations())
         .append(this.isReadOnly())
         .append(this.getDestOrder())
+        .append(this.isFaultTolerant())
         .toHashCode();
   }
 
@@ -404,16 +437,13 @@ public abstract class MountTable extends BaseRecord {
   public boolean equals(Object obj) {
     if (obj instanceof MountTable) {
       MountTable other = (MountTable)obj;
-      if (!this.getSourcePath().equals(other.getSourcePath())) {
-        return false;
-      } else if (!this.getDestinations().equals(other.getDestinations())) {
-        return false;
-      } else if (this.isReadOnly() != other.isReadOnly()) {
-        return false;
-      } else if (!this.getDestOrder().equals(other.getDestOrder())) {
-        return false;
-      }
-      return true;
+      return new EqualsBuilder()
+          .append(this.getSourcePath(), other.getSourcePath())
+          .append(this.getDestinations(), other.getDestinations())
+          .append(this.isReadOnly(), other.isReadOnly())
+          .append(this.getDestOrder(), other.getDestOrder())
+          .append(this.isFaultTolerant(), other.isFaultTolerant())
+          .isEquals();
     }
     return false;
   }
@@ -424,9 +454,7 @@ public abstract class MountTable extends BaseRecord {
    */
   public boolean isAll() {
     DestinationOrder order = getDestOrder();
-    return order == DestinationOrder.HASH_ALL ||
-        order == DestinationOrder.RANDOM ||
-        order == DestinationOrder.SPACE;
+    return DestinationOrder.FOLDER_ALL.contains(order);
   }
 
   /**
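
The new validate() rules reject entries that claim fault tolerance without the destinations to back it up; a minimal sketch of an entry that now fails (construction simplified for illustration):

    // Sketch: fault tolerance with a single destination is invalid.
    Map<String, String> destMap = Collections.singletonMap("ns0", "/data");
    MountTable entry = MountTable.newInstance("/data", destMap);
    entry.setFaultTolerant(true);
    entry.validate();  // throws IllegalArgumentException: fault
                       // tolerance requires multiple destinations
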
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
index 4c7622c..62cdc72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/records/impl/pb/MountTablePBImpl.java
@@ -196,6 +196,20 @@ public class MountTablePBImpl extends MountTable implements PBRecord {
   }
 
   @Override
+  public boolean isFaultTolerant() {
+    MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
+    if (!proto.hasFaultTolerant()) {
+      return false;
+    }
+    return proto.getFaultTolerant();
+  }
+
+  @Override
+  public void setFaultTolerant(boolean faultTolerant) {
+    this.translator.getBuilder().setFaultTolerant(faultTolerant);
+  }
+
+  @Override
   public String getOwnerName() {
     MountTableRecordProtoOrBuilder proto = this.translator.getProtoOrBuilder();
     if (!proto.hasOwnerName()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
index b04b069..61da7e9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
@@ -135,12 +135,12 @@ public class RouterAdmin extends Configured implements Tool {
     }
     if (cmd.equals("-add")) {
       return "\t[-add <source> <nameservice1, nameservice2, ...> <destination> "
-          + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+          + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
           + "-owner <owner> -group <group> -mode <mode>]";
     } else if (cmd.equals("-update")) {
       return "\t[-update <source> <nameservice1, nameservice2, ...> "
           + "<destination> "
-          + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+          + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
           + "-owner <owner> -group <group> -mode <mode>]";
     } else if (cmd.equals("-rm")) {
       return "\t[-rm <source>]";
@@ -415,6 +415,7 @@ public class RouterAdmin extends Configured implements Tool {
 
     // Optional parameters
     boolean readOnly = false;
+    boolean faultTolerant = false;
     String owner = null;
     String group = null;
     FsPermission mode = null;
@@ -422,6 +423,8 @@ public class RouterAdmin extends Configured implements Tool {
     while (i < parameters.length) {
       if (parameters[i].equals("-readonly")) {
         readOnly = true;
+      } else if (parameters[i].equals("-faulttolerant")) {
+        faultTolerant = true;
       } else if (parameters[i].equals("-order")) {
         i++;
         try {
@@ -447,7 +450,7 @@ public class RouterAdmin extends Configured implements Tool {
       i++;
     }
 
-    return addMount(mount, nss, dest, readOnly, order,
+    return addMount(mount, nss, dest, readOnly, faultTolerant, order,
         new ACLEntity(owner, group, mode));
   }
 
@@ -464,7 +467,8 @@ public class RouterAdmin extends Configured implements Tool {
    * @throws IOException Error adding the mount point.
    */
   public boolean addMount(String mount, String[] nss, String dest,
-      boolean readonly, DestinationOrder order, ACLEntity aclInfo)
+      boolean readonly, boolean faultTolerant, DestinationOrder order,
+      ACLEntity aclInfo)
       throws IOException {
     mount = normalizeFileSystemPath(mount);
     // Get the existing entry
@@ -491,6 +495,9 @@ public class RouterAdmin extends Configured implements Tool {
       if (readonly) {
         newEntry.setReadOnly(true);
       }
+      if (faultTolerant) {
+        newEntry.setFaultTolerant(true);
+      }
       if (order != null) {
         newEntry.setDestOrder(order);
       }
@@ -508,6 +515,8 @@ public class RouterAdmin extends Configured implements Tool {
         newEntry.setMode(aclInfo.getMode());
       }
 
+      newEntry.validate();
+
       AddMountTableEntryRequest request =
           AddMountTableEntryRequest.newInstance(newEntry);
       AddMountTableEntryResponse addResponse =
@@ -527,6 +536,9 @@ public class RouterAdmin extends Configured implements Tool {
       if (readonly) {
         existingEntry.setReadOnly(true);
       }
+      if (faultTolerant) {
+        existingEntry.setFaultTolerant(true);
+      }
       if (order != null) {
         existingEntry.setDestOrder(order);
       }
@@ -544,6 +556,8 @@ public class RouterAdmin extends Configured implements Tool {
         existingEntry.setMode(aclInfo.getMode());
       }
 
+      existingEntry.validate();
+
       UpdateMountTableEntryRequest updateRequest =
           UpdateMountTableEntryRequest.newInstance(existingEntry);
       UpdateMountTableEntryResponse updateResponse =
@@ -572,6 +586,7 @@ public class RouterAdmin extends Configured implements Tool {
 
     // Optional parameters
     boolean readOnly = false;
+    boolean faultTolerant = false;
     String owner = null;
     String group = null;
     FsPermission mode = null;
@@ -579,6 +594,8 @@ public class RouterAdmin extends Configured implements Tool {
     while (i < parameters.length) {
       if (parameters[i].equals("-readonly")) {
         readOnly = true;
+      } else if (parameters[i].equals("-faulttolerant")) {
+        faultTolerant = true;
       } else if (parameters[i].equals("-order")) {
         i++;
         try {
@@ -604,7 +621,7 @@ public class RouterAdmin extends Configured implements Tool {
       i++;
     }
 
-    return updateMount(mount, nss, dest, readOnly, order,
+    return updateMount(mount, nss, dest, readOnly, faultTolerant, order,
         new ACLEntity(owner, group, mode));
   }
 
@@ -621,7 +638,8 @@ public class RouterAdmin extends Configured implements Tool {
    * @throws IOException Error updating the mount point.
    */
   public boolean updateMount(String mount, String[] nss, String dest,
-      boolean readonly, DestinationOrder order, ACLEntity aclInfo)
+      boolean readonly, boolean faultTolerant,
+      DestinationOrder order, ACLEntity aclInfo)
       throws IOException {
     mount = normalizeFileSystemPath(mount);
     MountTableManager mountTable = client.getMountTableManager();
@@ -634,6 +652,7 @@ public class RouterAdmin extends Configured implements Tool {
     MountTable newEntry = MountTable.newInstance(mount, destMap);
 
     newEntry.setReadOnly(readonly);
+    newEntry.setFaultTolerant(faultTolerant);
 
     if (order != null) {
       newEntry.setDestOrder(order);
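
With the parser changes above, the flag can be exercised end to end through the usual Tool entry point; a minimal sketch (nameservice and path values are illustrative):

    // Sketch: add a fault-tolerant mount point via the admin tool.
    String[] args = {"-add", "/data", "ns1,ns2", "/data",
        "-order", "HASH_ALL", "-faulttolerant"};
    int ret = ToolRunner.run(conf, new RouterAdmin(conf), args);
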
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
index a55be73..6a60e4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/proto/FederationProtocol.proto
@@ -143,6 +143,8 @@ message MountTableRecordProto {
   optional int32 mode = 12;
 
   optional QuotaUsageProto quota = 13;
+
+  optional bool faultTolerant = 14 [default = false];
 }
 
 message AddMountTableEntryRequestProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 1034c87..e23f863 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -504,6 +504,36 @@
   </property>
 
   <property>
+    <name>dfs.federation.router.client.mount-status.time-out</name>
+    <value>1s</value>
+    <description>
+      Set a timeout for the Router when listing folders containing mount
+      points. In this process, the Router checks the mount table and then it
+      checks permissions in the subcluster. After the time out, we return the
+      default values.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.connect.max.retries.on.timeouts</name>
+    <value>0</value>
+    <description>
+      Maximum number of retries for the IPC Client when connecting to the
+      subclusters. By default, it doesn't let the IPC retry and the Router
+      handles it.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.federation.router.connect.timeout</name>
+    <value>2s</value>
+    <description>
+      Time out for the IPC client connecting to the subclusters. This should
+      be short, as the Router already tracks the state of the Namenodes.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.federation.router.keytab.file</name>
     <value></value>
     <description>
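
The same settings can be applied programmatically on the Router configuration; a minimal sketch mirroring the defaults above:

    // Sketch: equivalent programmatic configuration.
    Configuration conf = new HdfsConfiguration();
    conf.set("dfs.federation.router.client.mount-status.time-out", "1s");
    conf.setInt("dfs.federation.router.connect.max.retries.on.timeouts", 0);
    conf.set("dfs.federation.router.connect.timeout", "2s");
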
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.html b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.html
index c591698..cf8653b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.html
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.html
@@ -393,6 +393,7 @@
       <th>Target path</th>
       <th>Order</th>
       <th>Read only</th>
+      <th>Fault tolerant</th>
       <th>Owner</th>
       <th>Group</th>
       <th>Permission</th>
@@ -409,6 +410,7 @@
       <td>{path}</td>
       <td>{order}</td>
       <td align="center" class="mount-table-icon mount-table-read-only-{readonly}" title="{status}"/>
+      <td align="center" class="mount-table-icon mount-table-fault-tolerant-{faulttolerant}" title="{ftStatus}"></td>
       <td>{ownerName}</td>
       <td>{groupName}</td>
       <td>{mode}</td>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.js b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.js
index 5da7b07..e655e60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.js
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/router/federationhealth.js
@@ -324,8 +324,20 @@
         }
       }
 
+      function augment_fault_tolerant(mountTable) {
+        for (var i = 0, e = mountTable.length; i < e; ++i) {
+          if (mountTable[i].faulttolerant == true) {
+            mountTable[i].faulttolerant = "true"
+            mountTable[i].ftStatus = "Fault tolerant"
+          } else {
+            mountTable[i].faulttolerant = "false"
+          }
+        }
+      }
+
       resource.MountTable = JSON.parse(resource.MountTable)
       augment_read_only(resource.MountTable)
+      augment_fault_tolerant(resource.MountTable)
       return resource;
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/static/rbf.css b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/static/rbf.css
index 5cdd826..b2eef6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/static/rbf.css
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/webapps/static/rbf.css
@@ -135,3 +135,8 @@
     color: #5fa341;
     content: "\e033";
 }
+
+.mount-table-fault-tolerant-true:before {
+    color: #5fa341;
+    content: "\e033";
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
index f24ff12..83cecda 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
@@ -266,6 +266,14 @@ To determine which subcluster contains a file:
     [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -getDestination /user/user1/file.txt
 
 Note that consistency of the data across subclusters is not guaranteed by the Router.
+By default, if one subcluster is unavailable, writes may fail if they target that subcluster.
+To allow writing in another subcluster, one can make the mount point fault tolerant:
+
+    [hdfs]$ $HADOOP_HOME/bin/hdfs dfsrouteradmin -add /data ns1,ns2 /data -order HASH_ALL -faulttolerant
+
+Note that this can lead to a file being written in multiple subclusters, or to a folder missing from one of them.
+One needs to be aware of these possible inconsistencies and reserve the `faulttolerant` approach for paths that can tolerate them.
+An example is the `/app-logs` folder, which is mostly written once into a subfolder.
 
 ### Disabling nameservices
 
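
From a client's point of view, the retry added by the HDFSRouterFederation.md change above is transparent; a minimal sketch of a write through the Router to the fault-tolerant mount point it documents (the Router address is hypothetical):

    // Sketch: if ns1 is unreachable, the Router can place this file in
    // ns2 instead of failing the create().
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(
        URI.create("hdfs://router.example.com:8888"), conf);
    try (FSDataOutputStream out =
        fs.create(new Path("/data/app-logs/run1.log"))) {
      out.writeBytes("log line\n");
    }
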
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
index 9b58fff..d8dffee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
@@ -18,19 +18,44 @@
 package org.apache.hadoop.hdfs.server.federation;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyShort;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
 import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
 import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
@@ -40,15 +65,29 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
+import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DataChecksum.Type;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.BlockingService;
 
@@ -59,9 +98,15 @@ import com.google.protobuf.BlockingService;
  */
 public class MockNamenode {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MockNamenode.class);
+
+
   /** Mock implementation of the Namenode. */
   private final NamenodeProtocols mockNn;
 
+  /** Name service identifier (subcluster). */
+  private String nsId;
   /** HA state of the Namenode. */
   private HAServiceState haState = HAServiceState.STANDBY;
 
@@ -71,9 +116,13 @@ public class MockNamenode {
   private HttpServer2 httpServer;
 
 
-  public MockNamenode() throws Exception {
-    Configuration conf = new Configuration();
+  public MockNamenode(final String nsIdentifier) throws IOException {
+    this(nsIdentifier, new HdfsConfiguration());
+  }
 
+  public MockNamenode(final String nsIdentifier, final Configuration conf)
+      throws IOException {
+    this.nsId = nsIdentifier;
     this.mockNn = mock(NamenodeProtocols.class);
     setupMock();
     setupRPCServer(conf);
@@ -86,7 +135,7 @@ public class MockNamenode {
    * @throws IOException If the mock cannot be setup.
    */
   protected void setupMock() throws IOException {
-    NamespaceInfo nsInfo = new NamespaceInfo(1, "clusterId", "bpId", 1);
+    NamespaceInfo nsInfo = new NamespaceInfo(1, this.nsId, this.nsId, 1);
     when(mockNn.versionRequest()).thenReturn(nsInfo);
 
     when(mockNn.getServiceStatus()).thenAnswer(new Answer<HAServiceStatus>() {
@@ -115,11 +164,16 @@ public class MockNamenode {
         ClientNamenodeProtocol.newReflectiveBlockingService(
             clientNNProtoXlator);
 
+    int numHandlers = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY,
+        DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
+
     rpcServer = new RPC.Builder(conf)
         .setProtocol(ClientNamenodeProtocolPB.class)
         .setInstance(clientNNPbService)
         .setBindAddress("0.0.0.0")
         .setPort(0)
+        .setNumHandlers(numHandlers)
         .build();
 
     NamenodeProtocolServerSideTranslatorPB nnProtoXlator =
@@ -146,6 +200,18 @@ public class MockNamenode {
     DFSUtil.addPBProtocol(
         conf, HAServiceProtocolPB.class, haProtoPbService, rpcServer);
 
+    this.rpcServer.addTerseExceptions(
+        RemoteException.class,
+        SafeModeException.class,
+        FileNotFoundException.class,
+        FileAlreadyExistsException.class,
+        AccessControlException.class,
+        LeaseExpiredException.class,
+        NotReplicatedYetException.class,
+        IOException.class,
+        ConnectException.class,
+        StandbyException.class);
+
     rpcServer.start();
   }
 
@@ -189,6 +255,14 @@ public class MockNamenode {
   }
 
   /**
+   * Get the name service id (subcluster) of the Mock Namenode.
+   * @return Name service identifier.
+   */
+  public String getNameserviceId() {
+    return nsId;
+  }
+
+  /**
    * Get the HA state of the Mock Namenode.
    * @return HA state (ACTIVE or STANDBY).
    */
@@ -217,9 +291,158 @@ public class MockNamenode {
   public void stop() throws Exception {
     if (rpcServer != null) {
       rpcServer.stop();
+      rpcServer = null;
     }
     if (httpServer != null) {
       httpServer.stop();
+      httpServer = null;
     }
   }
+
+  /**
+   * Add the mock for the FileSystem calls in ClientProtocol.
+   * @throws IOException If it cannot be setup.
+   */
+  public void addFileSystemMock() throws IOException {
+    final SortedMap<String, String> fs =
+        new ConcurrentSkipListMap<String, String>();
+
+    DirectoryListing l = mockNn.getListing(anyString(), any(), anyBoolean());
+    when(l).thenAnswer(invocation -> {
+      String src = getSrc(invocation);
+      LOG.info("{} getListing({})", nsId, src);
+      if (!src.endsWith("/")) {
+        src += "/";
+      }
+      Map<String, String> files =
+          fs.subMap(src, src + Character.MAX_VALUE);
+      List<HdfsFileStatus> list = new ArrayList<>();
+      for (String file : files.keySet()) {
+        if (file.substring(src.length()).indexOf('/') < 0) {
+          HdfsFileStatus fileStatus =
+              getMockHdfsFileStatus(file, fs.get(file));
+          list.add(fileStatus);
+        }
+      }
+      HdfsFileStatus[] array = list.toArray(
+          new HdfsFileStatus[list.size()]);
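+      // Second argument is the number of remaining entries;
+      // 0 means the whole listing fits in this single batch.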
+      return new DirectoryListing(array, 0);
+    });
+    when(mockNn.getFileInfo(anyString())).thenAnswer(invocation -> {
+      String src = getSrc(invocation);
+      LOG.info("{} getFileInfo({})", nsId, src);
+      return getMockHdfsFileStatus(src, fs.get(src));
+    });
+    HdfsFileStatus c = mockNn.create(anyString(), any(), anyString(), any(),
+        anyBoolean(), anyShort(), anyLong(), any(), any(), any());
+    when(c).thenAnswer(invocation -> {
+      String src = getSrc(invocation);
+      LOG.info("{} create({})", nsId, src);
+      fs.put(src, "FILE");
+      return getMockHdfsFileStatus(src, "FILE");
+    });
+    LocatedBlocks b = mockNn.getBlockLocations(
+        anyString(), anyLong(), anyLong());
+    when(b).thenAnswer(invocation -> {
+      String src = getSrc(invocation);
+      LOG.info("{} getBlockLocations({})", nsId, src);
+      if (!fs.containsKey(src)) {
+        LOG.error("{} cannot find {} for getBlockLocations", nsId, src);
+        throw new FileNotFoundException("File does not exist " + src);
+      }
+      return mock(LocatedBlocks.class);
+    });
+    boolean f = mockNn.complete(anyString(), anyString(), any(), anyLong());
+    when(f).thenAnswer(invocation -> {
+      String src = getSrc(invocation);
+      if (!fs.containsKey(src)) {
+        LOG.error("{} cannot find {} for complete", nsId, src);
+        throw new FileNotFoundException("File does not exist " + src);
+      }
+      return true;
+    });
+    LocatedBlock a = mockNn.addBlock(
+        anyString(), anyString(), any(), any(), anyLong(), any(), any());
+    when(a).thenAnswer(invocation -> {
+      String src = getSrc(invocation);
+      if (!fs.containsKey(src)) {
+        LOG.error("{} cannot find {} for addBlock", nsId, src);
+        throw new FileNotFoundException("File does not exist " + src);
+      }
+      return getMockLocatedBlock(nsId);
+    });
+    boolean m = mockNn.mkdirs(anyString(), any(), anyBoolean());
+    when(m).thenAnswer(invocation -> {
+      String src = getSrc(invocation);
+      LOG.info("{} mkdirs({})", nsId, src);
+      fs.put(src, "DIRECTORY");
+      return true;
+    });
+    when(mockNn.getServerDefaults()).thenAnswer(invocation -> {
+      LOG.info("{} getServerDefaults", nsId);
+      FsServerDefaults defaults = mock(FsServerDefaults.class);
+      when(defaults.getChecksumType()).thenReturn(
+          Type.valueOf(DataChecksum.CHECKSUM_CRC32));
+      when(defaults.getKeyProviderUri()).thenReturn(nsId);
+      return defaults;
+    });
+  }
+
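The in-memory namespace above relies on a sorted-map trick: with absolute
paths as keys, subMap(prefix, prefix + Character.MAX_VALUE) selects every
entry under a directory, and an entry is a direct child when the remainder
of its path contains no further '/'. A standalone sketch of just that trick
(paths are illustrative):

    import java.util.Map;
    import java.util.SortedMap;
    import java.util.concurrent.ConcurrentSkipListMap;

    SortedMap<String, String> ns = new ConcurrentSkipListMap<>();
    ns.put("/a/file1", "FILE");
    ns.put("/a/b/file2", "FILE");
    ns.put("/c", "DIRECTORY");

    String src = "/a/";
    // Every path that starts with "/a/":
    Map<String, String> under = ns.subMap(src, src + Character.MAX_VALUE);
    for (String path : under.keySet()) {
      // Keep direct children only: no '/' left after the prefix.
      if (path.substring(src.length()).indexOf('/') < 0) {
        System.out.println(path);              // prints only /a/file1
      }
    }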
+  private static String getSrc(InvocationOnMock invocation) {
+    return (String) invocation.getArguments()[0];
+  }
+
+  /**
+   * Get a mock HDFS file status.
+   * @param filename Name of the file.
+   * @param type Type of the file (FILE, DIRECTORY, or null).
+   * @return HDFS file status.
+   */
+  private static HdfsFileStatus getMockHdfsFileStatus(
+      final String filename, final String type) {
+    if (type == null) {
+      return null;
+    }
+    HdfsFileStatus fileStatus = mock(HdfsFileStatus.class);
+    when(fileStatus.getLocalNameInBytes()).thenReturn(filename.getBytes());
+    when(fileStatus.getPermission()).thenReturn(mock(FsPermission.class));
+    when(fileStatus.getOwner()).thenReturn("owner");
+    when(fileStatus.getGroup()).thenReturn("group");
+    if (type.equals("FILE")) {
+      when(fileStatus.getLen()).thenReturn(100L);
+      when(fileStatus.getReplication()).thenReturn((short) 1);
+      when(fileStatus.getBlockSize()).thenReturn(
+          HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+    } else if (type.equals("DIRECTORY")) {
+      when(fileStatus.isDir()).thenReturn(true);
+      when(fileStatus.isDirectory()).thenReturn(true);
+    }
+    return fileStatus;
+  }
+
+  /**
+   * Get a mock located block pointing to one of the subclusters. It is
+   * allocated in a fake Datanode.
+   * @param nsId Name service identifier (subcluster).
+   * @return Mock located block.
+   */
+  private static LocatedBlock getMockLocatedBlock(final String nsId) {
+    LocatedBlock lb = mock(LocatedBlock.class);
+    when(lb.getCachedLocations()).thenReturn(new DatanodeInfo[0]);
+    DatanodeID nodeId = new DatanodeID("localhost", "localhost", "dn0",
+        1111, 1112, 1113, 1114);
+    DatanodeInfo dnInfo = new DatanodeDescriptor(nodeId);
+    when(lb.getLocations()).thenReturn(new DatanodeInfo[] {dnInfo});
+    ExtendedBlock eb = mock(ExtendedBlock.class);
+    when(eb.getBlockPoolId()).thenReturn(nsId);
+    when(lb.getBlock()).thenReturn(eb);
+    @SuppressWarnings("unchecked")
+    Token<BlockTokenIdentifier> tok = mock(Token.class);
+    when(tok.getIdentifier()).thenReturn(nsId.getBytes());
+    when(tok.getPassword()).thenReturn(nsId.getBytes());
+    when(tok.getKind()).thenReturn(new Text(nsId));
+    when(tok.getService()).thenReturn(new Text(nsId));
+    when(lb.getBlockToken()).thenReturn(tok);
+    return lb;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
index 486d4a0..381203b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
@@ -152,7 +152,7 @@ public class TestRouterAdminCLI {
 
   @Test
   public void testAddMountTable() throws Exception {
-    String nsId = "ns0";
+    String nsId = "ns0,ns1";
     String src = "/test-addmounttable";
     String dest = "/addmounttable";
     String[] argv = new String[] {"-add", src, nsId, dest};
@@ -166,26 +166,35 @@ public class TestRouterAdminCLI {
     MountTable mountTable = getResponse.getEntries().get(0);
 
     List<RemoteLocation> destinations = mountTable.getDestinations();
-    assertEquals(1, destinations.size());
+    assertEquals(2, destinations.size());
 
     assertEquals(src, mountTable.getSourcePath());
-    assertEquals(nsId, destinations.get(0).getNameserviceId());
+    assertEquals("ns0", destinations.get(0).getNameserviceId());
     assertEquals(dest, destinations.get(0).getDest());
+    assertEquals("ns1", destinations.get(1).getNameserviceId());
+    assertEquals(dest, destinations.get(1).getDest());
     assertFalse(mountTable.isReadOnly());
+    assertFalse(mountTable.isFaultTolerant());
 
     // test mount table update behavior
     dest = dest + "-new";
-    argv = new String[] {"-add", src, nsId, dest, "-readonly"};
+    argv = new String[] {"-add", src, nsId, dest, "-readonly",
+        "-faulttolerant", "-order", "HASH_ALL"};
     assertEquals(0, ToolRunner.run(admin, argv));
     stateStore.loadCache(MountTableStoreImpl.class, true);
 
     getResponse = client.getMountTableManager()
         .getMountTableEntries(getRequest);
     mountTable = getResponse.getEntries().get(0);
-    assertEquals(2, mountTable.getDestinations().size());
-    assertEquals(nsId, mountTable.getDestinations().get(1).getNameserviceId());
-    assertEquals(dest, mountTable.getDestinations().get(1).getDest());
+    assertEquals(4, mountTable.getDestinations().size());
+    RemoteLocation loc2 = mountTable.getDestinations().get(2);
+    assertEquals("ns0", loc2.getNameserviceId());
+    assertEquals(dest, loc2.getDest());
+    RemoteLocation loc3 = mountTable.getDestinations().get(3);
+    assertEquals("ns1", loc3.getNameserviceId());
+    assertEquals(dest, loc3.getDest());
     assertTrue(mountTable.isReadOnly());
+    assertTrue(mountTable.isFaultTolerant());
   }
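
For reference, the flag combination validated above maps onto the admin CLI
the same way throughout this test; a minimal sketch using the ToolRunner
pattern, assuming the Configuration-taking RouterAdmin constructor used
elsewhere in this test (mount path and nameservice ids are illustrative):

    Configuration conf = new HdfsConfiguration();
    RouterAdmin admin = new RouterAdmin(conf);
    // Fault tolerance requires multiple destinations and an *_ALL order.
    String[] argv = {"-add", "/data", "ns0,ns1", "/data",
        "-order", "HASH_ALL", "-faulttolerant"};
    int ret = ToolRunner.run(admin, argv);     // 0 on success, -1 on error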
 
   @Test
@@ -211,6 +220,7 @@ public class TestRouterAdminCLI {
     assertEquals(nsId, destinations.get(0).getNameserviceId());
     assertEquals(dest, destinations.get(0).getDest());
     assertFalse(mountTable.isReadOnly());
+    assertFalse(mountTable.isFaultTolerant());
 
     // test mount table update behavior
     dest = dest + "-new";
@@ -516,17 +526,19 @@ public class TestRouterAdminCLI {
     System.setOut(new PrintStream(out));
     String[] argv = new String[] {"-add", src, nsId};
     assertEquals(-1, ToolRunner.run(admin, argv));
-    assertTrue(out.toString().contains(
+    assertTrue("Wrong message: " + out, out.toString().contains(
         "\t[-add <source> <nameservice1, nameservice2, ...> <destination> "
-            + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+            + "[-readonly] [-faulttolerant] "
+            + "[-order HASH|LOCAL|RANDOM|HASH_ALL] "
             + "-owner <owner> -group <group> -mode <mode>]"));
     out.reset();
 
     argv = new String[] {"-update", src, nsId};
     assertEquals(-1, ToolRunner.run(admin, argv));
-    assertTrue(out.toString().contains(
+    assertTrue("Wrong message: " + out, out.toString().contains(
         "\t[-update <source> <nameservice1, nameservice2, ...> <destination> "
-            + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+            + "[-readonly] [-faulttolerant] "
+            + "[-order HASH|LOCAL|RANDOM|HASH_ALL] "
             + "-owner <owner> -group <group> -mode <mode>]"));
     out.reset();
 
@@ -567,10 +579,11 @@ public class TestRouterAdminCLI {
     assertEquals(-1, ToolRunner.run(admin, argv));
     String expected = "Usage: hdfs dfsrouteradmin :\n"
         + "\t[-add <source> <nameservice1, nameservice2, ...> <destination> "
-        + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+        + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
         + "-owner <owner> -group <group> -mode <mode>]\n"
         + "\t[-update <source> <nameservice1, nameservice2, ...> "
-        + "<destination> " + "[-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
+        + "<destination> "
+        + "[-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] "
         + "-owner <owner> -group <group> -mode <mode>]\n" + "\t[-rm <source>]\n"
         + "\t[-ls <path>]\n"
         + "\t[-getDestination <path>]\n"
@@ -579,7 +592,7 @@ public class TestRouterAdminCLI {
         + "\t[-safemode enter | leave | get]\n"
         + "\t[-nameservice enable | disable <nameservice>]\n"
         + "\t[-getDisabledNameservices]";
-    assertTrue(out.toString(), out.toString().contains(expected));
+    assertTrue("Wrong message: " + out, out.toString().contains(expected));
     out.reset();
   }
 
@@ -1159,4 +1172,28 @@ public class TestRouterAdminCLI {
     argv = new String[] {"-getDestination /file1.txt /file2.txt"};
     assertEquals(-1, ToolRunner.run(admin, argv));
   }
+
+  @Test
+  public void testErrorFaultTolerant() throws Exception {
+
+    System.setErr(new PrintStream(err));
+    String[] argv = new String[] {"-add", "/mntft", "ns01", "/tmp",
+        "-faulttolerant"};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+    assertTrue(err.toString(), err.toString().contains(
+        "Invalid entry, fault tolerance requires multiple destinations"));
+    err.reset();
+
+    System.setErr(new PrintStream(err));
+    argv = new String[] {"-add", "/mntft", "ns0,ns1", "/tmp",
+        "-order", "HASH", "-faulttolerant"};
+    assertEquals(-1, ToolRunner.run(admin, argv));
+    assertTrue(err.toString(), err.toString().contains(
+        "Invalid entry, fault tolerance only supported for ALL order"));
+    err.reset();
+
+    argv = new String[] {"-add", "/mntft", "ns0,ns1", "/tmp",
+        "-order", "HASH_ALL", "-faulttolerant"};
+    assertEquals(0, ToolRunner.run(admin, argv));
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
new file mode 100644
index 0000000..c8f96c6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
@@ -0,0 +1,654 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static java.util.Arrays.asList;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.MockNamenode;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Test the handling of fault tolerant mount points in the Router.
+ */
+public class TestRouterFaultTolerant {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterFaultTolerant.class);
+
+  /** Number of files to create for testing. */
+  private static final int NUM_FILES = 10;
+  /** Number of Routers for test. */
+  private static final int NUM_ROUTERS = 2;
+
+
+  /** Namenodes for the test per name service id (subcluster). */
+  private Map<String, MockNamenode> namenodes = new HashMap<>();
+  /** Routers for the test. */
+  private List<Router> routers = new ArrayList<>();
+
+  /** Run test tasks in parallel. */
+  private ExecutorService service;
+
+
+  @Before
+  public void setup() throws Exception {
+    LOG.info("Start the Namenodes");
+    Configuration nnConf = new HdfsConfiguration();
+    nnConf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 10);
+    for (final String nsId : asList("ns0", "ns1")) {
+      MockNamenode nn = new MockNamenode(nsId, nnConf);
+      nn.transitionToActive();
+      nn.addFileSystemMock();
+      namenodes.put(nsId, nn);
+    }
+
+    LOG.info("Start the Routers");
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .admin()
+        .rpc()
+        .build();
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "0.0.0.0:0");
+    // Speed up timeouts
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DFS_ROUTER_CLIENT_CONNECT_TIMEOUT,
+        500, TimeUnit.MILLISECONDS);
+
+    Configuration stateStoreConf = getStateStoreConfiguration();
+    stateStoreConf.setClass(
+        RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+        MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
+    stateStoreConf.setClass(
+        RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+        MultipleDestinationMountTableResolver.class,
+        FileSubclusterResolver.class);
+    routerConf.addResource(stateStoreConf);
+
+    for (int i = 0; i < NUM_ROUTERS; i++) {
+      // router0 doesn't allow partial listing
+      routerConf.setBoolean(
+          RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST, i != 0);
+
+      final Router router = new Router();
+      router.init(routerConf);
+      router.start();
+      routers.add(router);
+    }
+
+    LOG.info("Registering the subclusters in the Routers");
+    registerSubclusters(Collections.singleton("ns1"));
+
+    LOG.info("Stop ns1 to simulate an unavailable subcluster");
+    namenodes.get("ns1").stop();
+
+    service = Executors.newFixedThreadPool(10);
+  }
+
+  /**
+   * Register the subclusters in all Routers.
+   * @param unavailableSubclusters Set of unavailable subclusters.
+   * @throws IOException If it cannot register a subcluster.
+   */
+  private void registerSubclusters(Set<String> unavailableSubclusters)
+      throws IOException {
+    for (final Router router : routers) {
+      MembershipNamenodeResolver resolver =
+          (MembershipNamenodeResolver) router.getNamenodeResolver();
+      for (final MockNamenode nn : namenodes.values()) {
+        String nsId = nn.getNameserviceId();
+        String rpcAddress = "localhost:" + nn.getRPCPort();
+        String httpAddress = "localhost:" + nn.getHTTPPort();
+        NamenodeStatusReport report = new NamenodeStatusReport(
+            nsId, null, rpcAddress, rpcAddress, rpcAddress, httpAddress);
+        if (unavailableSubclusters.contains(nsId)) {
+          LOG.info("Register {} as UNAVAILABLE", nsId);
+          report.setRegistrationValid(false);
+        } else {
+          LOG.info("Register {} as ACTIVE", nsId);
+          report.setRegistrationValid(true);
+        }
+        report.setNamespaceInfo(new NamespaceInfo(0, nsId, nsId, 0));
+        resolver.registerNamenode(report);
+      }
+      resolver.loadCache(true);
+    }
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    LOG.info("Stopping the cluster");
+    for (final MockNamenode nn : namenodes.values()) {
+      nn.stop();
+    }
+    namenodes.clear();
+
+    routers.forEach(router -> router.stop());
+    routers.clear();
+
+    if (service != null) {
+      service.shutdown();
+      service = null;
+    }
+  }
+
+  /**
+   * Add a mount table entry in some name services and wait until it is
+   * available.
+   * @param mountPoint Name of the mount point.
+   * @param order Order of the mount table entry.
+   * @param nsIds Name service identifiers.
+   * @throws Exception If the entry could not be created.
+   */
+  private void createMountTableEntry(
+      final String mountPoint, final DestinationOrder order,
+      Collection<String> nsIds) throws Exception {
+    Router router = getRandomRouter();
+    RouterClient admin = getAdminClient(router);
+    MountTableManager mountTable = admin.getMountTableManager();
+    Map<String, String> destMap = new HashMap<>();
+    for (String nsId : nsIds) {
+      destMap.put(nsId, mountPoint);
+    }
+    MountTable newEntry = MountTable.newInstance(mountPoint, destMap);
+    newEntry.setDestOrder(order);
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(newEntry);
+    AddMountTableEntryResponse addResponse =
+        mountTable.addMountTableEntry(addRequest);
+    boolean created = addResponse.getStatus();
+    assertTrue(created);
+
+    refreshRoutersCaches();
+
+    // Check for the path
+    GetMountTableEntriesRequest getRequest =
+        GetMountTableEntriesRequest.newInstance(mountPoint);
+    GetMountTableEntriesResponse getResponse =
+        mountTable.getMountTableEntries(getRequest);
+    List<MountTable> entries = getResponse.getEntries();
+    assertEquals("Too many entries: " + entries, 1, entries.size());
+    assertEquals(mountPoint, entries.get(0).getSourcePath());
+  }
+
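The same record API also builds a fault-tolerant entry directly; a small
sketch against the MountTable methods used in this test (destinations are
illustrative):

    Map<String, String> dests = new LinkedHashMap<>();
    dests.put("ns0", "/data");
    dests.put("ns1", "/data");
    MountTable entry = MountTable.newInstance("/data", dests);
    // Fault tolerance is only accepted together with an ALL order.
    entry.setDestOrder(DestinationOrder.HASH_ALL);
    entry.setFaultTolerant(true);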
+  /**
+   * Update a mount table entry to be fault tolerant.
+   * @param mountPoint Mount point to update.
+   * @throws IOException If it cannot update the mount point.
+   */
+  private void updateMountPointFaultTolerant(final String mountPoint)
+      throws IOException {
+    Router router = getRandomRouter();
+    RouterClient admin = getAdminClient(router);
+    MountTableManager mountTable = admin.getMountTableManager();
+    GetMountTableEntriesRequest getRequest =
+        GetMountTableEntriesRequest.newInstance(mountPoint);
+    GetMountTableEntriesResponse entries =
+        mountTable.getMountTableEntries(getRequest);
+    MountTable updateEntry = entries.getEntries().get(0);
+    updateEntry.setFaultTolerant(true);
+    UpdateMountTableEntryRequest updateRequest =
+        UpdateMountTableEntryRequest.newInstance(updateEntry);
+    UpdateMountTableEntryResponse updateResponse =
+        mountTable.updateMountTableEntry(updateRequest);
+    assertTrue(updateResponse.getStatus());
+
+    refreshRoutersCaches();
+  }
+
+  /**
+   * Refresh the caches of all Routers (to get the mount table).
+   */
+  private void refreshRoutersCaches() {
+    for (final Router router : routers) {
+      StateStoreService stateStore = router.getStateStore();
+      stateStore.refreshCaches(true);
+    }
+  }
+
+  /**
+   * Test the behavior of the Router when one of the subclusters in a mount
+   * point fails. In particular, it checks if it can write files or not.
+   * Related to {@link TestRouterRpcMultiDestination#testSubclusterDown()}.
+   */
+  @Test
+  public void testWriteWithFailedSubcluster() throws Exception {
+
+    // Run the actual tests with each approach
+    final List<Callable<Boolean>> tasks = new ArrayList<>();
+    final List<DestinationOrder> orders = asList(
+        DestinationOrder.HASH_ALL,
+        DestinationOrder.SPACE,
+        DestinationOrder.RANDOM,
+        DestinationOrder.HASH);
+    for (DestinationOrder order : orders) {
+      tasks.add(() -> {
+        testWriteWithFailedSubcluster(order);
+        return true;
+      });
+    }
+    TaskResults results = collectResults("Full tests", tasks);
+    assertEquals(orders.size(), results.getSuccess());
+  }
+
+  /**
+   * Test the behavior of the Router when one of the subclusters in a mount
+   * point fails. It assumes that ns1 is already down.
+   * @param order Destination order of the mount point.
+   * @throws Exception If we cannot run the test.
+   */
+  private void testWriteWithFailedSubcluster(final DestinationOrder order)
+      throws Exception {
+
+    final FileSystem router0Fs = getFileSystem(routers.get(0));
+    final FileSystem router1Fs = getFileSystem(routers.get(1));
+    final FileSystem ns0Fs = getFileSystem(namenodes.get("ns0").getRPCPort());
+
+    final String mountPoint = "/" + order + "-failsubcluster";
+    final Path mountPath = new Path(mountPoint);
+    LOG.info("Setup {} with order {}", mountPoint, order);
+    createMountTableEntry(mountPoint, order, namenodes.keySet());
+
+
+    LOG.info("Write in {} should succeed writing in ns0 and fail for ns1",
+        mountPath);
+    checkDirectoriesFaultTolerant(
+        mountPath, order, router0Fs, router1Fs, ns0Fs, false);
+    checkFilesFaultTolerant(
+        mountPath, order, router0Fs, router1Fs, ns0Fs, false);
+
+    LOG.info("Make {} fault tolerant and everything succeeds", mountPath);
+    IOException ioe = null;
+    try {
+      updateMountPointFaultTolerant(mountPoint);
+    } catch (IOException e) {
+      ioe = e;
+    }
+    if (DestinationOrder.FOLDER_ALL.contains(order)) {
+      assertNull(ioe);
+      checkDirectoriesFaultTolerant(
+          mountPath, order, router0Fs, router1Fs, ns0Fs, true);
+      checkFilesFaultTolerant(
+          mountPath, order, router0Fs, router1Fs, ns0Fs, true);
+    } else {
+      assertTrue(ioe.getMessage().startsWith(
+          "Invalid entry, fault tolerance only supported for ALL order"));
+    }
+  }
+
+  /**
+   * Check directory creation on a mount point.
+   * If it is fault tolerant, it should be able to write everything.
+   * If it is not fault tolerant, it should fail to write some of them.
+   */
+  private void checkDirectoriesFaultTolerant(
+      Path mountPoint, DestinationOrder order,
+      FileSystem router0Fs, FileSystem router1Fs, FileSystem ns0Fs,
+      boolean faultTolerant) throws Exception {
+
+    final FileStatus[] dirs0 = listStatus(router1Fs, mountPoint);
+
+    LOG.info("Create directories in {}", mountPoint);
+    final List<Callable<Boolean>> tasks = new ArrayList<>();
+    for (int i = 0; i < NUM_FILES; i++) {
+      final Path dir = new Path(mountPoint,
+          String.format("dir-%s-%03d", faultTolerant, i));
+      FileSystem fs = getRandomRouterFileSystem();
+      tasks.add(getDirCreateTask(fs, dir));
+    }
+    TaskResults results = collectResults("Create dir " + mountPoint, tasks);
+
+    LOG.info("Check directories results for {}: {}", mountPoint, results);
+    if (faultTolerant || DestinationOrder.FOLDER_ALL.contains(order)) {
+      assertEquals(NUM_FILES, results.getSuccess());
+      assertEquals(0, results.getFailure());
+    } else {
+      assertBothResults("check dir " + mountPoint, NUM_FILES, results);
+    }
+
+    LOG.info("Check directories listing for {}", mountPoint);
+    tasks.add(getListFailTask(router0Fs, mountPoint));
+    int filesExpected = dirs0.length + results.getSuccess();
+    tasks.add(getListSuccessTask(router1Fs, mountPoint, filesExpected));
+    assertEquals(2, collectResults("List " + mountPoint, tasks).getSuccess());
+  }
+
+  /**
+   * Check file creation on a mount point.
+   * If it is fault tolerant, it should be able to write everything.
+   * If it is not fault tolerant, it should fail to write some of the files.
+   */
+  private void checkFilesFaultTolerant(
+      Path mountPoint, DestinationOrder order,
+      FileSystem router0Fs, FileSystem router1Fs, FileSystem ns0Fs,
+      boolean faultTolerant) throws Exception {
+
+    // Get one of the existing sub directories
+    final FileStatus[] dirs0 = listStatus(router1Fs, mountPoint);
+    final Path dir0 = Path.getPathWithoutSchemeAndAuthority(
+        dirs0[0].getPath());
+
+    LOG.info("Create files in {}",  dir0);
+    final List<Callable<Boolean>> tasks = new ArrayList<>();
+    for (int i = 0; i < NUM_FILES; i++) {
+      final String newFile = String.format("%s/file-%03d.txt", dir0, i);
+      FileSystem fs = getRandomRouterFileSystem();
+      tasks.add(getFileCreateTask(fs, newFile, ns0Fs));
+    }
+    TaskResults results = collectResults("Create file " + dir0, tasks);
+
+    LOG.info("Check files results for {}: {}", dir0, results);
+    if (faultTolerant || !DestinationOrder.FOLDER_ALL.contains(order)) {
+      assertEquals(NUM_FILES, results.getSuccess());
+      assertEquals(0, results.getFailure());
+    } else {
+      assertBothResults("check files " + dir0, NUM_FILES, results);
+    }
+
+    LOG.info("Check files listing for {}", dir0);
+    tasks.add(getListFailTask(router0Fs, dir0));
+    tasks.add(getListSuccessTask(router1Fs, dir0, results.getSuccess()));
+    assertEquals(2, collectResults("List " + dir0, tasks).getSuccess());
+  }
+
+  /**
+   * Get the string representation for the files.
+   * @param files Files to check.
+   * @return String representation.
+   */
+  private static String toString(final FileStatus[] files) {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    for (final FileStatus file : files) {
+      if (sb.length() > 1) {
+        sb.append(", ");
+      }
+      sb.append(Path.getPathWithoutSchemeAndAuthority(file.getPath()));
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+
+  /**
+   * List the files in a path.
+   * @param fs File system to check.
+   * @param path Path to list.
+   * @return List of files.
+   * @throws IOException If we cannot list.
+   */
+  private FileStatus[] listStatus(final FileSystem fs, final Path path)
+      throws IOException {
+    FileStatus[] files = new FileStatus[] {};
+    try {
+      files = fs.listStatus(path);
+    } catch (FileNotFoundException fnfe) {
+      LOG.debug("File not found: {}", fnfe.getMessage());
+    }
+    return files;
+  }
+
+  /**
+   * Task that creates a file and checks if it is available.
+   * @param fs File system to create the file in.
+   * @param file File to create.
+   * @param checkFs File system for checking if the file is properly created.
+   * @return Result of creating the file.
+   */
+  private static Callable<Boolean> getFileCreateTask(
+      final FileSystem fs, final String file, FileSystem checkFs) {
+    return () -> {
+      try {
+        Path path = new Path(file);
+        FSDataOutputStream os = fs.create(path);
+        // We don't write because we have no mock Datanodes
+        os.close();
+        FileStatus fileStatus = checkFs.getFileStatus(path);
+        assertTrue("File not created properly: " + fileStatus,
+            fileStatus.getLen() > 0);
+        return true;
+      } catch (RemoteException re) {
+        return false;
+      }
+    };
+  }
+
+  /**
+   * Task that creates a directory.
+   * @param fs File system to create the directory in.
+   * @param dir Directory to create.
+   * @return Result of creating the directory.
+   */
+  private static Callable<Boolean> getDirCreateTask(
+      final FileSystem fs, final Path dir) {
+    return () -> {
+      try {
+        fs.mkdirs(dir);
+        return true;
+      } catch (RemoteException re) {
+        return false;
+      }
+    };
+  }
+
+  /**
+   * Task that lists a directory and expects to fail.
+   * @param fs File system to check.
+   * @param path Path to try to list.
+   * @return If the listing failed as expected.
+   */
+  private static Callable<Boolean> getListFailTask(FileSystem fs, Path path) {
+    return () -> {
+      try {
+        fs.listStatus(path);
+        return false;
+      } catch (RemoteException re) {
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Task that lists a directory and expects to succeed.
+   * @param fs File system to check.
+   * @param path Path to list.
+   * @param expected Number of files to expect to find.
+   * @return If the listing succeeds.
+   */
+  private static Callable<Boolean> getListSuccessTask(
+      FileSystem fs, Path path, int expected) {
+    return () -> {
+      final FileStatus[] dirs = fs.listStatus(path);
+      assertEquals(toString(dirs), expected, dirs.length);
+      return true;
+    };
+  }
+
+  /**
+   * Invoke a set of tasks and collect their outputs.
+   * The tasks should do assertions.
+   *
+   * @param tag Tag identifying the tasks in the logs.
+   * @param tasks Tasks to run.
+   * @return Aggregated success and failure counts.
+   * @throws Exception If it cannot collect the results.
+   */
+  private TaskResults collectResults(final String tag,
+      final Collection<Callable<Boolean>> tasks) throws Exception {
+    final TaskResults results = new TaskResults();
+    service.invokeAll(tasks).forEach(task -> {
+      try {
+        boolean succeeded = task.get();
+        if (succeeded) {
+          LOG.info("Got success for {}", tag);
+          results.incrSuccess();
+        } else {
+          LOG.info("Got failure for {}", tag);
+          results.incrFailure();
+        }
+      } catch (Exception e) {
+        fail(e.getMessage());
+      }
+    });
+    tasks.clear();
+    return results;
+  }
+
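collectResults() leans on ExecutorService.invokeAll() semantics: the call
blocks until every task has completed, so each returned Future is already
done, and get() either yields the task's Boolean or rethrows its failure
wrapped in an ExecutionException. A stripped-down sketch of that contract:

    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.*;

    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Callable<Boolean>> work = Collections.nCopies(4, () -> true);
    for (Future<Boolean> f : pool.invokeAll(work)) {
      // invokeAll() waits for completion, so isDone() always holds here.
      boolean ok = f.get();                    // may throw ExecutionException
    }
    pool.shutdown();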
+  /**
+   * Class to summarize the results of running a task.
+   */
+  static class TaskResults {
+    private final AtomicInteger success = new AtomicInteger(0);
+    private final AtomicInteger failure = new AtomicInteger(0);
+    public void incrSuccess() {
+      success.incrementAndGet();
+    }
+    public void incrFailure() {
+      failure.incrementAndGet();
+    }
+    public int getSuccess() {
+      return success.get();
+    }
+    public int getFailure() {
+      return failure.get();
+    }
+    public int getTotal() {
+      return success.get() + failure.get();
+    }
+    @Override
+    public String toString() {
+      return new StringBuilder()
+          .append("Success=").append(getSuccess())
+          .append(" Failure=").append(getFailure())
+          .toString();
+    }
+  }
+
+  /**
+   * Asserts that the total number of results matches the expected count and
+   * that there were both successes and failures.
+   * @param msg Message to show when the assertion fails.
+   * @param expected Expected number of results.
+   * @param actual Actual results.
+   */
+  private static void assertBothResults(String msg,
+      int expected, TaskResults actual) {
+    assertEquals(msg, expected, actual.getTotal());
+    assertTrue("Expected some success for " + msg, actual.getSuccess() > 0);
+    assertTrue("Expected some failure for " + msg, actual.getFailure() > 0);
+  }
+
+  /**
+   * Get a random Router from the cluster.
+   * @return Random Router.
+   */
+  private Router getRandomRouter() {
+    Random rnd = new Random();
+    int index = rnd.nextInt(routers.size());
+    return routers.get(index);
+  }
+
+  /**
+   * Get a file system from one of the Routers as a random user to allow better
+   * concurrency in the Router.
+   * @return File system from a random user.
+   * @throws Exception If we cannot create the file system.
+   */
+  private FileSystem getRandomRouterFileSystem() throws Exception {
+    final UserGroupInformation userUgi =
+        UserGroupInformation.createUserForTesting(
+            "user-" + UUID.randomUUID(), new String[]{"group"});
+    Router router = getRandomRouter();
+    return userUgi.doAs(
+        (PrivilegedExceptionAction<FileSystem>) () -> getFileSystem(router));
+  }
+
+  private static FileSystem getFileSystem(int rpcPort) throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    URI uri = URI.create("hdfs://localhost:" + rpcPort);
+    return DistributedFileSystem.get(uri, conf);
+  }
+
+  private static FileSystem getFileSystem(final Router router)
+      throws IOException {
+    InetSocketAddress rpcAddress = router.getRpcServerAddress();
+    int rpcPort = rpcAddress.getPort();
+    return getFileSystem(rpcPort);
+  }
+
+  private static RouterClient getAdminClient(
+      final Router router) throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    InetSocketAddress routerSocket = router.getAdminServerAddress();
+    return new RouterClient(routerSocket, conf);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
index 1224fa2..8fa3506 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
@@ -74,7 +74,7 @@ public class TestRouterNamenodeMonitoring {
     for (String nsId : nsIds) {
       nns.put(nsId, new HashMap<>());
       for (String nnId : asList("nn0", "nn1")) {
-        nns.get(nsId).put(nnId, new MockNamenode());
+        nns.get(nsId).put(nnId, new MockNamenode(nsId));
       }
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
index 2ec5d62..98f9ebc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/FederationStateStoreTestUtils.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipState;
 import org.apache.hadoop.hdfs.server.federation.store.records.MembershipStats;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 
 /**
@@ -93,7 +94,7 @@ public final class FederationStateStoreTestUtils {
 
     conf.setClass(FEDERATION_STORE_DRIVER_CLASS, clazz, StateStoreDriver.class);
 
-    if (clazz.isAssignableFrom(StateStoreFileBaseImpl.class)) {
+    if (StateStoreFileBaseImpl.class.isAssignableFrom(clazz)) {
       setFileConfiguration(conf);
     }
     return conf;
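
The reversed check above is the key fix: A.isAssignableFrom(B) asks whether
a B can be treated as an A, i.e. whether B is A or a subtype of A. A
two-class sketch (hypothetical types):

    class Base {}
    class Sub extends Base {}

    Base.class.isAssignableFrom(Sub.class);    // true: a Sub is a Base
    Sub.class.isAssignableFrom(Base.class);    // false: a Base is not a Sub

So StateStoreFileBaseImpl.class.isAssignableFrom(clazz) now matches every
file-based driver subclass, which is what the file-configuration branch is
meant to cover.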
@@ -178,8 +179,7 @@ public final class FederationStateStoreTestUtils {
    * @param conf Configuration to extend.
    */
   public static void setFileConfiguration(Configuration conf) {
-    String workingPath = System.getProperty("user.dir");
-    String stateStorePath = workingPath + "/statestore";
+    String stateStorePath = GenericTestUtils.getRandomizedTempPath();
     conf.set(FEDERATION_STORE_FILE_DIRECTORY, stateStorePath);
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java
index d30d6ba..6e5bd9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/TestStateStoreMountTable.java
@@ -78,15 +78,17 @@ public class TestStateStoreMountTable extends TestStateStoreBase {
     assertFalse(getStateStore().isDriverReady());
 
     // Test APIs that access the store to check they throw the correct exception
+    MountTable entry = MountTable.newInstance(
+        "/mnt", Collections.singletonMap("ns0", "/tmp"));
     AddMountTableEntryRequest addRequest =
-        AddMountTableEntryRequest.newInstance();
+        AddMountTableEntryRequest.newInstance(entry);
     verifyException(mountStore, "addMountTableEntry",
         StateStoreUnavailableException.class,
         new Class[] {AddMountTableEntryRequest.class},
         new Object[] {addRequest});
 
     UpdateMountTableEntryRequest updateRequest =
-        UpdateMountTableEntryRequest.newInstance();
+        UpdateMountTableEntryRequest.newInstance(entry);
     verifyException(mountStore, "updateMountTableEntry",
         StateStoreUnavailableException.class,
         new Class[] {UpdateMountTableEntryRequest.class},
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
index 05552738..339a977 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/store/records/TestMountTable.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.federation.store.records;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -165,6 +166,24 @@ public class TestMountTable {
   }
 
   @Test
+  public void testFaultTolerant() throws IOException {
+
+    Map<String, String> dest = new LinkedHashMap<>();
+    dest.put(DST_NS_0, DST_PATH_0);
+    dest.put(DST_NS_1, DST_PATH_1);
+    MountTable record0 = MountTable.newInstance(SRC, dest);
+    assertFalse(record0.isFaultTolerant());
+
+    MountTable record1 = MountTable.newInstance(SRC, dest);
+    assertFalse(record1.isFaultTolerant());
+    assertEquals(record0, record1);
+
+    record1.setFaultTolerant(true);
+    assertTrue(record1.isFaultTolerant());
+    assertNotEquals(record0, record1);
+  }
+
+  @Test
   public void testOrder() throws IOException {
     testOrder(DestinationOrder.HASH);
     testOrder(DestinationOrder.LOCAL);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index 32e88a2..6336281 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -428,8 +428,8 @@ Runs the DFS router. See [Router](../hadoop-hdfs-rbf/HDFSRouterFederation.html#R
 Usage:
 
       hdfs dfsrouteradmin
-          [-add <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
-          [-update <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
+          [-add <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
+          [-update <source> <nameservice1, nameservice2, ...> <destination> [-readonly] [-faulttolerant] [-order HASH|LOCAL|RANDOM|HASH_ALL] -owner <owner> -group <group> -mode <mode>]
           [-rm <source>]
           [-ls <path>]
           [-getDestination <path>]

