You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2018/08/26 00:44:19 UTC

[07/50] [abbrv] hadoop git commit: Revert "HDFS-13790. RBF: Move ClientProtocol APIs to its own module. Contributed by Chao Sun."

Revert "HDFS-13790. RBF: Move ClientProtocol APIs to its own module. Contributed by Chao Sun."

This reverts commit fa121eb66bc42e9cb5586f8c2e268cfdc2ed187a.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fb5b3dce
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fb5b3dce
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fb5b3dce

Branch: refs/heads/HDFS-12943
Commit: fb5b3dce6192265bce9b9d93ab663bdc5be8048e
Parents: 9dd5d5b
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Fri Aug 17 08:01:44 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Fri Aug 17 08:01:44 2018 -0700

----------------------------------------------------------------------
 .../federation/router/RouterRpcServer.java      | 1360 ++++++++++++++++--
 1 file changed, 1202 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb5b3dce/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index fe54993..29f32a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -33,12 +33,16 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
@@ -50,6 +54,7 @@ import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.XAttr;
@@ -59,6 +64,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.AddBlockFlag;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
@@ -87,6 +93,7 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
 import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
 import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
@@ -94,8 +101,8 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
-import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
@@ -160,6 +167,11 @@ public class RouterRpcServer extends AbstractService
   /** Configuration for the RPC server. */
   private Configuration conf;
 
+  /** Identifier for the super user. */
+  private final String superUser;
+  /** Identifier for the super group. */
+  private final String superGroup;
+
   /** Router using this RPC server. */
   private final Router router;
 
@@ -187,10 +199,11 @@ public class RouterRpcServer extends AbstractService
   // Modules implementing groups of RPC calls
   /** Router Quota calls. */
   private final Quota quotaCall;
+  /** Erasure coding calls. */
+  private final ErasureCoding erasureCoding;
   /** NamenodeProtocol calls. */
   private final RouterNamenodeProtocol nnProto;
-  /** ClientProtocol calls. */
-  private final RouterClientProtocol clientProto;
+
 
   /**
    * Construct a router RPC server.
@@ -210,6 +223,12 @@ public class RouterRpcServer extends AbstractService
     this.namenodeResolver = nnResolver;
     this.subclusterResolver = fileResolver;
 
+    // User and group for reporting
+    this.superUser = System.getProperty("user.name");
+    this.superGroup = this.conf.get(
+        DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
+        DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
+
     // RPC server settings
     int handlerCount = this.conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY,
         DFS_ROUTER_HANDLER_COUNT_DEFAULT);
@@ -296,8 +315,8 @@ public class RouterRpcServer extends AbstractService
 
     // Initialize modules
     this.quotaCall = new Quota(this.router, this);
+    this.erasureCoding = new ErasureCoding(this);
     this.nnProto = new RouterNamenodeProtocol(this);
-    this.clientProto = new RouterClientProtocol(conf, this);
   }
 
   @Override
@@ -352,13 +371,6 @@ public class RouterRpcServer extends AbstractService
   }
 
   /**
-   * Get the active namenode resolver
-   */
-  public ActiveNamenodeResolver getNamenodeResolver() {
-    return namenodeResolver;
-  }
-
-  /**
    * Get the RPC monitor and metrics.
    *
    * @return RPC monitor and metrics.
@@ -399,7 +411,7 @@ public class RouterRpcServer extends AbstractService
    *                           client requests.
    * @throws UnsupportedOperationException If the operation is not supported.
    */
-  void checkOperation(OperationCategory op, boolean supported)
+  protected void checkOperation(OperationCategory op, boolean supported)
       throws StandbyException, UnsupportedOperationException {
     checkOperation(op);
 
@@ -421,7 +433,7 @@ public class RouterRpcServer extends AbstractService
    * @throws SafeModeException If the Router is in safe mode and cannot serve
    *                           client requests.
    */
-  void checkOperation(OperationCategory op)
+  protected void checkOperation(OperationCategory op)
       throws StandbyException {
     // Log the function we are currently calling.
     if (rpcMonitor != null) {
@@ -452,44 +464,58 @@ public class RouterRpcServer extends AbstractService
     }
   }
 
-  /**
-   * Get the name of the method that is calling this function.
-   *
-   * @return Name of the method calling this function.
-   */
-  static String getMethodName() {
-    final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
-    String methodName = stack[3].getMethodName();
-    return methodName;
-  }
-
   @Override // ClientProtocol
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
-    return clientProto.getDelegationToken(renewer);
+    checkOperation(OperationCategory.WRITE, false);
+    return null;
+  }
+
+  /**
+   * Get the delegation token from each name service.
+   * @param renewer
+   * @return Name service -> Token.
+   * @throws IOException
+   */
+  public Map<FederationNamespaceInfo, Token<DelegationTokenIdentifier>>
+      getDelegationTokens(Text renewer) throws IOException {
+    checkOperation(OperationCategory.WRITE, false);
+    return null;
   }
 
   @Override // ClientProtocol
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
-    return clientProto.renewDelegationToken(token);
+    checkOperation(OperationCategory.WRITE, false);
+    return 0;
   }
 
   @Override // ClientProtocol
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
       throws IOException {
-    clientProto.cancelDelegationToken(token);
+    checkOperation(OperationCategory.WRITE, false);
   }
 
   @Override // ClientProtocol
   public LocatedBlocks getBlockLocations(String src, final long offset,
       final long length) throws IOException {
-    return clientProto.getBlockLocations(src, offset, length);
+    checkOperation(OperationCategory.READ);
+
+    List<RemoteLocation> locations = getLocationsForPath(src, false);
+    RemoteMethod remoteMethod = new RemoteMethod("getBlockLocations",
+        new Class<?>[] {String.class, long.class, long.class},
+        new RemoteParam(), offset, length);
+    return (LocatedBlocks) rpcClient.invokeSequential(locations, remoteMethod,
+        LocatedBlocks.class, null);
   }
 
   @Override // ClientProtocol
   public FsServerDefaults getServerDefaults() throws IOException {
-    return clientProto.getServerDefaults();
+    checkOperation(OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod("getServerDefaults");
+    String ns = subclusterResolver.getDefaultNamespace();
+    return (FsServerDefaults) rpcClient.invokeSingle(ns, method);
   }
 
   @Override // ClientProtocol
@@ -498,8 +524,44 @@ public class RouterRpcServer extends AbstractService
       boolean createParent, short replication, long blockSize,
       CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
       throws IOException {
-    return clientProto.create(src, masked, clientName, flag, createParent,
+    checkOperation(OperationCategory.WRITE);
+
+    if (createParent && isPathAll(src)) {
+      int index = src.lastIndexOf(Path.SEPARATOR);
+      String parent = src.substring(0, index);
+      LOG.debug("Creating {} requires creating parent {}", src, parent);
+      FsPermission parentPermissions = getParentPermission(masked);
+      boolean success = mkdirs(parent, parentPermissions, createParent);
+      if (!success) {
+        // This shouldn't happen as mkdirs returns true or exception
+        LOG.error("Couldn't create parents for {}", src);
+      }
+    }
+
+    RemoteLocation createLocation = getCreateLocation(src);
+    RemoteMethod method = new RemoteMethod("create",
+        new Class<?>[] {String.class, FsPermission.class, String.class,
+                        EnumSetWritable.class, boolean.class, short.class,
+                        long.class, CryptoProtocolVersion[].class,
+                        String.class},
+        createLocation.getDest(), masked, clientName, flag, createParent,
         replication, blockSize, supportedVersions, ecPolicyName);
+    return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
+  }
+
+  /**
+   * Get the permissions for the parent of a child with given permissions.
+   * Add implicit u+wx permission for parent. This is based on
+   * {@code FSDirMkdirOp#addImplicitUwx}.
+   * @param mask The permission mask of the child.
+   * @return The permission mask of the parent.
+   */
+  private static FsPermission getParentPermission(final FsPermission mask) {
+    FsPermission ret = new FsPermission(
+        mask.getUserAction().or(FsAction.WRITE_EXECUTE),
+        mask.getGroupAction(),
+        mask.getOtherAction());
+    return ret;
   }
 
   /**
@@ -510,7 +572,7 @@ public class RouterRpcServer extends AbstractService
    * @return The remote location for this file.
    * @throws IOException If the file has no creation location.
    */
-  RemoteLocation getCreateLocation(final String src)
+  protected RemoteLocation getCreateLocation(final String src)
       throws IOException {
 
     final List<RemoteLocation> locations = getLocationsForPath(src, true);
@@ -551,45 +613,100 @@ public class RouterRpcServer extends AbstractService
     return createLocation;
   }
 
+  // Medium
   @Override // ClientProtocol
   public LastBlockWithStatus append(String src, final String clientName,
       final EnumSetWritable<CreateFlag> flag) throws IOException {
-    return clientProto.append(src, clientName, flag);
+    checkOperation(OperationCategory.WRITE);
+
+    List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("append",
+        new Class<?>[] {String.class, String.class, EnumSetWritable.class},
+        new RemoteParam(), clientName, flag);
+    return rpcClient.invokeSequential(
+        locations, method, LastBlockWithStatus.class, null);
   }
 
+  // Low
   @Override // ClientProtocol
   public boolean recoverLease(String src, String clientName)
       throws IOException {
-    return clientProto.recoverLease(src, clientName);
+    checkOperation(OperationCategory.WRITE);
+
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("recoverLease",
+        new Class<?>[] {String.class, String.class}, new RemoteParam(),
+        clientName);
+    Object result = rpcClient.invokeSequential(
+        locations, method, Boolean.class, Boolean.TRUE);
+    return (boolean) result;
   }
 
   @Override // ClientProtocol
   public boolean setReplication(String src, short replication)
       throws IOException {
-    return clientProto.setReplication(src, replication);
+    checkOperation(OperationCategory.WRITE);
+
+    List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("setReplication",
+        new Class<?>[] {String.class, short.class}, new RemoteParam(),
+        replication);
+    Object result = rpcClient.invokeSequential(
+        locations, method, Boolean.class, Boolean.TRUE);
+    return (boolean) result;
   }
 
-  @Override // ClientProtocol
+  @Override
   public void setStoragePolicy(String src, String policyName)
       throws IOException {
-    clientProto.setStoragePolicy(src, policyName);
+    checkOperation(OperationCategory.WRITE);
+
+    List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("setStoragePolicy",
+        new Class<?>[] {String.class, String.class},
+        new RemoteParam(), policyName);
+    rpcClient.invokeSequential(locations, method, null, null);
   }
 
-  @Override // ClientProtocol
+  @Override
   public BlockStoragePolicy[] getStoragePolicies() throws IOException {
-    return clientProto.getStoragePolicies();
+    checkOperation(OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod("getStoragePolicies");
+    String ns = subclusterResolver.getDefaultNamespace();
+    return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method);
   }
 
   @Override // ClientProtocol
   public void setPermission(String src, FsPermission permissions)
       throws IOException {
-    clientProto.setPermission(src, permissions);
+    checkOperation(OperationCategory.WRITE);
+
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("setPermission",
+        new Class<?>[] {String.class, FsPermission.class},
+        new RemoteParam(), permissions);
+    if (isPathAll(src)) {
+      rpcClient.invokeConcurrent(locations, method);
+    } else {
+      rpcClient.invokeSequential(locations, method);
+    }
   }
 
   @Override // ClientProtocol
   public void setOwner(String src, String username, String groupname)
       throws IOException {
-    clientProto.setOwner(src, username, groupname);
+    checkOperation(OperationCategory.WRITE);
+
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("setOwner",
+        new Class<?>[] {String.class, String.class, String.class},
+        new RemoteParam(), username, groupname);
+    if (isPathAll(src)) {
+      rpcClient.invokeConcurrent(locations, method);
+    } else {
+      rpcClient.invokeSequential(locations, method);
+    }
   }
 
   /**
@@ -601,8 +718,18 @@ public class RouterRpcServer extends AbstractService
       ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
       String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
       throws IOException {
-    return clientProto.addBlock(src, clientName, previous, excludedNodes,
-        fileId, favoredNodes, addBlockFlags);
+    checkOperation(OperationCategory.WRITE);
+
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("addBlock",
+        new Class<?>[] {String.class, String.class, ExtendedBlock.class,
+                        DatanodeInfo[].class, long.class, String[].class,
+                        EnumSet.class},
+        new RemoteParam(), clientName, previous, excludedNodes, fileId,
+        favoredNodes, addBlockFlags);
+    // TODO verify the excludedNodes and favoredNodes are acceptable to this NN
+    return (LocatedBlock) rpcClient.invokeSequential(
+        locations, method, LocatedBlock.class, null);
   }
 
   /**
@@ -615,26 +742,55 @@ public class RouterRpcServer extends AbstractService
       final String[] existingStorageIDs, final DatanodeInfo[] excludes,
       final int numAdditionalNodes, final String clientName)
           throws IOException {
-    return clientProto.getAdditionalDatanode(src, fileId, blk, existings,
-        existingStorageIDs, excludes, numAdditionalNodes, clientName);
+    checkOperation(OperationCategory.READ);
+
+    final List<RemoteLocation> locations = getLocationsForPath(src, false);
+    RemoteMethod method = new RemoteMethod("getAdditionalDatanode",
+        new Class<?>[] {String.class, long.class, ExtendedBlock.class,
+                        DatanodeInfo[].class, String[].class,
+                        DatanodeInfo[].class, int.class, String.class},
+        new RemoteParam(), fileId, blk, existings, existingStorageIDs, excludes,
+        numAdditionalNodes, clientName);
+    return (LocatedBlock) rpcClient.invokeSequential(
+        locations, method, LocatedBlock.class, null);
   }
 
   @Override // ClientProtocol
   public void abandonBlock(ExtendedBlock b, long fileId, String src,
       String holder) throws IOException {
-    clientProto.abandonBlock(b, fileId, src, holder);
+    checkOperation(OperationCategory.WRITE);
+
+    RemoteMethod method = new RemoteMethod("abandonBlock",
+        new Class<?>[] {ExtendedBlock.class, long.class, String.class,
+                        String.class},
+        b, fileId, new RemoteParam(), holder);
+    rpcClient.invokeSingle(b, method);
   }
 
   @Override // ClientProtocol
   public boolean complete(String src, String clientName, ExtendedBlock last,
       long fileId) throws IOException {
-    return clientProto.complete(src, clientName, last, fileId);
+    checkOperation(OperationCategory.WRITE);
+
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("complete",
+        new Class<?>[] {String.class, String.class, ExtendedBlock.class,
+                        long.class},
+        new RemoteParam(), clientName, last, fileId);
+    // Complete can return true/false, so don't expect a result
+    return ((Boolean) rpcClient.invokeSequential(
+        locations, method, Boolean.class, null)).booleanValue();
   }
 
   @Override // ClientProtocol
   public LocatedBlock updateBlockForPipeline(
       ExtendedBlock block, String clientName) throws IOException {
-    return clientProto.updateBlockForPipeline(block, clientName);
+    checkOperation(OperationCategory.WRITE);
+
+    RemoteMethod method = new RemoteMethod("updateBlockForPipeline",
+        new Class<?>[] {ExtendedBlock.class, String.class},
+        block, clientName);
+    return (LocatedBlock) rpcClient.invokeSingle(block, method);
   }
 
   /**
@@ -645,91 +801,462 @@ public class RouterRpcServer extends AbstractService
   public void updatePipeline(String clientName, ExtendedBlock oldBlock,
       ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
           throws IOException {
-    clientProto.updatePipeline(clientName, oldBlock, newBlock, newNodes,
-        newStorageIDs);
+    checkOperation(OperationCategory.WRITE);
+
+    RemoteMethod method = new RemoteMethod("updatePipeline",
+        new Class<?>[] {String.class, ExtendedBlock.class, ExtendedBlock.class,
+                        DatanodeID[].class, String[].class},
+        clientName, oldBlock, newBlock, newNodes, newStorageIDs);
+    rpcClient.invokeSingle(oldBlock, method);
   }
 
   @Override // ClientProtocol
   public long getPreferredBlockSize(String src) throws IOException {
-    return clientProto.getPreferredBlockSize(src);
+    checkOperation(OperationCategory.READ);
+
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("getPreferredBlockSize",
+        new Class<?>[] {String.class}, new RemoteParam());
+    return ((Long) rpcClient.invokeSequential(
+        locations, method, Long.class, null)).longValue();
+  }
+
+  /**
+   * Determines combinations of eligible src/dst locations for a rename. A
+   * rename cannot change the namespace. Renames are only allowed if there is an
+   * eligible dst location in the same namespace as the source.
+   *
+   * @param srcLocations List of all potential source destinations where the
+   *          path may be located. On return this list is trimmed to include
+   *          only the paths that have corresponding destinations in the same
+   *          namespace.
+   * @param dst The destination path
+   * @return A map of all eligible source namespaces and their corresponding
+   *         replacement value.
+   * @throws IOException If the dst paths could not be determined.
+   */
+  private RemoteParam getRenameDestinations(
+      final List<RemoteLocation> srcLocations, final String dst)
+          throws IOException {
+
+    final List<RemoteLocation> dstLocations = getLocationsForPath(dst, true);
+    final Map<RemoteLocation, String> dstMap = new HashMap<>();
+
+    Iterator<RemoteLocation> iterator = srcLocations.iterator();
+    while (iterator.hasNext()) {
+      RemoteLocation srcLocation = iterator.next();
+      RemoteLocation eligibleDst =
+          getFirstMatchingLocation(srcLocation, dstLocations);
+      if (eligibleDst != null) {
+        // Use this dst for this source location
+        dstMap.put(srcLocation, eligibleDst.getDest());
+      } else {
+        // This src destination is not valid, remove from the source list
+        iterator.remove();
+      }
+    }
+    return new RemoteParam(dstMap);
+  }
+
+  /**
+   * Get first matching location.
+   *
+   * @param location Location we are looking for.
+   * @param locations List of locations.
+   * @return The first matching location in the list, or null if none.
+   */
+  private RemoteLocation getFirstMatchingLocation(RemoteLocation location,
+      List<RemoteLocation> locations) {
+    for (RemoteLocation loc : locations) {
+      if (loc.getNameserviceId().equals(location.getNameserviceId())) {
+        // Return first matching location
+        return loc;
+      }
+    }
+    return null;
   }
 
   @Deprecated
   @Override // ClientProtocol
   public boolean rename(final String src, final String dst)
       throws IOException {
-    return clientProto.rename(src, dst);
+    checkOperation(OperationCategory.WRITE);
+
+    final List<RemoteLocation> srcLocations =
+        getLocationsForPath(src, true, false);
+    // srcLocations may be trimmed by getRenameDestinations()
+    final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
+    RemoteParam dstParam = getRenameDestinations(locs, dst);
+    if (locs.isEmpty()) {
+      throw new IOException(
+          "Rename of " + src + " to " + dst + " is not allowed," +
+          " no eligible destination in the same namespace was found.");
+    }
+    RemoteMethod method = new RemoteMethod("rename",
+        new Class<?>[] {String.class, String.class},
+        new RemoteParam(), dstParam);
+    return ((Boolean) rpcClient.invokeSequential(
+        locs, method, Boolean.class, Boolean.TRUE)).booleanValue();
   }
 
   @Override // ClientProtocol
   public void rename2(final String src, final String dst,
       final Options.Rename... options) throws IOException {
-    clientProto.rename2(src, dst, options);
+    checkOperation(OperationCategory.WRITE);
+
+    final List<RemoteLocation> srcLocations =
+        getLocationsForPath(src, true, false);
+    // srcLocations may be trimmed by getRenameDestinations()
+    final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
+    RemoteParam dstParam = getRenameDestinations(locs, dst);
+    if (locs.isEmpty()) {
+      throw new IOException(
+          "Rename of " + src + " to " + dst + " is not allowed," +
+          " no eligible destination in the same namespace was found.");
+    }
+    RemoteMethod method = new RemoteMethod("rename2",
+        new Class<?>[] {String.class, String.class, options.getClass()},
+        new RemoteParam(), dstParam, options);
+    rpcClient.invokeSequential(locs, method, null, null);
   }
 
   @Override // ClientProtocol
   public void concat(String trg, String[] src) throws IOException {
-    clientProto.concat(trg, src);
+    checkOperation(OperationCategory.WRITE);
+
+    // See if the src and target files are all in the same namespace
+    LocatedBlocks targetBlocks = getBlockLocations(trg, 0, 1);
+    if (targetBlocks == null) {
+      throw new IOException("Cannot locate blocks for target file - " + trg);
+    }
+    LocatedBlock lastLocatedBlock = targetBlocks.getLastLocatedBlock();
+    String targetBlockPoolId = lastLocatedBlock.getBlock().getBlockPoolId();
+    for (String source : src) {
+      LocatedBlocks sourceBlocks = getBlockLocations(source, 0, 1);
+      if (sourceBlocks == null) {
+        throw new IOException(
+            "Cannot located blocks for source file " + source);
+      }
+      String sourceBlockPoolId =
+          sourceBlocks.getLastLocatedBlock().getBlock().getBlockPoolId();
+      if (!sourceBlockPoolId.equals(targetBlockPoolId)) {
+        throw new IOException("Cannot concatenate source file " + source
+            + " because it is located in a different namespace"
+            + " with block pool id " + sourceBlockPoolId
+            + " from the target file with block pool id "
+            + targetBlockPoolId);
+      }
+    }
+
+    // Find locations in the matching namespace.
+    final RemoteLocation targetDestination =
+        getLocationForPath(trg, true, targetBlockPoolId);
+    String[] sourceDestinations = new String[src.length];
+    for (int i = 0; i < src.length; i++) {
+      String sourceFile = src[i];
+      RemoteLocation location =
+          getLocationForPath(sourceFile, true, targetBlockPoolId);
+      sourceDestinations[i] = location.getDest();
+    }
+    // Invoke
+    RemoteMethod method = new RemoteMethod("concat",
+        new Class<?>[] {String.class, String[].class},
+        targetDestination.getDest(), sourceDestinations);
+    rpcClient.invokeSingle(targetDestination, method);
   }
 
   @Override // ClientProtocol
   public boolean truncate(String src, long newLength, String clientName)
       throws IOException {
-    return clientProto.truncate(src, newLength, clientName);
+    checkOperation(OperationCategory.WRITE);
+
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("truncate",
+        new Class<?>[] {String.class, long.class, String.class},
+        new RemoteParam(), newLength, clientName);
+    return ((Boolean) rpcClient.invokeSequential(locations, method,
+        Boolean.class, Boolean.TRUE)).booleanValue();
   }
 
   @Override // ClientProtocol
   public boolean delete(String src, boolean recursive) throws IOException {
-    return clientProto.delete(src, recursive);
+    checkOperation(OperationCategory.WRITE);
+
+    final List<RemoteLocation> locations =
+        getLocationsForPath(src, true, false);
+    RemoteMethod method = new RemoteMethod("delete",
+        new Class<?>[] {String.class, boolean.class}, new RemoteParam(),
+        recursive);
+    if (isPathAll(src)) {
+      return rpcClient.invokeAll(locations, method);
+    } else {
+      return rpcClient.invokeSequential(locations, method,
+          Boolean.class, Boolean.TRUE).booleanValue();
+    }
   }
 
   @Override // ClientProtocol
   public boolean mkdirs(String src, FsPermission masked, boolean createParent)
       throws IOException {
-    return clientProto.mkdirs(src, masked, createParent);
+    checkOperation(OperationCategory.WRITE);
+
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("mkdirs",
+        new Class<?>[] {String.class, FsPermission.class, boolean.class},
+        new RemoteParam(), masked, createParent);
+
+    // Create in all locations
+    if (isPathAll(src)) {
+      return rpcClient.invokeAll(locations, method);
+    }
+
+    if (locations.size() > 1) {
+      // Check if this directory already exists
+      try {
+        HdfsFileStatus fileStatus = getFileInfo(src);
+        if (fileStatus != null) {
+          // When existing, the NN doesn't return an exception; return true
+          return true;
+        }
+      } catch (IOException ioe) {
+        // Can't query if this file exists or not.
+        LOG.error("Error requesting file info for path {} while proxing mkdirs",
+            src, ioe);
+      }
+    }
+
+    RemoteLocation firstLocation = locations.get(0);
+    return ((Boolean) rpcClient.invokeSingle(firstLocation, method))
+        .booleanValue();
   }
 
   @Override // ClientProtocol
   public void renewLease(String clientName) throws IOException {
-    clientProto.renewLease(clientName);
+    checkOperation(OperationCategory.WRITE);
+
+    RemoteMethod method = new RemoteMethod("renewLease",
+        new Class<?>[] {String.class}, clientName);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, false, false);
   }
 
   @Override // ClientProtocol
   public DirectoryListing getListing(String src, byte[] startAfter,
       boolean needLocation) throws IOException {
-    return clientProto.getListing(src, startAfter, needLocation);
+    checkOperation(OperationCategory.READ);
+
+    // Locate the dir and fetch the listing
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("getListing",
+        new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
+        new RemoteParam(), startAfter, needLocation);
+    Map<RemoteLocation, DirectoryListing> listings =
+        rpcClient.invokeConcurrent(
+            locations, method, false, false, DirectoryListing.class);
+
+    Map<String, HdfsFileStatus> nnListing = new TreeMap<>();
+    int totalRemainingEntries = 0;
+    int remainingEntries = 0;
+    boolean namenodeListingExists = false;
+    if (listings != null) {
+      // Check the subcluster listing with the smallest name
+      String lastName = null;
+      for (Entry<RemoteLocation, DirectoryListing> entry :
+          listings.entrySet()) {
+        RemoteLocation location = entry.getKey();
+        DirectoryListing listing = entry.getValue();
+        if (listing == null) {
+          LOG.debug("Cannot get listing from {}", location);
+        } else {
+          totalRemainingEntries += listing.getRemainingEntries();
+          HdfsFileStatus[] partialListing = listing.getPartialListing();
+          int length = partialListing.length;
+          if (length > 0) {
+            HdfsFileStatus lastLocalEntry = partialListing[length-1];
+            String lastLocalName = lastLocalEntry.getLocalName();
+            if (lastName == null || lastName.compareTo(lastLocalName) > 0) {
+              lastName = lastLocalName;
+            }
+          }
+        }
+      }
+
+      // Add existing entries
+      for (Object value : listings.values()) {
+        DirectoryListing listing = (DirectoryListing) value;
+        if (listing != null) {
+          namenodeListingExists = true;
+          for (HdfsFileStatus file : listing.getPartialListing()) {
+            String filename = file.getLocalName();
+            if (totalRemainingEntries > 0 && filename.compareTo(lastName) > 0) {
+              // Discarding entries further than the lastName
+              remainingEntries++;
+            } else {
+              nnListing.put(filename, file);
+            }
+          }
+          remainingEntries += listing.getRemainingEntries();
+        }
+      }
+    }
+
+    // Add mount points at this level in the tree
+    final List<String> children = subclusterResolver.getMountPoints(src);
+    if (children != null) {
+      // Get the dates for each mount point
+      Map<String, Long> dates = getMountPointDates(src);
+
+      // Create virtual folder with the mount name
+      for (String child : children) {
+        long date = 0;
+        if (dates != null && dates.containsKey(child)) {
+          date = dates.get(child);
+        }
+        // TODO add number of children
+        HdfsFileStatus dirStatus = getMountPointStatus(child, 0, date);
+
+        // This may overwrite existing listing entries with the mount point
+        // TODO don't add if already there?
+        nnListing.put(child, dirStatus);
+      }
+    }
+
+    if (!namenodeListingExists && nnListing.size() == 0) {
+      // NN returns a null object if the directory cannot be found and has no
+      // listing. If we didn't retrieve any NN listing data, and there are no
+      // mount points here, return null.
+      return null;
+    }
+
+    // Generate combined listing
+    HdfsFileStatus[] combinedData = new HdfsFileStatus[nnListing.size()];
+    combinedData = nnListing.values().toArray(combinedData);
+    return new DirectoryListing(combinedData, remainingEntries);
   }
 
   @Override // ClientProtocol
   public HdfsFileStatus getFileInfo(String src) throws IOException {
-    return clientProto.getFileInfo(src);
+    checkOperation(OperationCategory.READ);
+
+    final List<RemoteLocation> locations = getLocationsForPath(src, false);
+    RemoteMethod method = new RemoteMethod("getFileInfo",
+        new Class<?>[] {String.class}, new RemoteParam());
+
+    HdfsFileStatus ret = null;
+    // If it's a directory, we check in all locations
+    if (isPathAll(src)) {
+      ret = getFileInfoAll(locations, method);
+    } else {
+      // Check for file information sequentially
+      ret = (HdfsFileStatus) rpcClient.invokeSequential(
+          locations, method, HdfsFileStatus.class, null);
+    }
+
+    // If there is no real path, check mount points
+    if (ret == null) {
+      List<String> children = subclusterResolver.getMountPoints(src);
+      if (children != null && !children.isEmpty()) {
+        Map<String, Long> dates = getMountPointDates(src);
+        long date = 0;
+        if (dates != null && dates.containsKey(src)) {
+          date = dates.get(src);
+        }
+        ret = getMountPointStatus(src, children.size(), date);
+      }
+    }
+
+    return ret;
+  }
+
+  /**
+   * Get the file info from all the locations.
+   *
+   * @param locations Locations to check.
+   * @param method The file information method to run.
+   * @return The first file info if it's a file, the directory if it's
+   *         everywhere.
+   * @throws IOException If all the locations throw an exception.
+   */
+  private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
+      final RemoteMethod method) throws IOException {
+
+    // Get the file info from everybody
+    Map<RemoteLocation, HdfsFileStatus> results =
+        rpcClient.invokeConcurrent(locations, method, HdfsFileStatus.class);
+
+    // We return the first file
+    HdfsFileStatus dirStatus = null;
+    for (RemoteLocation loc : locations) {
+      HdfsFileStatus fileStatus = results.get(loc);
+      if (fileStatus != null) {
+        if (!fileStatus.isDirectory()) {
+          return fileStatus;
+        } else if (dirStatus == null) {
+          dirStatus = fileStatus;
+        }
+      }
+    }
+    return dirStatus;
   }
 
   @Override // ClientProtocol
   public boolean isFileClosed(String src) throws IOException {
-    return clientProto.isFileClosed(src);
+    checkOperation(OperationCategory.READ);
+
+    final List<RemoteLocation> locations = getLocationsForPath(src, false);
+    RemoteMethod method = new RemoteMethod("isFileClosed",
+        new Class<?>[] {String.class}, new RemoteParam());
+    return ((Boolean) rpcClient.invokeSequential(
+        locations, method, Boolean.class, Boolean.TRUE)).booleanValue();
   }
 
   @Override // ClientProtocol
   public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
-    return clientProto.getFileLinkInfo(src);
+    checkOperation(OperationCategory.READ);
+
+    final List<RemoteLocation> locations = getLocationsForPath(src, false);
+    RemoteMethod method = new RemoteMethod("getFileLinkInfo",
+        new Class<?>[] {String.class}, new RemoteParam());
+    return (HdfsFileStatus) rpcClient.invokeSequential(
+        locations, method, HdfsFileStatus.class, null);
   }
 
-  @Override // ClientProtocol
+  @Override
   public HdfsLocatedFileStatus getLocatedFileInfo(String src,
       boolean needBlockToken) throws IOException {
-    return clientProto.getLocatedFileInfo(src, needBlockToken);
+    checkOperation(OperationCategory.READ);
+    final List<RemoteLocation> locations = getLocationsForPath(src, false);
+    RemoteMethod method = new RemoteMethod("getLocatedFileInfo",
+        new Class<?>[] {String.class, boolean.class}, new RemoteParam(),
+        Boolean.valueOf(needBlockToken));
+    return (HdfsLocatedFileStatus) rpcClient.invokeSequential(
+        locations, method, HdfsFileStatus.class, null);
   }
 
   @Override // ClientProtocol
   public long[] getStats() throws IOException {
-    return clientProto.getStats();
+    checkOperation(OperationCategory.UNCHECKED);
+
+    RemoteMethod method = new RemoteMethod("getStats");
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, long[]> results =
+        rpcClient.invokeConcurrent(nss, method, true, false, long[].class);
+    long[] combinedData = new long[STATS_ARRAY_LENGTH];
+    for (long[] data : results.values()) {
+      for (int i = 0; i < combinedData.length && i < data.length; i++) {
+        if (data[i] >= 0) {
+          combinedData[i] += data[i];
+        }
+      }
+    }
+    return combinedData;
   }
 
   @Override // ClientProtocol
   public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
       throws IOException {
-    return clientProto.getDatanodeReport(type);
+    checkOperation(OperationCategory.UNCHECKED);
+    return getDatanodeReport(type, true, 0);
   }
 
   /**
@@ -778,7 +1305,29 @@ public class RouterRpcServer extends AbstractService
   @Override // ClientProtocol
   public DatanodeStorageReport[] getDatanodeStorageReport(
       DatanodeReportType type) throws IOException {
-    return clientProto.getDatanodeStorageReport(type);
+    checkOperation(OperationCategory.UNCHECKED);
+
+    Map<String, DatanodeStorageReport[]> dnSubcluster =
+        getDatanodeStorageReportMap(type);
+
+    // Avoid repeating machines in multiple subclusters
+    Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>();
+    for (DatanodeStorageReport[] dns : dnSubcluster.values()) {
+      for (DatanodeStorageReport dn : dns) {
+        DatanodeInfo dnInfo = dn.getDatanodeInfo();
+        String nodeId = dnInfo.getXferAddr();
+        if (!datanodesMap.containsKey(nodeId)) {
+          datanodesMap.put(nodeId, dn);
+        }
+        // TODO merge somehow, right now it just takes the first one
+      }
+    }
+
+    Collection<DatanodeStorageReport> datanodes = datanodesMap.values();
+    DatanodeStorageReport[] combinedData =
+        new DatanodeStorageReport[datanodes.size()];
+    combinedData = datanodes.toArray(combinedData);
+    return combinedData;
   }
 
   /**
@@ -811,388 +1360,740 @@ public class RouterRpcServer extends AbstractService
   @Override // ClientProtocol
   public boolean setSafeMode(SafeModeAction action, boolean isChecked)
       throws IOException {
-    return clientProto.setSafeMode(action, isChecked);
+    checkOperation(OperationCategory.WRITE);
+
+    // Set safe mode in all the name spaces
+    RemoteMethod method = new RemoteMethod("setSafeMode",
+        new Class<?>[] {SafeModeAction.class, boolean.class},
+        action, isChecked);
+    Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, Boolean> results =
+        rpcClient.invokeConcurrent(
+            nss, method, true, !isChecked, Boolean.class);
+
+    // We only report true if all the name space are in safe mode
+    int numSafemode = 0;
+    for (boolean safemode : results.values()) {
+      if (safemode) {
+        numSafemode++;
+      }
+    }
+    return numSafemode == results.size();
   }
 
   @Override // ClientProtocol
   public boolean restoreFailedStorage(String arg) throws IOException {
-    return clientProto.restoreFailedStorage(arg);
+    checkOperation(OperationCategory.UNCHECKED);
+
+    RemoteMethod method = new RemoteMethod("restoreFailedStorage",
+        new Class<?>[] {String.class}, arg);
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, Boolean> ret =
+        rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class);
+
+    boolean success = true;
+    for (boolean s : ret.values()) {
+      if (!s) {
+        success = false;
+        break;
+      }
+    }
+    return success;
   }
 
   @Override // ClientProtocol
   public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
-    return clientProto.saveNamespace(timeWindow, txGap);
+    checkOperation(OperationCategory.UNCHECKED);
+    // The type array must name the overridden signature's primitive longs.
+    RemoteMethod method = new RemoteMethod("saveNamespace",
+        new Class<?>[] {long.class, long.class}, timeWindow, txGap);
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, Boolean> ret =
+        rpcClient.invokeConcurrent(nss, method, true, false, boolean.class);
+
+    boolean success = true;
+    for (boolean s : ret.values()) {
+      if (!s) {
+        success = false;
+        break;
+      }
+    }
+    return success;
   }
 
   @Override // ClientProtocol
   public long rollEdits() throws IOException {
-    return clientProto.rollEdits();
+    checkOperation(OperationCategory.WRITE);
+
+    RemoteMethod method = new RemoteMethod("rollEdits", new Class<?>[] {});
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, Long> ret =
+        rpcClient.invokeConcurrent(nss, method, true, false, long.class);
+
+    // Return the maximum txid
+    long txid = 0;
+    for (long t : ret.values()) {
+      if (t > txid) {
+        txid = t;
+      }
+    }
+    return txid;
   }
 
   @Override // ClientProtocol
   public void refreshNodes() throws IOException {
-    clientProto.refreshNodes();
+    checkOperation(OperationCategory.UNCHECKED);
+
+    RemoteMethod method = new RemoteMethod("refreshNodes", new Class<?>[] {});
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, true, true);
   }
 
   @Override // ClientProtocol
   public void finalizeUpgrade() throws IOException {
-    clientProto.finalizeUpgrade();
+    checkOperation(OperationCategory.UNCHECKED);
+
+    RemoteMethod method = new RemoteMethod("finalizeUpgrade",
+        new Class<?>[] {});
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, true, false);
   }
 
   @Override // ClientProtocol
   public boolean upgradeStatus() throws IOException {
-    return clientProto.upgradeStatus();
+    String methodName = getMethodName();
+    throw new UnsupportedOperationException(
+        "Operation \"" + methodName + "\" is not supported");
   }
 
   @Override // ClientProtocol
   public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action)
       throws IOException {
-    return clientProto.rollingUpgrade(action);
+    checkOperation(OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod("rollingUpgrade",
+        new Class<?>[] {RollingUpgradeAction.class}, action);
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, RollingUpgradeInfo> ret =
+        rpcClient.invokeConcurrent(
+            nss, method, true, false, RollingUpgradeInfo.class);
+
+    // Return the first rolling upgrade info
+    RollingUpgradeInfo info = null;
+    for (RollingUpgradeInfo infoNs : ret.values()) {
+      if (info == null && infoNs != null) {
+        info = infoNs;
+      }
+    }
+    return info;
   }
 
   @Override // ClientProtocol
   public void metaSave(String filename) throws IOException {
-    clientProto.metaSave(filename);
+    checkOperation(OperationCategory.UNCHECKED);
+
+    RemoteMethod method = new RemoteMethod("metaSave",
+        new Class<?>[] {String.class}, filename);
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, true, false);
   }
 
   @Override // ClientProtocol
   public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
       throws IOException {
-    return clientProto.listCorruptFileBlocks(path, cookie);
+    checkOperation(OperationCategory.READ);
+
+    final List<RemoteLocation> locations = getLocationsForPath(path, false);
+    RemoteMethod method = new RemoteMethod("listCorruptFileBlocks",
+        new Class<?>[] {String.class, String.class},
+        new RemoteParam(), cookie);
+    return (CorruptFileBlocks) rpcClient.invokeSequential(
+        locations, method, CorruptFileBlocks.class, null);
   }
 
   @Override // ClientProtocol
   public void setBalancerBandwidth(long bandwidth) throws IOException {
-    clientProto.setBalancerBandwidth(bandwidth);
+    checkOperation(OperationCategory.UNCHECKED);
+    // The type array must name the overridden signature's primitive long.
+    RemoteMethod method = new RemoteMethod("setBalancerBandwidth",
+        new Class<?>[] {long.class}, bandwidth);
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    rpcClient.invokeConcurrent(nss, method, true, false);
   }
 
   @Override // ClientProtocol
   public ContentSummary getContentSummary(String path) throws IOException {
-    return clientProto.getContentSummary(path);
+    checkOperation(OperationCategory.READ);
+
+    // Get the summaries from regular files
+    Collection<ContentSummary> summaries = new LinkedList<>();
+    FileNotFoundException notFoundException = null;
+    try {
+      final List<RemoteLocation> locations = getLocationsForPath(path, false);
+      RemoteMethod method = new RemoteMethod("getContentSummary",
+          new Class<?>[] {String.class}, new RemoteParam());
+      Map<RemoteLocation, ContentSummary> results =
+          rpcClient.invokeConcurrent(
+              locations, method, false, false, ContentSummary.class);
+      summaries.addAll(results.values());
+    } catch (FileNotFoundException e) {
+      notFoundException = e;
+    }
+
+    // Add mount points at this level in the tree
+    final List<String> children = subclusterResolver.getMountPoints(path);
+    if (children != null) {
+      for (String child : children) {
+        Path childPath = new Path(path, child);
+        try {
+          ContentSummary mountSummary = getContentSummary(childPath.toString());
+          if (mountSummary != null) {
+            summaries.add(mountSummary);
+          }
+        } catch (Exception e) {
+          LOG.error("Cannot get content summary for mount {}: {}",
+              childPath, e.getMessage());
+        }
+      }
+    }
+
+    // Throw original exception if no original nor mount points
+    if (summaries.isEmpty() && notFoundException != null) {
+      throw notFoundException;
+    }
+
+    return aggregateContentSummary(summaries);
+  }
+
+  /**
+   * Aggregate content summaries for each subcluster.
+   *
+   * @param summaries Collection of individual summaries.
+   * @return Aggregated content summary.
+   */
+  private ContentSummary aggregateContentSummary(
+      Collection<ContentSummary> summaries) {
+    if (summaries.size() == 1) {
+      return summaries.iterator().next();
+    }
+
+    long length = 0;
+    long fileCount = 0;
+    long directoryCount = 0;
+    long quota = 0;
+    long spaceConsumed = 0;
+    long spaceQuota = 0;
+
+    for (ContentSummary summary : summaries) {
+      length += summary.getLength();
+      fileCount += summary.getFileCount();
+      directoryCount += summary.getDirectoryCount();
+      quota += summary.getQuota();
+      spaceConsumed += summary.getSpaceConsumed();
+      spaceQuota += summary.getSpaceQuota();
+    }
+
+    ContentSummary ret = new ContentSummary.Builder()
+        .length(length)
+        .fileCount(fileCount)
+        .directoryCount(directoryCount)
+        .quota(quota)
+        .spaceConsumed(spaceConsumed)
+        .spaceQuota(spaceQuota)
+        .build();
+    return ret;
   }
 
   @Override // ClientProtocol
   public void fsync(String src, long fileId, String clientName,
       long lastBlockLength) throws IOException {
-    clientProto.fsync(src, fileId, clientName, lastBlockLength);
+    checkOperation(OperationCategory.WRITE);
+
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("fsync",
+        new Class<?>[] {String.class, long.class, String.class, long.class },
+        new RemoteParam(), fileId, clientName, lastBlockLength);
+    rpcClient.invokeSequential(locations, method);
   }
 
   @Override // ClientProtocol
   public void setTimes(String src, long mtime, long atime) throws IOException {
-    clientProto.setTimes(src, mtime, atime);
+    checkOperation(OperationCategory.WRITE);
+
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("setTimes",
+        new Class<?>[] {String.class, long.class, long.class},
+        new RemoteParam(), mtime, atime);
+    rpcClient.invokeSequential(locations, method);
   }
 
   @Override // ClientProtocol
   public void createSymlink(String target, String link, FsPermission dirPerms,
       boolean createParent) throws IOException {
-    clientProto.createSymlink(target, link, dirPerms, createParent);
+    checkOperation(OperationCategory.WRITE);
+
+    // TODO Verify that the link location is in the same NS as the targets
+    final List<RemoteLocation> targetLocations =
+        getLocationsForPath(target, true);
+    final List<RemoteLocation> linkLocations =
+        getLocationsForPath(link, true);
+    RemoteLocation linkLocation = linkLocations.get(0);
+    RemoteMethod method = new RemoteMethod("createSymlink",
+        new Class<?>[] {String.class, String.class, FsPermission.class,
+                        boolean.class},
+        new RemoteParam(), linkLocation.getDest(), dirPerms, createParent);
+    rpcClient.invokeSequential(targetLocations, method);
   }
 
   @Override // ClientProtocol
   public String getLinkTarget(String path) throws IOException {
-    return clientProto.getLinkTarget(path);
+    checkOperation(OperationCategory.READ);
+
+    final List<RemoteLocation> locations = getLocationsForPath(path, true);
+    RemoteMethod method = new RemoteMethod("getLinkTarget",
+        new Class<?>[] {String.class}, new RemoteParam());
+    return (String) rpcClient.invokeSequential(
+        locations, method, String.class, null);
   }
 
   @Override // Client Protocol
   public void allowSnapshot(String snapshotRoot) throws IOException {
-    clientProto.allowSnapshot(snapshotRoot);
+    checkOperation(OperationCategory.WRITE, false);
   }
 
   @Override // Client Protocol
   public void disallowSnapshot(String snapshot) throws IOException {
-    clientProto.disallowSnapshot(snapshot);
+    checkOperation(OperationCategory.WRITE, false);
   }
 
   @Override // ClientProtocol
   public void renameSnapshot(String snapshotRoot, String snapshotOldName,
       String snapshotNewName) throws IOException {
-    clientProto.renameSnapshot(snapshotRoot, snapshotOldName, snapshotNewName);
+    checkOperation(OperationCategory.WRITE, false);
   }
 
   @Override // Client Protocol
   public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
       throws IOException {
-    return clientProto.getSnapshottableDirListing();
+    checkOperation(OperationCategory.READ, false);
+    return null;
   }
 
   @Override // ClientProtocol
   public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
       String earlierSnapshotName, String laterSnapshotName) throws IOException {
-    return clientProto.getSnapshotDiffReport(
-        snapshotRoot, earlierSnapshotName, laterSnapshotName);
+    checkOperation(OperationCategory.READ, false);
+    return null;
   }
 
   @Override // ClientProtocol
   public SnapshotDiffReportListing getSnapshotDiffReportListing(
       String snapshotRoot, String earlierSnapshotName, String laterSnapshotName,
       byte[] startPath, int index) throws IOException {
-    return clientProto.getSnapshotDiffReportListing(snapshotRoot,
-        earlierSnapshotName, laterSnapshotName, startPath, index);
+    checkOperation(OperationCategory.READ, false);
+    return null;
   }
 
   @Override // ClientProtocol
   public long addCacheDirective(CacheDirectiveInfo path,
       EnumSet<CacheFlag> flags) throws IOException {
-    return clientProto.addCacheDirective(path, flags);
+    checkOperation(OperationCategory.WRITE, false);
+    return 0;
   }
 
   @Override // ClientProtocol
   public void modifyCacheDirective(CacheDirectiveInfo directive,
       EnumSet<CacheFlag> flags) throws IOException {
-    clientProto.modifyCacheDirective(directive, flags);
+    checkOperation(OperationCategory.WRITE, false);
   }
 
   @Override // ClientProtocol
   public void removeCacheDirective(long id) throws IOException {
-    clientProto.removeCacheDirective(id);
+    checkOperation(OperationCategory.WRITE, false);
   }
 
   @Override // ClientProtocol
   public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
       long prevId, CacheDirectiveInfo filter) throws IOException {
-    return clientProto.listCacheDirectives(prevId, filter);
+    checkOperation(OperationCategory.READ, false);
+    return null;
   }
 
   @Override // ClientProtocol
   public void addCachePool(CachePoolInfo info) throws IOException {
-    clientProto.addCachePool(info);
+    checkOperation(OperationCategory.WRITE, false);
   }
 
   @Override // ClientProtocol
   public void modifyCachePool(CachePoolInfo info) throws IOException {
-    clientProto.modifyCachePool(info);
+    checkOperation(OperationCategory.WRITE, false);
   }
 
   @Override // ClientProtocol
   public void removeCachePool(String cachePoolName) throws IOException {
-    clientProto.removeCachePool(cachePoolName);
+    checkOperation(OperationCategory.WRITE, false);
   }
 
   @Override // ClientProtocol
   public BatchedEntries<CachePoolEntry> listCachePools(String prevKey)
       throws IOException {
-    return clientProto.listCachePools(prevKey);
+    checkOperation(OperationCategory.READ, false);
+    return null;
   }
 
   @Override // ClientProtocol
   public void modifyAclEntries(String src, List<AclEntry> aclSpec)
       throws IOException {
-    clientProto.modifyAclEntries(src, aclSpec);
+    checkOperation(OperationCategory.WRITE);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("modifyAclEntries",
+        new Class<?>[] {String.class, List.class},
+        new RemoteParam(), aclSpec);
+    rpcClient.invokeSequential(locations, method, null, null);
   }
 
   @Override // ClienProtocol
   public void removeAclEntries(String src, List<AclEntry> aclSpec)
       throws IOException {
-    clientProto.removeAclEntries(src, aclSpec);
+    checkOperation(OperationCategory.WRITE);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("removeAclEntries",
+        new Class<?>[] {String.class, List.class},
+        new RemoteParam(), aclSpec);
+    rpcClient.invokeSequential(locations, method, null, null);
   }
 
   @Override // ClientProtocol
   public void removeDefaultAcl(String src) throws IOException {
-    clientProto.removeDefaultAcl(src);
+    checkOperation(OperationCategory.WRITE);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("removeDefaultAcl",
+        new Class<?>[] {String.class}, new RemoteParam());
+    rpcClient.invokeSequential(locations, method);
   }
 
   @Override // ClientProtocol
   public void removeAcl(String src) throws IOException {
-    clientProto.removeAcl(src);
+    checkOperation(OperationCategory.WRITE);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("removeAcl",
+        new Class<?>[] {String.class}, new RemoteParam());
+    rpcClient.invokeSequential(locations, method);
   }
 
   @Override // ClientProtocol
   public void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
-    clientProto.setAcl(src, aclSpec);
+    checkOperation(OperationCategory.WRITE);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod(
+        "setAcl", new Class<?>[] {String.class, List.class},
+        new RemoteParam(), aclSpec);
+    rpcClient.invokeSequential(locations, method);
   }
 
   @Override // ClientProtocol
   public AclStatus getAclStatus(String src) throws IOException {
-    return clientProto.getAclStatus(src);
+    checkOperation(OperationCategory.READ);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, false);
+    RemoteMethod method = new RemoteMethod("getAclStatus",
+        new Class<?>[] {String.class}, new RemoteParam());
+    return (AclStatus) rpcClient.invokeSequential(
+        locations, method, AclStatus.class, null);
   }
 
   @Override // ClientProtocol
   public void createEncryptionZone(String src, String keyName)
       throws IOException {
-    clientProto.createEncryptionZone(src, keyName);
+    checkOperation(OperationCategory.WRITE);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("createEncryptionZone",
+        new Class<?>[] {String.class, String.class},
+        new RemoteParam(), keyName);
+    rpcClient.invokeSequential(locations, method);
   }
 
   @Override // ClientProtocol
   public EncryptionZone getEZForPath(String src) throws IOException {
-    return clientProto.getEZForPath(src);
+    checkOperation(OperationCategory.READ);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, false);
+    RemoteMethod method = new RemoteMethod("getEZForPath",
+        new Class<?>[] {String.class}, new RemoteParam());
+    return (EncryptionZone) rpcClient.invokeSequential(
+        locations, method, EncryptionZone.class, null);
   }
 
   @Override // ClientProtocol
   public BatchedEntries<EncryptionZone> listEncryptionZones(long prevId)
       throws IOException {
-    return clientProto.listEncryptionZones(prevId);
+    checkOperation(OperationCategory.READ, false);
+    return null;
   }
 
   @Override // ClientProtocol
   public void reencryptEncryptionZone(String zone, ReencryptAction action)
       throws IOException {
-    clientProto.reencryptEncryptionZone(zone, action);
+    checkOperation(OperationCategory.WRITE, false);
   }
 
   @Override // ClientProtocol
   public BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(
       long prevId) throws IOException {
-    return clientProto.listReencryptionStatus(prevId);
+    checkOperation(OperationCategory.READ, false);
+    return null;
   }
 
   @Override // ClientProtocol
   public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
       throws IOException {
-    clientProto.setXAttr(src, xAttr, flag);
+    checkOperation(OperationCategory.WRITE);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("setXAttr",
+        new Class<?>[] {String.class, XAttr.class, EnumSet.class},
+        new RemoteParam(), xAttr, flag);
+    rpcClient.invokeSequential(locations, method);
   }
 
+  @SuppressWarnings("unchecked")
   @Override // ClientProtocol
   public List<XAttr> getXAttrs(String src, List<XAttr> xAttrs)
       throws IOException {
-    return clientProto.getXAttrs(src, xAttrs);
+    checkOperation(OperationCategory.READ);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, false);
+    RemoteMethod method = new RemoteMethod("getXAttrs",
+        new Class<?>[] {String.class, List.class}, new RemoteParam(), xAttrs);
+    return (List<XAttr>) rpcClient.invokeSequential(
+        locations, method, List.class, null);
   }
 
+  @SuppressWarnings("unchecked")
   @Override // ClientProtocol
   public List<XAttr> listXAttrs(String src) throws IOException {
-    return clientProto.listXAttrs(src);
+    checkOperation(OperationCategory.READ);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, false);
+    RemoteMethod method = new RemoteMethod("listXAttrs",
+        new Class<?>[] {String.class}, new RemoteParam());
+    return (List<XAttr>) rpcClient.invokeSequential(
+        locations, method, List.class, null);
   }
 
   @Override // ClientProtocol
   public void removeXAttr(String src, XAttr xAttr) throws IOException {
-    clientProto.removeXAttr(src, xAttr);
+    checkOperation(OperationCategory.WRITE);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("removeXAttr",
+        new Class<?>[] {String.class, XAttr.class}, new RemoteParam(), xAttr);
+    rpcClient.invokeSequential(locations, method);
   }
 
   @Override // ClientProtocol
   public void checkAccess(String path, FsAction mode) throws IOException {
-    clientProto.checkAccess(path, mode);
+    checkOperation(OperationCategory.READ);
+
+    // TODO handle virtual directories
+    final List<RemoteLocation> locations = getLocationsForPath(path, true);
+    RemoteMethod method = new RemoteMethod("checkAccess",
+        new Class<?>[] {String.class, FsAction.class},
+        new RemoteParam(), mode);
+    rpcClient.invokeSequential(locations, method);
   }
 
   @Override // ClientProtocol
   public long getCurrentEditLogTxid() throws IOException {
-    return clientProto.getCurrentEditLogTxid();
+    checkOperation(OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod(
+        "getCurrentEditLogTxid", new Class<?>[] {});
+    final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    Map<FederationNamespaceInfo, Long> ret =
+        rpcClient.invokeConcurrent(nss, method, true, false, long.class);
+
+    // Return the maximum txid
+    long txid = 0;
+    for (long t : ret.values()) {
+      if (t > txid) {
+        txid = t;
+      }
+    }
+    return txid;
   }
 
   @Override // ClientProtocol
   public EventBatchList getEditsFromTxid(long txid) throws IOException {
-    return clientProto.getEditsFromTxid(txid);
+    checkOperation(OperationCategory.READ, false);
+    return null;
   }
 
-  @Override // ClientProtocol
+  @Override
   public DataEncryptionKey getDataEncryptionKey() throws IOException {
-    return clientProto.getDataEncryptionKey();
+    checkOperation(OperationCategory.READ, false);
+    return null;
   }
 
-  @Override // ClientProtocol
+  @Override
   public String createSnapshot(String snapshotRoot, String snapshotName)
       throws IOException {
-    return clientProto.createSnapshot(snapshotRoot, snapshotName);
+    checkOperation(OperationCategory.WRITE);
+    return null;
   }
 
-  @Override // ClientProtocol
+  @Override
   public void deleteSnapshot(String snapshotRoot, String snapshotName)
       throws IOException {
-    clientProto.deleteSnapshot(snapshotRoot, snapshotName);
+    checkOperation(OperationCategory.WRITE, false);
   }
 
   @Override // ClientProtocol
   public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
       StorageType type) throws IOException {
-    clientProto.setQuota(path, namespaceQuota, storagespaceQuota, type);
+    this.quotaCall.setQuota(path, namespaceQuota, storagespaceQuota, type);
   }
 
   @Override // ClientProtocol
   public QuotaUsage getQuotaUsage(String path) throws IOException {
-    return clientProto.getQuotaUsage(path);
+    return this.quotaCall.getQuotaUsage(path);
   }
 
-  @Override // ClientProtocol
+  @Override
   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
-    clientProto.reportBadBlocks(blocks);
+    checkOperation(OperationCategory.WRITE);
+
+    // Block pool id -> blocks
+    Map<String, List<LocatedBlock>> blockLocations = new HashMap<>();
+    for (LocatedBlock block : blocks) {
+      String bpId = block.getBlock().getBlockPoolId();
+      List<LocatedBlock> bpBlocks = blockLocations.get(bpId);
+      if (bpBlocks == null) {
+        bpBlocks = new LinkedList<>();
+        blockLocations.put(bpId, bpBlocks);
+      }
+      bpBlocks.add(block);
+    }
+
+    // Invoke each block pool
+    for (Entry<String, List<LocatedBlock>> entry : blockLocations.entrySet()) {
+      String bpId = entry.getKey();
+      List<LocatedBlock> bpBlocks = entry.getValue();
+
+      LocatedBlock[] bpBlocksArray =
+          bpBlocks.toArray(new LocatedBlock[bpBlocks.size()]);
+      RemoteMethod method = new RemoteMethod("reportBadBlocks",
+          new Class<?>[] {LocatedBlock[].class},
+          new Object[] {bpBlocksArray});
+      rpcClient.invokeSingleBlockPool(bpId, method);
+    }
   }
 
-  @Override // ClientProtocol
+  @Override
   public void unsetStoragePolicy(String src) throws IOException {
-    clientProto.unsetStoragePolicy(src);
+    checkOperation(OperationCategory.WRITE, false);
   }
 
-  @Override // ClientProtocol
+  @Override
   public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
-    return clientProto.getStoragePolicy(path);
+    checkOperation(OperationCategory.READ, false);
+    return null;
   }
 
   @Override // ClientProtocol
   public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
       throws IOException {
-    return clientProto.getErasureCodingPolicies();
+    return erasureCoding.getErasureCodingPolicies();
   }
 
   @Override // ClientProtocol
   public Map<String, String> getErasureCodingCodecs() throws IOException {
-    return clientProto.getErasureCodingCodecs();
+    return erasureCoding.getErasureCodingCodecs();
   }
 
   @Override // ClientProtocol
   public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
       ErasureCodingPolicy[] policies) throws IOException {
-    return clientProto.addErasureCodingPolicies(policies);
+    return erasureCoding.addErasureCodingPolicies(policies);
   }
 
   @Override // ClientProtocol
   public void removeErasureCodingPolicy(String ecPolicyName)
       throws IOException {
-    clientProto.removeErasureCodingPolicy(ecPolicyName);
+    erasureCoding.removeErasureCodingPolicy(ecPolicyName);
   }
 
   @Override // ClientProtocol
   public void disableErasureCodingPolicy(String ecPolicyName)
       throws IOException {
-    clientProto.disableErasureCodingPolicy(ecPolicyName);
+    erasureCoding.disableErasureCodingPolicy(ecPolicyName);
   }
 
   @Override // ClientProtocol
   public void enableErasureCodingPolicy(String ecPolicyName)
       throws IOException {
-    clientProto.enableErasureCodingPolicy(ecPolicyName);
+    erasureCoding.enableErasureCodingPolicy(ecPolicyName);
   }
 
   @Override // ClientProtocol
   public ErasureCodingPolicy getErasureCodingPolicy(String src)
       throws IOException {
-    return clientProto.getErasureCodingPolicy(src);
+    return erasureCoding.getErasureCodingPolicy(src);
   }
 
   @Override // ClientProtocol
   public void setErasureCodingPolicy(String src, String ecPolicyName)
       throws IOException {
-    clientProto.setErasureCodingPolicy(src, ecPolicyName);
+    erasureCoding.setErasureCodingPolicy(src, ecPolicyName);
   }
 
   @Override // ClientProtocol
   public void unsetErasureCodingPolicy(String src) throws IOException {
-    clientProto.unsetErasureCodingPolicy(src);
+    erasureCoding.unsetErasureCodingPolicy(src);
   }
 
-  @Override // ClientProtocol
+  @Override
   public ECBlockGroupStats getECBlockGroupStats() throws IOException {
-    return clientProto.getECBlockGroupStats();
+    return erasureCoding.getECBlockGroupStats();
   }
 
-  @Override // ClientProtocol
+  @Override
   public ReplicatedBlockStats getReplicatedBlockStats() throws IOException {
-    return clientProto.getReplicatedBlockStats();
+    checkOperation(OperationCategory.READ, false);
+    return null;
   }
 
   @Deprecated
-  @Override // ClientProtocol
+  @Override
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
       throws IOException {
-    return clientProto.listOpenFiles(prevId);
+    return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
+        OpenFilesIterator.FILTER_PATH_DEFAULT);
   }
 
-  @Override // ClientProtocol
+  @Override
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
       EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
-    return clientProto.listOpenFiles(prevId, openFilesTypes, path);
-  }
-
-  @Override // ClientProtocol
-  public void satisfyStoragePolicy(String path) throws IOException {
-    clientProto.satisfyStoragePolicy(path);
+    checkOperation(OperationCategory.READ, false);
+    return null;
   }
 
   @Override // NamenodeProtocol
@@ -1266,11 +2167,6 @@ public class RouterRpcServer extends AbstractService
     return nnProto.isRollingUpgrade();
   }
 
-  @Override // NamenodeProtocol
-  public Long getNextSPSPath() throws IOException {
-    return nnProto.getNextSPSPath();
-  }
-
   /**
    * Locate the location with the matching block pool id.
    *
@@ -1280,7 +2176,7 @@ public class RouterRpcServer extends AbstractService
    * @return Prioritized list of locations in the federated cluster.
    * @throws IOException if the location for this path cannot be determined.
    */
-  protected RemoteLocation getLocationForPath(
+  private RemoteLocation getLocationForPath(
       String path, boolean failIfLocked, String blockPoolId)
           throws IOException {
 
@@ -1380,6 +2276,27 @@ public class RouterRpcServer extends AbstractService
   }
 
   /**
+   * Check if a path should be in all subclusters.
+   *
+   * @param path Path to check.
+   * @return If a path should be in all subclusters.
+   */
+  private boolean isPathAll(final String path) {
+    // Only a mount table resolver is consulted; otherwise the answer is false
+    if (!(subclusterResolver instanceof MountTableResolver)) {
+      return false;
+    }
+    final MountTableResolver mounts = (MountTableResolver) subclusterResolver;
+    try {
+      final MountTable mount = mounts.getMountPoint(path);
+      return mount != null && mount.isAll();
+    } catch (IOException ioe) {
+      LOG.error("Cannot get mount point", ioe);
+      return false;
+    }
+  }
+
+  /**
    * Check if a path is in a read only mount point.
    *
    * @param path Path to check.
@@ -1401,6 +2318,121 @@ public class RouterRpcServer extends AbstractService
   }
 
   /**
+   * Get the modification dates for mount points.
+   *
+   * @param path Name of the path to start checking dates from.
+   * @return Map with the modification dates for all sub-entries.
+   */
+  private Map<String, Long> getMountPointDates(String path) {
+    final Map<String, Long> dates = new TreeMap<>();
+    // Without a mount table resolver the map is returned empty
+    if (!(subclusterResolver instanceof MountTableResolver)) {
+      return dates;
+    }
+    try {
+      for (String name : subclusterResolver.getMountPoints(path)) {
+        dates.put(name, getModifiedTime(dates, path, name));
+      }
+    } catch (IOException ioe) {
+      LOG.error("Cannot get mount point", ioe);
+    }
+    return dates;
+  }
+
+  /**
+   * Get modified time for child. If the child is present in mount table it
+   * will return the modified time. If the child is not present but subdirs of
+   * this child are present then it will return latest modified subdir's time
+   * as modified time of the requested child.
+   * @param ret children and modified times gathered so far; the caller adds
+   *          child only after this method returns, so it is not read here.
+   * @param path Name of the path to start checking dates from.
+   * @param child child of the requested path.
+   * @return modified time, or 0 if no date could be obtained.
+   */
+  private long getModifiedTime(Map<String, Long> ret, String path,
+      String child) {
+    MountTableResolver mountTable = (MountTableResolver)subclusterResolver;
+    String srcPath;
+    if (path.equals(Path.SEPARATOR)) {
+      srcPath = Path.SEPARATOR + child;
+    } else {
+      srcPath = path + Path.SEPARATOR + child;
+    }
+    long modTime = 0L;
+    try {
+      // Get mount table entry for the srcPath
+      MountTable entry = mountTable.getMountPoint(srcPath);
+      // if srcPath is not in mount table but its subdirs are in mount
+      // table we will display latest modified subdir date/time.
+      if (entry == null) {
+        List<MountTable> entries = mountTable.getMounts(srcPath);
+        for (MountTable eachEntry : entries) {
+          // Keep the running maximum so the latest date wins
+          long entryTime = eachEntry.getDateModified();
+          if (entryTime > modTime) {
+            modTime = entryTime;
+          }
+        }
+      } else {
+        modTime = entry.getDateModified();
+      }
+    } catch (IOException e) {
+      LOG.error("Cannot get mount point", e);
+    }
+    return modTime;
+  }
+
+  /**
+   * Create a new file status for a mount point.
+   *
+   * @param name Name of the mount point.
+   * @param childrenNum Number of children.
+   * @param date Date used as both the modification and the access time.
+   * @return New HDFS file status representing a mount point.
+   */
+  private HdfsFileStatus getMountPointStatus(
+      String name, int childrenNum, long date) {
+    // Start from this server's super user and group; they are replaced by
+    // the remote caller's identity when it can be obtained
+    String statusOwner = this.superUser;
+    String statusGroup = this.superGroup;
+    try {
+      // TODO support users, it should be the user for the pointed folder
+      final UserGroupInformation remoteUser = getRemoteUser();
+      statusOwner = remoteUser.getUserName();
+      statusGroup = remoteUser.getPrimaryGroupName();
+    } catch (IOException ioe) {
+      LOG.error("Cannot get the remote user: {}", ioe.getMessage());
+    }
+
+    // A single date serves as both modification and access time
+    return new HdfsFileStatus.Builder()
+      .isdir(true)
+      .mtime(date)
+      .atime(date)
+      .perm(FsPermission.getDirDefault())
+      .owner(statusOwner)
+      .group(statusGroup)
+      .symlink(new byte[0])
+      .path(DFSUtil.string2Bytes(name))
+      .fileId(0L)
+      .children(childrenNum)
+      .build();
+  }
+
+  /**
+   * Get the name of the method that is calling this function.
+   *
+   * @return Name of the method calling this function.
+   */
+  private static String getMethodName() {
+    // Index 3 of the stack trace is the frame reported as the calling method
+    final StackTraceElement caller = Thread.currentThread().getStackTrace()[3];
+    return caller.getMethodName();
+  }
+
+  /**
    * Get the user that is invoking this operation.
    *
    * @return Remote user group information.
@@ -1458,4 +2490,16 @@ public class RouterRpcServer extends AbstractService
   public FederationRPCMetrics getRPCMetrics() {
     return this.rpcMonitor.getRPCMetrics();
   }
+
+  @Override // ClientProtocol
+  public void satisfyStoragePolicy(String path) throws IOException {
+    checkOperation(OperationCategory.WRITE, false); // nothing is forwarded
+  }
+
+  @Override // NamenodeProtocol
+  public Long getNextSPSPath() throws IOException {
+    checkOperation(OperationCategory.READ, false);
+    // not supported: after the operation check, callers always get null
+    return null;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org