You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/08 20:58:04 UTC

[32/45] hadoop git commit: HDFS-11546. Federation Router RPC server. Contributed by Jason Kace and Inigo Goiri.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5b52be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
index ee6f57d..2875750 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.util.Time;
 
 /**
  * In-memory cache/mock of a namenode and file resolver. Stores the most
- * recently updated NN information for each nameservice and block pool. Also
+ * recently updated NN information for each nameservice and block pool. It also
  * stores a virtual mount table for resolving global namespace paths to local NN
  * paths.
  */
@@ -51,82 +51,93 @@ public class MockResolver
     implements ActiveNamenodeResolver, FileSubclusterResolver {
 
   private Map<String, List<? extends FederationNamenodeContext>> resolver =
-      new HashMap<String, List<? extends FederationNamenodeContext>>();
-  private Map<String, List<RemoteLocation>> locations =
-      new HashMap<String, List<RemoteLocation>>();
-  private Set<FederationNamespaceInfo> namespaces =
-      new HashSet<FederationNamespaceInfo>();
+      new HashMap<>();
+  private Map<String, List<RemoteLocation>> locations = new HashMap<>();
+  private Set<FederationNamespaceInfo> namespaces = new HashSet<>();
   private String defaultNamespace = null;
 
+
   public MockResolver(Configuration conf, StateStoreService store) {
     this.cleanRegistrations();
   }
 
-  public void addLocation(String mount, String nameservice, String location) {
-    RemoteLocation remoteLocation = new RemoteLocation(nameservice, location);
-    List<RemoteLocation> locationsList = locations.get(mount);
+  public void addLocation(String mount, String nsId, String location) {
+    List<RemoteLocation> locationsList = this.locations.get(mount);
     if (locationsList == null) {
-      locationsList = new LinkedList<RemoteLocation>();
-      locations.put(mount, locationsList);
+      locationsList = new LinkedList<>();
+      this.locations.put(mount, locationsList);
     }
+
+    final RemoteLocation remoteLocation = new RemoteLocation(nsId, location);
     if (!locationsList.contains(remoteLocation)) {
       locationsList.add(remoteLocation);
     }
 
     if (this.defaultNamespace == null) {
-      this.defaultNamespace = nameservice;
+      this.defaultNamespace = nsId;
     }
   }
 
   public synchronized void cleanRegistrations() {
-    this.resolver =
-        new HashMap<String, List<? extends FederationNamenodeContext>>();
-    this.namespaces = new HashSet<FederationNamespaceInfo>();
+    this.resolver = new HashMap<>();
+    this.namespaces = new HashSet<>();
   }
 
   @Override
   public void updateActiveNamenode(
-      String ns, InetSocketAddress successfulAddress) {
+      String nsId, InetSocketAddress successfulAddress) {
 
     String address = successfulAddress.getHostName() + ":" +
         successfulAddress.getPort();
-    String key = ns;
+    String key = nsId;
     if (key != null) {
       // Update the active entry
       @SuppressWarnings("unchecked")
-      List<FederationNamenodeContext> iterator =
-          (List<FederationNamenodeContext>) resolver.get(key);
-      for (FederationNamenodeContext namenode : iterator) {
+      List<FederationNamenodeContext> namenodes =
+          (List<FederationNamenodeContext>) this.resolver.get(key);
+      for (FederationNamenodeContext namenode : namenodes) {
         if (namenode.getRpcAddress().equals(address)) {
           MockNamenodeContext nn = (MockNamenodeContext) namenode;
           nn.setState(FederationNamenodeServiceState.ACTIVE);
           break;
         }
       }
-      Collections.sort(iterator, new NamenodePriorityComparator());
+      // This operation modifies the list so we need to be careful
+      synchronized(namenodes) {
+        Collections.sort(namenodes, new NamenodePriorityComparator());
+      }
     }
   }
 
   @Override
   public List<? extends FederationNamenodeContext>
       getNamenodesForNameserviceId(String nameserviceId) {
-    return resolver.get(nameserviceId);
+    // Return a copy of the list because it is updated periodically
+    List<? extends FederationNamenodeContext> namenodes =
+        this.resolver.get(nameserviceId);
+    return Collections.unmodifiableList(new ArrayList<>(namenodes));
   }
 
   @Override
   public List<? extends FederationNamenodeContext> getNamenodesForBlockPoolId(
       String blockPoolId) {
-    return resolver.get(blockPoolId);
+    // Return a copy of the list because it is updated periodically
+    List<? extends FederationNamenodeContext> namenodes =
+        this.resolver.get(blockPoolId);
+    return Collections.unmodifiableList(new ArrayList<>(namenodes));
   }
 
   private static class MockNamenodeContext
       implements FederationNamenodeContext {
+
+    private String namenodeId;
+    private String nameserviceId;
+
     private String webAddress;
     private String rpcAddress;
     private String serviceAddress;
     private String lifelineAddress;
-    private String namenodeId;
-    private String nameserviceId;
+
     private FederationNamenodeServiceState state;
     private long dateModified;
 
@@ -197,6 +208,7 @@ public class MockResolver
   @Override
   public synchronized boolean registerNamenode(NamenodeStatusReport report)
       throws IOException {
+
     MockNamenodeContext context = new MockNamenodeContext(
         report.getRpcAddress(), report.getServiceAddress(),
         report.getLifelineAddress(), report.getWebAddress(),
@@ -205,13 +217,14 @@ public class MockResolver
     String nsId = report.getNameserviceId();
     String bpId = report.getBlockPoolId();
     String cId = report.getClusterId();
+
     @SuppressWarnings("unchecked")
     List<MockNamenodeContext> existingItems =
-        (List<MockNamenodeContext>) resolver.get(nsId);
+        (List<MockNamenodeContext>) this.resolver.get(nsId);
     if (existingItems == null) {
-      existingItems = new ArrayList<MockNamenodeContext>();
-      resolver.put(bpId, existingItems);
-      resolver.put(nsId, existingItems);
+      existingItems = new ArrayList<>();
+      this.resolver.put(bpId, existingItems);
+      this.resolver.put(nsId, existingItems);
     }
     boolean added = false;
     for (int i=0; i<existingItems.size() && !added; i++) {
@@ -227,7 +240,7 @@ public class MockResolver
     Collections.sort(existingItems, new NamenodePriorityComparator());
 
     FederationNamespaceInfo info = new FederationNamespaceInfo(bpId, cId, nsId);
-    namespaces.add(info);
+    this.namespaces.add(info);
     return true;
   }
 
@@ -238,16 +251,13 @@ public class MockResolver
 
   @Override
   public PathLocation getDestinationForPath(String path) throws IOException {
-    String finalPath = null;
-    String nameservice = null;
-    Set<String> namespaceSet = new HashSet<String>();
-    LinkedList<RemoteLocation> remoteLocations =
-        new LinkedList<RemoteLocation>();
-    for(String key : this.locations.keySet()) {
-      if(path.startsWith(key)) {
+    Set<String> namespaceSet = new HashSet<>();
+    List<RemoteLocation> remoteLocations = new LinkedList<>();
+    for (String key : this.locations.keySet()) {
+      if (path.startsWith(key)) {
         for (RemoteLocation location : this.locations.get(key)) {
-          finalPath = location.getDest() + path.substring(key.length());
-          nameservice = location.getNameserviceId();
+          String finalPath = location.getDest() + path.substring(key.length());
+          String nameservice = location.getNameserviceId();
           RemoteLocation remoteLocation =
               new RemoteLocation(nameservice, finalPath);
           remoteLocations.add(remoteLocation);
@@ -265,7 +275,7 @@ public class MockResolver
 
   @Override
   public List<String> getMountPoints(String path) throws IOException {
-    List<String> mounts = new ArrayList<String>();
+    List<String> mounts = new ArrayList<>();
     if (path.equals("/")) {
       // Mounts only supported under root level
       for (String mount : this.locations.keySet()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5b52be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
index 16d624c..39fcf7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterConfigBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.federation;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 
 /**
  * Constructs a router configuration with individual features enabled/disabled.
@@ -26,15 +27,32 @@ public class RouterConfigBuilder {
 
   private Configuration conf;
 
+  private boolean enableRpcServer = false;
+
   public RouterConfigBuilder(Configuration configuration) {
     this.conf = configuration;
   }
 
   public RouterConfigBuilder() {
-    this.conf = new Configuration();
+    this.conf = new Configuration(false);
+  }
+
+  public RouterConfigBuilder all() {
+    this.enableRpcServer = true;
+    return this;
+  }
+
+  public RouterConfigBuilder rpc(boolean enable) {
+    this.enableRpcServer = enable;
+    return this;
+  }
+
+  public RouterConfigBuilder rpc() {
+    return this.rpc(true);
   }
 
   public Configuration build() {
+    conf.setBoolean(DFSConfigKeys.DFS_ROUTER_RPC_ENABLE, this.enableRpcServer);
     return conf;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5b52be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
index 55d04ad..4031b7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/RouterDFSCluster.java
@@ -17,27 +17,44 @@
  */
 package org.apache.hadoop.hdfs.server.federation;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_DEFAULT_NAMESERVICE;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.waitNamenodeRegistered;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Random;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -46,16 +63,49 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology.NSConf;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
 import org.apache.hadoop.hdfs.server.federation.router.Router;
+import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.Service.STATE;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Test utility to mimic a federated HDFS cluster with a router.
+ * Test utility to mimic a federated HDFS cluster with multiple routers.
  */
 public class RouterDFSCluster {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterDFSCluster.class);
+
+  public static final String TEST_STRING = "teststring";
+  public static final String TEST_DIR = "testdir";
+  public static final String TEST_FILE = "testfile";
+
+
+  /** Nameservices in the federated cluster. */
+  private List<String> nameservices;
+  /** Namenodes in the federated cluster. */
+  private List<NamenodeContext> namenodes;
+  /** Routers in the federated cluster. */
+  private List<RouterContext> routers;
+  /** If the Namenodes are in high availability. */
+  private boolean highAvailability;
+
+  /** Mini cluster. */
+  private MiniDFSCluster cluster;
+
+  /** Router configuration overrides. */
+  private Configuration routerOverrides;
+  /** Namenode configuration overrides. */
+  private Configuration namenodeOverrides;
+
+
   /**
    * Router context.
    */
@@ -69,13 +119,14 @@ public class RouterDFSCluster {
     private Configuration conf;
     private URI fileSystemUri;
 
-    public RouterContext(Configuration conf, String ns, String nn)
+    public RouterContext(Configuration conf, String nsId, String nnId)
         throws URISyntaxException {
-      this.namenodeId = nn;
-      this.nameserviceId = ns;
       this.conf = conf;
-      router = new Router();
-      router.init(conf);
+      this.nameserviceId = nsId;
+      this.namenodeId = nnId;
+
+      this.router = new Router();
+      this.router.init(conf);
     }
 
     public Router getRouter() {
@@ -99,18 +150,30 @@ public class RouterDFSCluster {
     }
 
     public void initRouter() throws URISyntaxException {
+      // Store the bound ports for the router interfaces
+      InetSocketAddress rpcAddress = router.getRpcServerAddress();
+      if (rpcAddress != null) {
+        this.rpcPort = rpcAddress.getPort();
+        this.fileSystemUri =
+            URI.create("hdfs://" + NetUtils.getHostPortString(rpcAddress));
+        // Override the default FS to point to the router RPC
+        DistributedFileSystem.setDefaultUri(conf, fileSystemUri);
+        try {
+          this.fileContext = FileContext.getFileContext(conf);
+        } catch (UnsupportedFileSystemException e) {
+          this.fileContext = null;
+        }
+      }
     }
 
-    public DistributedFileSystem getFileSystem() throws IOException {
-      DistributedFileSystem fs =
-          (DistributedFileSystem) DistributedFileSystem.get(conf);
-      return fs;
+    public FileSystem getFileSystem() throws IOException {
+      return DistributedFileSystem.get(conf);
     }
 
     public DFSClient getClient(UserGroupInformation user)
         throws IOException, URISyntaxException, InterruptedException {
 
-      LOG.info("Connecting to router at " + fileSystemUri);
+      LOG.info("Connecting to router at {}", fileSystemUri);
       return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
         @Override
         public DFSClient run() throws IOException {
@@ -120,9 +183,8 @@ public class RouterDFSCluster {
     }
 
     public DFSClient getClient() throws IOException, URISyntaxException {
-
       if (client == null) {
-        LOG.info("Connecting to router at " + fileSystemUri);
+        LOG.info("Connecting to router at {}", fileSystemUri);
         client = new DFSClient(fileSystemUri, conf);
       }
       return client;
@@ -130,9 +192,10 @@ public class RouterDFSCluster {
   }
 
   /**
-   * Namenode context.
+   * Namenode context in the federated cluster.
    */
   public class NamenodeContext {
+    private Configuration conf;
     private NameNode namenode;
     private String nameserviceId;
     private String namenodeId;
@@ -143,14 +206,13 @@ public class RouterDFSCluster {
     private int httpPort;
     private URI fileSystemUri;
     private int index;
-    private Configuration conf;
     private DFSClient client;
 
-    public NamenodeContext(Configuration conf, String ns, String nn,
-        int index) {
+    public NamenodeContext(
+        Configuration conf, String nsId, String nnId, int index) {
       this.conf = conf;
-      this.namenodeId = nn;
-      this.nameserviceId = ns;
+      this.nameserviceId = nsId;
+      this.namenodeId = nnId;
       this.index = index;
     }
 
@@ -170,20 +232,19 @@ public class RouterDFSCluster {
       return this.fileContext;
     }
 
-    public void setNamenode(NameNode n) throws URISyntaxException {
-      namenode = n;
+    public void setNamenode(NameNode nn) throws URISyntaxException {
+      this.namenode = nn;
 
-      // Store the bound ports and override the default FS with the local NN's
-      // RPC
-      rpcPort = n.getNameNodeAddress().getPort();
-      servicePort = n.getServiceRpcAddress().getPort();
-      lifelinePort = n.getServiceRpcAddress().getPort();
-      httpPort = n.getHttpAddress().getPort();
-      fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort());
-      DistributedFileSystem.setDefaultUri(conf, fileSystemUri);
+      // Store the bound ports and override the default FS with the local NN RPC
+      this.rpcPort = nn.getNameNodeAddress().getPort();
+      this.servicePort = nn.getServiceRpcAddress().getPort();
+      this.lifelinePort = nn.getServiceRpcAddress().getPort();
+      this.httpPort = nn.getHttpAddress().getPort();
+      this.fileSystemUri = new URI("hdfs://" + namenode.getHostAndPort());
+      DistributedFileSystem.setDefaultUri(this.conf, this.fileSystemUri);
 
       try {
-        this.fileContext = FileContext.getFileContext(conf);
+        this.fileContext = FileContext.getFileContext(this.conf);
       } catch (UnsupportedFileSystemException e) {
         this.fileContext = null;
       }
@@ -205,10 +266,8 @@ public class RouterDFSCluster {
       return namenode.getHttpAddress().getHostName() + ":" + httpPort;
     }
 
-    public DistributedFileSystem getFileSystem() throws IOException {
-      DistributedFileSystem fs =
-          (DistributedFileSystem) DistributedFileSystem.get(conf);
-      return fs;
+    public FileSystem getFileSystem() throws IOException {
+      return DistributedFileSystem.get(conf);
     }
 
     public void resetClient() {
@@ -218,7 +277,7 @@ public class RouterDFSCluster {
     public DFSClient getClient(UserGroupInformation user)
         throws IOException, URISyntaxException, InterruptedException {
 
-      LOG.info("Connecting to namenode at " + fileSystemUri);
+      LOG.info("Connecting to namenode at {}", fileSystemUri);
       return user.doAs(new PrivilegedExceptionAction<DFSClient>() {
         @Override
         public DFSClient run() throws IOException {
@@ -229,7 +288,7 @@ public class RouterDFSCluster {
 
     public DFSClient getClient() throws IOException, URISyntaxException {
       if (client == null) {
-        LOG.info("Connecting to namenode at " + fileSystemUri);
+        LOG.info("Connecting to namenode at {}", fileSystemUri);
         client = new DFSClient(fileSystemUri, conf);
       }
       return client;
@@ -244,36 +303,20 @@ public class RouterDFSCluster {
     }
   }
 
-  public static final String NAMENODE1 = "nn0";
-  public static final String NAMENODE2 = "nn1";
-  public static final String NAMENODE3 = "nn2";
-  public static final String TEST_STRING = "teststring";
-  public static final String TEST_DIR = "testdir";
-  public static final String TEST_FILE = "testfile";
-
-  private List<String> nameservices;
-  private List<RouterContext> routers;
-  private List<NamenodeContext> namenodes;
-  private static final Log LOG = LogFactory.getLog(RouterDFSCluster.class);
-  private MiniDFSCluster cluster;
-  private boolean highAvailability;
-
-  protected static final int DEFAULT_HEARTBEAT_INTERVAL = 5;
-  protected static final int DEFAULT_CACHE_INTERVAL_SEC = 5;
-  private Configuration routerOverrides;
-  private Configuration namenodeOverrides;
-
-  private static final String NAMENODES = NAMENODE1 + "," + NAMENODE2;
-
-  public RouterDFSCluster(boolean ha, int numNameservices) {
-    this(ha, numNameservices, 2);
-  }
-
   public RouterDFSCluster(boolean ha, int numNameservices, int numNamenodes) {
     this.highAvailability = ha;
     configureNameservices(numNameservices, numNamenodes);
   }
 
+  public RouterDFSCluster(boolean ha, int numNameservices) {
+    this(ha, numNameservices, 2);
+  }
+
+  /**
+   * Add configuration settings to override default Router settings.
+   *
+   * @param conf Router configuration overrides.
+   */
   public void addRouterOverrides(Configuration conf) {
     if (this.routerOverrides == null) {
       this.routerOverrides = conf;
@@ -282,6 +325,11 @@ public class RouterDFSCluster {
     }
   }
 
+  /**
+   * Add configuration settings to override default Namenode settings.
+   *
+   * @param conf Namenode configuration overrides.
+   */
   public void addNamenodeOverrides(Configuration conf) {
     if (this.namenodeOverrides == null) {
       this.namenodeOverrides = conf;
@@ -290,124 +338,134 @@ public class RouterDFSCluster {
     }
   }
 
-  public Configuration generateNamenodeConfiguration(
-      String defaultNameserviceId) {
-    Configuration c = new HdfsConfiguration();
+  /**
+   * Generate the configuration for a Namenode.
+   *
+   * @param nsId Nameservice identifier.
+   * @return New namenode configuration.
+   */
+  public Configuration generateNamenodeConfiguration(String nsId) {
+    Configuration conf = new HdfsConfiguration();
 
-    c.set(DFSConfigKeys.DFS_NAMESERVICES, getNameservicesKey());
-    c.set("fs.defaultFS", "hdfs://" + defaultNameserviceId);
+    conf.set(DFS_NAMESERVICES, getNameservicesKey());
+    conf.set(FS_DEFAULT_NAME_KEY, "hdfs://" + nsId);
 
     for (String ns : nameservices) {
       if (highAvailability) {
-        c.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns, NAMENODES);
+        conf.set(
+            DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
+            NAMENODES[0] + "," + NAMENODES[1]);
       }
 
       for (NamenodeContext context : getNamenodes(ns)) {
         String suffix = context.getConfSuffix();
 
-        c.set(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
+        conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY + "." + suffix,
             "127.0.0.1:" + context.rpcPort);
-        c.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
+        conf.set(DFS_NAMENODE_HTTP_ADDRESS_KEY + "." + suffix,
             "127.0.0.1:" + context.httpPort);
-        c.set(DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix,
+        conf.set(DFS_NAMENODE_RPC_BIND_HOST_KEY + "." + suffix,
             "0.0.0.0");
       }
     }
 
-    if (namenodeOverrides != null) {
-      c.addResource(namenodeOverrides);
+    if (this.namenodeOverrides != null) {
+      conf.addResource(this.namenodeOverrides);
     }
-    return c;
+    return conf;
   }
 
+  /**
+   * Generate the configuration for a client.
+   *
+   * @return New configuration for a client.
+   */
   public Configuration generateClientConfiguration() {
-    Configuration conf = new HdfsConfiguration();
-    conf.addResource(generateNamenodeConfiguration(getNameservices().get(0)));
+    Configuration conf = new HdfsConfiguration(false);
+    String ns0 = getNameservices().get(0);
+    conf.addResource(generateNamenodeConfiguration(ns0));
     return conf;
   }
 
-  public Configuration generateRouterConfiguration(String localNameserviceId,
-      String localNamenodeId) throws IOException {
-    Configuration conf = new HdfsConfiguration();
-    conf.addResource(generateNamenodeConfiguration(localNameserviceId));
+  /**
+   * Generate the configuration for a Router.
+   *
+   * @param nsId Nameservice identifier.
+   * @param nnId Namenode identifier.
+   * @return New configuration for a Router.
+   */
+  public Configuration generateRouterConfiguration(String nsId, String nnId) {
+
+    Configuration conf = new HdfsConfiguration(false);
+    conf.addResource(generateNamenodeConfiguration(nsId));
+
+    conf.setInt(DFS_ROUTER_HANDLER_COUNT_KEY, 10);
+    conf.set(DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
+    conf.set(DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
+
+    conf.set(DFS_ROUTER_DEFAULT_NAMESERVICE, nameservices.get(0));
 
     // Use mock resolver classes
-    conf.set(DFSConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
-        MockResolver.class.getCanonicalName());
-    conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
-        MockResolver.class.getCanonicalName());
+    conf.setClass(FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+        MockResolver.class, ActiveNamenodeResolver.class);
+    conf.setClass(FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+        MockResolver.class, FileSubclusterResolver.class);
 
     // Set the nameservice ID for the default NN monitor
-    conf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, localNameserviceId);
-
-    if (localNamenodeId != null) {
-      conf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, localNamenodeId);
+    conf.set(DFS_NAMESERVICE_ID, nsId);
+    if (nnId != null) {
+      conf.set(DFS_HA_NAMENODE_ID_KEY, nnId);
     }
 
-    StringBuilder routerBuilder = new StringBuilder();
-    for (String ns : nameservices) {
-      for (NamenodeContext context : getNamenodes(ns)) {
-        String suffix = context.getConfSuffix();
-
-        if (routerBuilder.length() != 0) {
-          routerBuilder.append(",");
-        }
-        routerBuilder.append(suffix);
+    // Add custom overrides if available
+    if (this.routerOverrides != null) {
+      for (Entry<String, String> entry : this.routerOverrides) {
+        String confKey = entry.getKey();
+        String confValue = entry.getValue();
+        conf.set(confKey, confValue);
       }
     }
-
     return conf;
   }
 
   public void configureNameservices(int numNameservices, int numNamenodes) {
-    nameservices = new ArrayList<String>();
-    for (int i = 0; i < numNameservices; i++) {
-      nameservices.add("ns" + i);
-    }
-    namenodes = new ArrayList<NamenodeContext>();
-    int index = 0;
-    for (String ns : nameservices) {
-      Configuration nnConf = generateNamenodeConfiguration(ns);
-      if (highAvailability) {
-        NamenodeContext context =
-            new NamenodeContext(nnConf, ns, NAMENODE1, index);
-        namenodes.add(context);
-        index++;
-
-        if (numNamenodes > 1) {
-          context = new NamenodeContext(nnConf, ns, NAMENODE2, index + 1);
-          namenodes.add(context);
-          index++;
-        }
+    this.nameservices = new ArrayList<>();
+    this.namenodes = new ArrayList<>();
 
-        if (numNamenodes > 2) {
-          context = new NamenodeContext(nnConf, ns, NAMENODE3, index + 1);
-          namenodes.add(context);
-          index++;
-        }
+    NamenodeContext context = null;
+    int nnIndex = 0;
+    for (int i=0; i<numNameservices; i++) {
+      String ns = "ns" + i;
+      this.nameservices.add("ns" + i);
 
+      Configuration nnConf = generateNamenodeConfiguration(ns);
+      if (!highAvailability) {
+        context = new NamenodeContext(nnConf, ns, null, nnIndex++);
+        this.namenodes.add(context);
       } else {
-        NamenodeContext context = new NamenodeContext(nnConf, ns, null, index);
-        namenodes.add(context);
-        index++;
+        for (int j=0; j<numNamenodes; j++) {
+          context = new NamenodeContext(nnConf, ns, NAMENODES[j], nnIndex++);
+          this.namenodes.add(context);
+        }
       }
     }
   }
 
   public String getNameservicesKey() {
-    StringBuilder ns = new StringBuilder();
-    for (int i = 0; i < nameservices.size(); i++) {
-      if (i > 0) {
-        ns.append(",");
+    StringBuilder sb = new StringBuilder();
+    for (String nsId : this.nameservices) {
+      if (sb.length() > 0) {
+        sb.append(",");
       }
-      ns.append(nameservices.get(i));
+      sb.append(nsId);
     }
-    return ns.toString();
+    return sb.toString();
   }
 
   public String getRandomNameservice() {
     Random r = new Random();
-    return nameservices.get(r.nextInt(nameservices.size()));
+    int randIndex = r.nextInt(nameservices.size());
+    return nameservices.get(randIndex);
   }
 
   public List<String> getNameservices() {
@@ -415,7 +473,7 @@ public class RouterDFSCluster {
   }
 
   public List<NamenodeContext> getNamenodes(String nameservice) {
-    ArrayList<NamenodeContext> nns = new ArrayList<NamenodeContext>();
+    List<NamenodeContext> nns = new ArrayList<>();
     for (NamenodeContext c : namenodes) {
       if (c.nameserviceId.equals(nameservice)) {
         nns.add(c);
@@ -426,23 +484,23 @@ public class RouterDFSCluster {
 
   public NamenodeContext getRandomNamenode() {
     Random rand = new Random();
-    return namenodes.get(rand.nextInt(namenodes.size()));
+    int i = rand.nextInt(this.namenodes.size());
+    return this.namenodes.get(i);
   }
 
   public List<NamenodeContext> getNamenodes() {
-    return namenodes;
+    return this.namenodes;
   }
 
   public boolean isHighAvailability() {
     return highAvailability;
   }
 
-  public NamenodeContext getNamenode(String nameservice,
-      String namenode) {
-    for (NamenodeContext c : namenodes) {
+  public NamenodeContext getNamenode(String nameservice, String namenode) {
+    for (NamenodeContext c : this.namenodes) {
       if (c.nameserviceId.equals(nameservice)) {
-        if (namenode == null || c.namenodeId == null || namenode.isEmpty()
-            || c.namenodeId.isEmpty()) {
+        if (namenode == null || namenode.isEmpty() ||
+            c.namenodeId == null || c.namenodeId.isEmpty()) {
           return c;
         } else if (c.namenodeId.equals(namenode)) {
           return c;
@@ -453,7 +511,7 @@ public class RouterDFSCluster {
   }
 
   public List<RouterContext> getRouters(String nameservice) {
-    ArrayList<RouterContext> nns = new ArrayList<RouterContext>();
+    List<RouterContext> nns = new ArrayList<>();
     for (RouterContext c : routers) {
       if (c.nameserviceId.equals(nameservice)) {
         nns.add(c);
@@ -462,14 +520,13 @@ public class RouterDFSCluster {
     return nns;
   }
 
-  public RouterContext getRouterContext(String nameservice,
-      String namenode) {
+  public RouterContext getRouterContext(String nsId, String nnId) {
     for (RouterContext c : routers) {
-      if (namenode == null) {
+      if (nnId == null) {
         return c;
       }
-      if (c.namenodeId.equals(namenode)
-          && c.nameserviceId.equals(nameservice)) {
+      if (c.namenodeId.equals(nnId) &&
+          c.nameserviceId.equals(nsId)) {
         return c;
       }
     }
@@ -485,10 +542,10 @@ public class RouterDFSCluster {
     return routers;
   }
 
-  public RouterContext buildRouter(String nameservice, String namenode)
+  public RouterContext buildRouter(String nsId, String nnId)
       throws URISyntaxException, IOException {
-    Configuration config = generateRouterConfiguration(nameservice, namenode);
-    RouterContext rc = new RouterContext(config, nameservice, namenode);
+    Configuration config = generateRouterConfiguration(nsId, nnId);
+    RouterContext rc = new RouterContext(config, nsId, nnId);
     return rc;
   }
 
@@ -500,10 +557,9 @@ public class RouterDFSCluster {
     try {
       MiniDFSNNTopology topology = new MiniDFSNNTopology();
       for (String ns : nameservices) {
-
         NSConf conf = new MiniDFSNNTopology.NSConf(ns);
         if (highAvailability) {
-          for(int i = 0; i < namenodes.size()/nameservices.size(); i++) {
+          for (int i=0; i<namenodes.size()/nameservices.size(); i++) {
             NNConf nnConf = new MiniDFSNNTopology.NNConf("nn" + i);
             conf.addNN(nnConf);
           }
@@ -516,11 +572,15 @@ public class RouterDFSCluster {
       topology.setFederation(true);
 
       // Start mini DFS cluster
-      Configuration nnConf = generateNamenodeConfiguration(nameservices.get(0));
+      String ns0 = nameservices.get(0);
+      Configuration nnConf = generateNamenodeConfiguration(ns0);
       if (overrideConf != null) {
         nnConf.addResource(overrideConf);
       }
-      cluster = new MiniDFSCluster.Builder(nnConf).nnTopology(topology).build();
+      cluster = new MiniDFSCluster.Builder(nnConf)
+          .numDataNodes(nameservices.size()*2)
+          .nnTopology(topology)
+          .build();
       cluster.waitActive();
 
       // Store NN pointers
@@ -530,28 +590,32 @@ public class RouterDFSCluster {
       }
 
     } catch (Exception e) {
-      LOG.error("Cannot start Router DFS cluster: " + e.getMessage(), e);
-      cluster.shutdown();
+      LOG.error("Cannot start Router DFS cluster: {}", e.getMessage(), e);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 
   public void startRouters()
       throws InterruptedException, URISyntaxException, IOException {
-    // Create routers
-    routers = new ArrayList<RouterContext>();
-    for (String ns : nameservices) {
 
+    // Create one router per namenode in each nameservice
+    this.routers = new ArrayList<>();
+    for (String ns : this.nameservices) {
       for (NamenodeContext context : getNamenodes(ns)) {
-        routers.add(buildRouter(ns, context.namenodeId));
+        RouterContext router = buildRouter(ns, context.namenodeId);
+        this.routers.add(router);
       }
     }
 
     // Start all routers
-    for (RouterContext router : routers) {
+    for (RouterContext router : this.routers) {
       router.router.start();
     }
+
     // Wait until all routers are active and record their ports
-    for (RouterContext router : routers) {
+    for (RouterContext router : this.routers) {
       waitActive(router);
       router.initRouter();
     }
@@ -570,22 +634,21 @@ public class RouterDFSCluster {
       }
       Thread.sleep(1000);
     }
-    assertFalse(
-        "Timeout waiting for " + router.router.toString() + " to activate.",
-        true);
+    fail("Timeout waiting for " + router.router + " to activate");
   }
 
-
   public void registerNamenodes() throws IOException {
-    for (RouterContext r : routers) {
+    for (RouterContext r : this.routers) {
       ActiveNamenodeResolver resolver = r.router.getNamenodeResolver();
-      for (NamenodeContext nn : namenodes) {
+      for (NamenodeContext nn : this.namenodes) {
         // Generate a report
-        NamenodeStatusReport report = new NamenodeStatusReport(nn.nameserviceId,
-            nn.namenodeId, nn.getRpcAddress(), nn.getServiceAddress(),
+        NamenodeStatusReport report = new NamenodeStatusReport(
+            nn.nameserviceId, nn.namenodeId,
+            nn.getRpcAddress(), nn.getServiceAddress(),
             nn.getLifelineAddress(), nn.getHttpAddress());
-        report.setNamespaceInfo(nn.namenode.getNamesystem().getFSImage()
-            .getStorage().getNamespaceInfo());
+        FSImage fsImage = nn.namenode.getNamesystem().getFSImage();
+        NamespaceInfo nsInfo = fsImage.getStorage().getNamespaceInfo();
+        report.setNamespaceInfo(nsInfo);
 
         // Determine HA state from nn public state string
         String nnState = nn.namenode.getState();
@@ -606,74 +669,97 @@ public class RouterDFSCluster {
 
   public void waitNamenodeRegistration()
       throws InterruptedException, IllegalStateException, IOException {
-    for (RouterContext r : routers) {
-      for (NamenodeContext nn : namenodes) {
-        FederationTestUtils.waitNamenodeRegistered(
-            r.router.getNamenodeResolver(), nn.nameserviceId, nn.namenodeId,
-            null);
+    for (RouterContext r : this.routers) {
+      Router router = r.router;
+      for (NamenodeContext nn : this.namenodes) {
+        ActiveNamenodeResolver nnResolver = router.getNamenodeResolver();
+        waitNamenodeRegistered(
+            nnResolver, nn.nameserviceId, nn.namenodeId, null);
       }
     }
   }
 
   public void waitRouterRegistrationQuorum(RouterContext router,
-      FederationNamenodeServiceState state, String nameservice, String namenode)
+      FederationNamenodeServiceState state, String nsId, String nnId)
           throws InterruptedException, IOException {
-    LOG.info("Waiting for NN - " + nameservice + ":" + namenode
-        + " to transition to state - " + state);
-    FederationTestUtils.waitNamenodeRegistered(
-        router.router.getNamenodeResolver(), nameservice, namenode, state);
+    LOG.info("Waiting for NN {} {} to transition to {}", nsId, nnId, state);
+    ActiveNamenodeResolver nnResolver = router.router.getNamenodeResolver();
+    waitNamenodeRegistered(nnResolver, nsId, nnId, state);
   }
 
-  public String getFederatedPathForNameservice(String ns) {
-    return "/" + ns;
+  /**
+   * Get the federated path for a nameservice.
+   * @param nsId Nameservice identifier.
+   * @return Path in the Router.
+   */
+  public String getFederatedPathForNS(String nsId) {
+    return "/" + nsId;
   }
 
-  public String getNamenodePathForNameservice(String ns) {
-    return "/target-" + ns;
+  /**
+   * Get the namenode path for a nameservice.
+   * @param nsId Nameservice identifier.
+   * @return Path in the Namenode.
+   */
+  public String getNamenodePathForNS(String nsId) {
+    return "/target-" + nsId;
   }
 
   /**
-   * @return example:
+   * Get the federated test directory for a nameservice.
+   * @param nsId Nameservice identifier.
+   * @return Example:
    *         <ul>
    *         <li>/ns0/testdir which maps to ns0->/target-ns0/testdir
    *         </ul>
    */
-  public String getFederatedTestDirectoryForNameservice(String ns) {
-    return getFederatedPathForNameservice(ns) + "/" + TEST_DIR;
+  public String getFederatedTestDirectoryForNS(String nsId) {
+    return getFederatedPathForNS(nsId) + "/" + TEST_DIR;
   }
 
   /**
+   * Get the namenode test directory for a nameservice.
+   * @param nsId Nameservice identifier.
    * @return example:
    *         <ul>
    *         <li>/target-ns0/testdir
    *         </ul>
    */
-  public String getNamenodeTestDirectoryForNameservice(String ns) {
-    return getNamenodePathForNameservice(ns) + "/" + TEST_DIR;
+  public String getNamenodeTestDirectoryForNS(String nsId) {
+    return getNamenodePathForNS(nsId) + "/" + TEST_DIR;
   }
 
   /**
+   * Get the federated test file for a nameservice.
+   * @param nsId Nameservice identifier.
    * @return example:
    *         <ul>
    *         <li>/ns0/testfile which maps to ns0->/target-ns0/testfile
    *         </ul>
    */
-  public String getFederatedTestFileForNameservice(String ns) {
-    return getFederatedPathForNameservice(ns) + "/" + TEST_FILE;
+  public String getFederatedTestFileForNS(String nsId) {
+    return getFederatedPathForNS(nsId) + "/" + TEST_FILE;
   }
 
   /**
+   * Get the namenode test file for a nameservice.
+   * @param nsId Nameservice identifier.
    * @return example:
    *         <ul>
    *         <li>/target-ns0/testfile
    *         </ul>
    */
-  public String getNamenodeTestFileForNameservice(String ns) {
-    return getNamenodePathForNameservice(ns) + "/" + TEST_FILE;
+  public String getNamenodeTestFileForNS(String nsId) {
+    return getNamenodePathForNS(nsId) + "/" + TEST_FILE;
   }
 
+  /**
+   * Stop the federated HDFS cluster.
+   */
   public void shutdown() {
-    cluster.shutdown();
+    if (cluster != null) {
+      cluster.shutdown();
+    }
     if (routers != null) {
       for (RouterContext context : routers) {
         stopRouter(context);
@@ -681,9 +767,12 @@ public class RouterDFSCluster {
     }
   }
 
+  /**
+   * Stop a router.
+   * @param router Router context.
+   */
   public void stopRouter(RouterContext router) {
     try {
-
       router.router.shutDown();
 
       int loopCount = 0;
@@ -691,7 +780,7 @@ public class RouterDFSCluster {
         loopCount++;
         Thread.sleep(1000);
         if (loopCount > 20) {
-          LOG.error("Unable to shutdown router - " + router.rpcPort);
+          LOG.error("Cannot shutdown router {}", router.rpcPort);
           break;
         }
       }
@@ -714,26 +803,28 @@ public class RouterDFSCluster {
     for (String ns : getNameservices()) {
       NamenodeContext context = getNamenode(ns, null);
       if (!createTestDirectoriesNamenode(context)) {
-        throw new IOException("Unable to create test directory for ns - " + ns);
+        throw new IOException("Cannot create test directory for ns " + ns);
       }
     }
   }
 
   public boolean createTestDirectoriesNamenode(NamenodeContext nn)
       throws IOException {
-    return FederationTestUtils.addDirectory(nn.getFileSystem(),
-        getNamenodeTestDirectoryForNameservice(nn.nameserviceId));
+    FileSystem fs = nn.getFileSystem();
+    String testDir = getNamenodeTestDirectoryForNS(nn.nameserviceId);
+    return addDirectory(fs, testDir);
   }
 
   public void deleteAllFiles() throws IOException {
     // Delete all files via the NNs and verify
     for (NamenodeContext context : getNamenodes()) {
-      FileStatus[] status = context.getFileSystem().listStatus(new Path("/"));
-      for(int i = 0; i <status.length; i++) {
+      FileSystem fs = context.getFileSystem();
+      FileStatus[] status = fs.listStatus(new Path("/"));
+      for (int i = 0; i <status.length; i++) {
         Path p = status[i].getPath();
-        context.getFileSystem().delete(p, true);
+        fs.delete(p, true);
       }
-      status = context.getFileSystem().listStatus(new Path("/"));
+      status = fs.listStatus(new Path("/"));
       assertEquals(status.length, 0);
     }
   }
@@ -754,14 +845,34 @@ public class RouterDFSCluster {
       MockResolver resolver =
           (MockResolver) r.router.getSubclusterResolver();
       // create table entries
-      for (String ns : nameservices) {
+      for (String nsId : nameservices) {
         // Direct path
-        resolver.addLocation(getFederatedPathForNameservice(ns), ns,
-            getNamenodePathForNameservice(ns));
+        String routerPath = getFederatedPathForNS(nsId);
+        String nnPath = getNamenodePathForNS(nsId);
+        resolver.addLocation(routerPath, nsId, nnPath);
       }
 
-      // Root path goes to both NS1
-      resolver.addLocation("/", nameservices.get(0), "/");
+      // Root path points to the first nameservice
+      String ns0 = nameservices.get(0);
+      resolver.addLocation("/", ns0, "/");
+    }
+  }
+
+  public MiniDFSCluster getCluster() {
+    return cluster;
+  }
+
+  /**
+   * Wait until the federated cluster is up and ready.
+   * @throws IOException If we cannot wait for the cluster to be up.
+   */
+  public void waitClusterUp() throws IOException {
+    cluster.waitClusterUp();
+    registerNamenodes();
+    try {
+      waitNamenodeRegistration();
+    } catch (Exception e) {
+      throw new IOException("Cannot wait for the namenodes", e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5b52be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
index 8c720c7..d8afb39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouter.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
@@ -51,6 +52,10 @@ public class TestRouter {
     conf.set(DFSConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
         MockResolver.class.getCanonicalName());
 
+    // Bind to any available port
+    conf.set(DFSConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, "0.0.0.0");
+    conf.set(DFSConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "127.0.0.1:0");
+
     // Simulate a co-located NN
     conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns0");
     conf.set("fs.defaultFS", "hdfs://" + "ns0");
@@ -90,7 +95,31 @@ public class TestRouter {
   @Test
   public void testRouterService() throws InterruptedException, IOException {
 
+    // Rpc only
+    testRouterStartup(new RouterConfigBuilder(conf).rpc().build());
+
     // Run with all services
-    testRouterStartup((new RouterConfigBuilder(conf)).build());
+    testRouterStartup(new RouterConfigBuilder(conf).all().build());
+  }
+
+  @Test
+  public void testRouterRestartRpcService() throws IOException {
+
+    // Start
+    Router router = new Router();
+    router.init(new RouterConfigBuilder(conf).rpc().build());
+    router.start();
+
+    // Verify RPC server is running
+    assertNotNull(router.getRpcServerAddress());
+    RouterRpcServer rpcServer = router.getRpcServer();
+    assertNotNull(rpcServer);
+    assertEquals(STATE.STARTED, rpcServer.getServiceState());
+
+    // Stop router and RPC server
+    router.stop();
+    assertEquals(STATE.STOPPED, rpcServer.getServiceState());
+
+    router.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5b52be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
new file mode 100644
index 0000000..af506c9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -0,0 +1,869 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.addDirectory;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.countContents;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.deleteFile;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileStatus;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
+import static org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.TEST_STRING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.Service.STATE;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test the RPC interface of the {@link Router} implemented by
+ * {@link RouterRpcServer}.
+ */
+public class TestRouterRpc {
+
+  /** Federated HDFS cluster. */
+  private static RouterDFSCluster cluster;
+
+  /** Random Router for this federated cluster. */
+  private RouterContext router;
+
+  /** Random nameservice in the federated cluster.  */
+  private String ns;
+  /** First namenode in the nameservice. */
+  private NamenodeContext namenode;
+
+  /** Client interface to the Router. */
+  private ClientProtocol routerProtocol;
+  /** Client interface to the Namenode. */
+  private ClientProtocol nnProtocol;
+
+  /** Filesystem interface to the Router. */
+  private FileSystem routerFS;
+  /** Filesystem interface to the Namenode. */
+  private FileSystem nnFS;
+
+  /** File in the Router. */
+  private String routerFile;
+  /** File in the Namenode. */
+  private String nnFile;
+
+
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    cluster = new RouterDFSCluster(false, 2);
+
+    // Start NNs and DNs and wait until ready
+    cluster.startCluster();
+
+    // Start routers with only an RPC service
+    cluster.addRouterOverrides((new RouterConfigBuilder()).rpc().build());
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+  @Before
+  public void testSetup() throws Exception {
+
+    // Create mock locations
+    cluster.installMockLocations();
+
+    // Delete all files via the NNs and verify
+    cluster.deleteAllFiles();
+
+    // Create test fixtures on NN
+    cluster.createTestDirectoriesNamenode();
+
+    // Wait to ensure NN has fully created its test directories
+    Thread.sleep(100);
+
+    // Pick a NS, namenode and router for this test
+    this.router = cluster.getRandomRouter();
+    this.ns = cluster.getRandomNameservice();
+    this.namenode = cluster.getNamenode(ns, null);
+
+    // Handles to the ClientProtocol interface
+    this.routerProtocol = router.getClient().getNamenode();
+    this.nnProtocol = namenode.getClient().getNamenode();
+
+    // Handles to the filesystem client
+    this.nnFS = namenode.getFileSystem();
+    this.routerFS = router.getFileSystem();
+
+    // Create a test file on the NN
+    Random r = new Random();
+    String randomFile = "testfile-" + r.nextInt();
+    this.nnFile =
+        cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile;
+    this.routerFile =
+        cluster.getFederatedTestDirectoryForNS(ns) + "/" + randomFile;
+
+    createFile(nnFS, nnFile, 32);
+    verifyFileExists(nnFS, nnFile);
+  }
+
+  @Test
+  public void testRpcService() throws IOException {
+    Router testRouter = new Router();
+    List<String> nss = cluster.getNameservices();
+    String ns0 = nss.get(0);
+    Configuration routerConfig = cluster.generateRouterConfiguration(ns0, null);
+    RouterRpcServer server = new RouterRpcServer(routerConfig, testRouter,
+        testRouter.getNamenodeResolver(), testRouter.getSubclusterResolver());
+    server.init(routerConfig);
+    assertEquals(STATE.INITED, server.getServiceState());
+    server.start();
+    assertEquals(STATE.STARTED, server.getServiceState());
+    server.stop();
+    assertEquals(STATE.STOPPED, server.getServiceState());
+    server.close();
+    testRouter.close();
+  }
+
+  protected RouterDFSCluster getCluster() {
+    return TestRouterRpc.cluster;
+  }
+
+  protected RouterContext getRouterContext() {
+    return this.router;
+  }
+
+  protected void setRouter(RouterContext r)
+      throws IOException, URISyntaxException {
+    this.router = r;
+    this.routerProtocol = r.getClient().getNamenode();
+    this.routerFS = r.getFileSystem();
+  }
+
+  protected FileSystem getRouterFileSystem() {
+    return this.routerFS;
+  }
+
+  protected FileSystem getNamenodeFileSystem() {
+    return this.nnFS;
+  }
+
+  protected ClientProtocol getRouterProtocol() {
+    return this.routerProtocol;
+  }
+
+  protected ClientProtocol getNamenodeProtocol() {
+    return this.nnProtocol;
+  }
+
+  protected NamenodeContext getNamenode() {
+    return this.namenode;
+  }
+
+  protected void setNamenodeFile(String filename) {
+    this.nnFile = filename;
+  }
+
+  protected String getNamenodeFile() {
+    return this.nnFile;
+  }
+
+  protected void setRouterFile(String filename) {
+    this.routerFile = filename;
+  }
+
+  protected String getRouterFile() {
+    return this.routerFile;
+  }
+
+  protected void setNamenode(NamenodeContext nn)
+      throws IOException, URISyntaxException {
+    this.namenode = nn;
+    this.nnProtocol = nn.getClient().getNamenode();
+    this.nnFS = nn.getFileSystem();
+  }
+
+  protected String getNs() {
+    return this.ns;
+  }
+
+  protected void setNs(String nameservice) {
+    this.ns = nameservice;
+  }
+
+  protected static void compareResponses(
+      ClientProtocol protocol1, ClientProtocol protocol2,
+      Method m, Object[] paramList) {
+
+    Object return1 = null;
+    Exception exception1 = null;
+    try {
+      return1 = m.invoke(protocol1, paramList);
+    } catch (Exception ex) {
+      exception1 = ex;
+    }
+
+    Object return2 = null;
+    Exception exception2 = null;
+    try {
+      return2 = m.invoke(protocol2, paramList);
+    } catch (Exception ex) {
+      exception2 = ex;
+    }
+
+    assertEquals(return1, return2);
+    if (exception1 == null && exception2 == null) {
+      return;
+    }
+
+    assertEquals(
+        exception1.getCause().getClass(),
+        exception2.getCause().getClass());
+  }
+
+  @Test
+  public void testProxyListFiles() throws IOException, InterruptedException,
+      URISyntaxException, NoSuchMethodException, SecurityException {
+
+    // Verify that the root listing is a union of the mount table destinations
+    // and the files stored at all nameservices mounted at the root (ns0 + ns1)
+    //
+    // / -->
+    // /ns0 (from mount table)
+    // /ns1 (from mount table)
+    // all items in / of ns0 (default NS)
+
+    // Collect the mount table entries from the root mount point
+    Set<String> requiredPaths = new TreeSet<>();
+    FileSubclusterResolver fileResolver =
+        router.getRouter().getSubclusterResolver();
+    for (String mount : fileResolver.getMountPoints("/")) {
+      requiredPaths.add(mount);
+    }
+
+    // Collect all files/dirs on the root path of the default NS
+    String defaultNs = cluster.getNameservices().get(0);
+    NamenodeContext nn = cluster.getNamenode(defaultNs, null);
+    FileStatus[] iterator = nn.getFileSystem().listStatus(new Path("/"));
+    for (FileStatus file : iterator) {
+      requiredPaths.add(file.getPath().getName());
+    }
+
+    // Fetch listing
+    DirectoryListing listing =
+        routerProtocol.getListing("/", HdfsFileStatus.EMPTY_NAME, false);
+    Iterator<String> requiredPathsIterator = requiredPaths.iterator();
+    // Match each path returned and verify order returned
+    for(HdfsFileStatus f : listing.getPartialListing()) {
+      String fileName = requiredPathsIterator.next();
+      String currentFile = f.getFullPath(new Path("/")).getName();
+      assertEquals(currentFile, fileName);
+    }
+
+    // Verify the total number of results found/matched
+    assertEquals(requiredPaths.size(), listing.getPartialListing().length);
+
+    // List a path that doesn't exist and validate error response with NN
+    // behavior.
+    Method m = ClientProtocol.class.getMethod(
+        "getListing", String.class, byte[].class, boolean.class);
+    String badPath = "/unknownlocation/unknowndir";
+    compareResponses(routerProtocol, nnProtocol, m,
+        new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false});
+  }
+
+  @Test
+  public void testProxyListFilesWithConflict()
+      throws IOException, InterruptedException {
+
+    // Add a directory to the namespace that conflicts with a mount point
+    NamenodeContext nn = cluster.getNamenode(ns, null);
+    FileSystem nnFs = nn.getFileSystem();
+    addDirectory(nnFs, cluster.getFederatedTestDirectoryForNS(ns));
+
+    FileSystem routerFs = router.getFileSystem();
+    int initialCount = countContents(routerFs, "/");
+
+    // Root file system now for NS X:
+    // / ->
+    // /ns0 (mount table)
+    // /ns1 (mount table)
+    // /target-ns0 (the target folder for the NS0 mapped to /)
+    // /nsX (local directory that duplicates mount table)
+    int newCount = countContents(routerFs, "/");
+    assertEquals(initialCount, newCount);
+
+    // Verify that each root path is readable and contains one test directory
+    assertEquals(1, countContents(routerFs, cluster.getFederatedPathForNS(ns)));
+
+    // Verify that real folder for the ns contains a single test directory
+    assertEquals(1, countContents(nnFs, cluster.getNamenodePathForNS(ns)));
+
+  }
+
+  protected void testRename(RouterContext testRouter, String filename,
+      String renamedFile, boolean exceptionExpected) throws IOException {
+
+    createFile(testRouter.getFileSystem(), filename, 32);
+    // verify
+    verifyFileExists(testRouter.getFileSystem(), filename);
+    // rename
+    boolean exceptionThrown = false;
+    try {
+      DFSClient client = testRouter.getClient();
+      ClientProtocol clientProtocol = client.getNamenode();
+      clientProtocol.rename(filename, renamedFile);
+    } catch (Exception ex) {
+      exceptionThrown = true;
+    }
+    if (exceptionExpected) {
+      // Error was expected
+      assertTrue(exceptionThrown);
+      FileContext fileContext = testRouter.getFileContext();
+      assertTrue(fileContext.delete(new Path(filename), true));
+    } else {
+      // No error was expected
+      assertFalse(exceptionThrown);
+      // verify
+      assertTrue(verifyFileExists(testRouter.getFileSystem(), renamedFile));
+      // delete
+      FileContext fileContext = testRouter.getFileContext();
+      assertTrue(fileContext.delete(new Path(renamedFile), true));
+    }
+  }
+
+  /** Renames a freshly created file with rename2() and checks the outcome. */
+  protected void testRename2(RouterContext testRouter, String filename,
+      String renamedFile, boolean exceptionExpected) throws IOException {
+    createFile(testRouter.getFileSystem(), filename, 32);
+    // The source must exist, otherwise the rename2 result proves nothing
+    assertTrue(verifyFileExists(testRouter.getFileSystem(), filename));
+    // Rename with no options through the ClientProtocol proxy
+    Exception failure = null;
+    try {
+      DFSClient client = testRouter.getClient();
+      ClientProtocol clientProtocol = client.getNamenode();
+      clientProtocol.rename2(filename, renamedFile, new Options.Rename[] {});
+    } catch (Exception ex) {
+      failure = ex;
+    }
+    assertEquals("Failure: " + failure, exceptionExpected, failure != null);
+    if (exceptionExpected) {
+      // Error was expected: the source file is still there to be deleted
+      FileContext fileContext = testRouter.getFileContext();
+      assertTrue(fileContext.delete(new Path(filename), true));
+    } else {
+      // The file must be visible under its new name before it is deleted
+      assertTrue(verifyFileExists(testRouter.getFileSystem(), renamedFile));
+      FileContext fileContext = testRouter.getFileContext();
+      assertTrue(fileContext.delete(new Path(renamedFile), true));
+    }
+  }
+
+  @Test
+  public void testProxyRenameFiles() throws IOException, InterruptedException {
+    // NOTE(review): unexplained fixed 5s pause - confirm it is needed
+    Thread.sleep(5000);
+    List<String> nss = cluster.getNameservices();
+    String ns0 = nss.get(0);
+    String ns1 = nss.get(1);
+
+    // Rename within the same namespace
+    // /ns0/testdir/testrename -> /ns0/testdir/testrename-append
+    String filename =
+        cluster.getFederatedTestDirectoryForNS(ns0) + "/testrename";
+    String renamedFile = filename + "-append";
+    testRename(router, filename, renamedFile, false);
+    testRename2(router, filename, renamedFile, false);
+
+    // Rename a file to a destination that is in a different namespace (fails)
+    filename = cluster.getFederatedTestDirectoryForNS(ns0) + "/testrename";
+    renamedFile = cluster.getFederatedTestDirectoryForNS(ns1) + "/testrename";
+    testRename(router, filename, renamedFile, true);
+    testRename2(router, filename, renamedFile, true);
+  }
+
+  /** Changes the owner via the router and checks the value seen at the NN. */
+  @Test
+  public void testProxyChownFiles() throws Exception {
+    String newUsername = "TestUser";
+    String newGroup = "TestGroup";
+
+    // change owner
+    routerProtocol.setOwner(routerFile, newUsername, newGroup);
+
+    // Verify with NN (expected value first so failure messages read right)
+    FileStatus file = getFileStatus(namenode.getFileSystem(), nnFile);
+    assertEquals(newUsername, file.getOwner());
+    assertEquals(newGroup, file.getGroup());
+
+    // Bad request and validate router response matches NN response.
+    Method m = ClientProtocol.class.getMethod("setOwner", String.class,
+        String.class, String.class);
+    String badPath = "/unknownlocation/unknowndir";
+    compareResponses(routerProtocol, nnProtocol, m,
+        new Object[] {badPath, newUsername, newGroup});
+  }
+
+  /** Router stats must match the per-index sum of the namenode stats. */
+  @Test
+  public void testProxyGetStats() throws Exception {
+    long[] combinedData = routerProtocol.getStats();
+    // Size the accumulator from the router reply instead of a magic number
+    long[] individualData = new long[combinedData.length];
+    for (String nameservice : cluster.getNameservices()) {
+      NamenodeContext n = cluster.getNamenode(nameservice, null);
+      DFSClient client = n.getClient();
+      ClientProtocol clientProtocol = client.getNamenode();
+      long[] data = clientProtocol.getStats();
+      // JUnit check (a plain assert is skipped without -ea); also guards sum
+      assertEquals(combinedData.length, data.length);
+      for (int i = 0; i < data.length; i++) {
+        individualData[i] += data[i];
+      }
+    }
+
+    for (int i = 0; i < combinedData.length; i++) {
+      // Skip available storage as this fluctuates in mini cluster
+      if (i != ClientProtocol.GET_STATS_REMAINING_IDX) {
+        assertEquals("Stat " + i, individualData[i], combinedData[i]);
+      }
+    }
+  }
+
+  /** Router DN report size must equal the unique DN count across the NNs. */
+  @Test
+  public void testProxyGetDatanodeReport() throws Exception {
+    DatanodeInfo[] combinedData =
+        routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
+
+    Set<Integer> individualData = new HashSet<>();
+    for (String nameservice : cluster.getNameservices()) {
+      NamenodeContext n = cluster.getNamenode(nameservice, null);
+      DFSClient client = n.getClient();
+      ClientProtocol clientProtocol = client.getNamenode();
+      DatanodeInfo[] data =
+          clientProtocol.getDatanodeReport(DatanodeReportType.ALL);
+      // Collect unique DNs based on their xfer port
+      for (DatanodeInfo info : data) {
+        individualData.add(info.getXferPort());
+      }
+    }
+    // Expected (collected from the NNs) first, actual (router) second
+    assertEquals(individualData.size(), combinedData.length);
+  }
+
+  /** Router storage report size must equal the unique DN count at the NNs. */
+  @Test
+  public void testProxyGetDatanodeStorageReport()
+      throws IOException, InterruptedException, URISyntaxException {
+    DatanodeStorageReport[] combinedData =
+        routerProtocol.getDatanodeStorageReport(DatanodeReportType.ALL);
+
+    Set<String> individualData = new HashSet<>();
+    for (String nameservice : cluster.getNameservices()) {
+      NamenodeContext n = cluster.getNamenode(nameservice, null);
+      DFSClient client = n.getClient();
+      ClientProtocol clientProtocol = client.getNamenode();
+      DatanodeStorageReport[] data =
+          clientProtocol.getDatanodeStorageReport(DatanodeReportType.ALL);
+      for (DatanodeStorageReport report : data) {
+        // Determine unique DN instances
+        DatanodeInfo dn = report.getDatanodeInfo();
+        individualData.add(dn.toString());
+      }
+    }
+    assertEquals(individualData.size(), combinedData.length);
+  }
+
+  /** Creates /testdir via the router; exactly one NN must end up with it. */
+  @Test
+  public void testProxyMkdir() throws Exception {
+    // Check the initial folders
+    FileStatus[] filesInitial = routerFS.listStatus(new Path("/"));
+
+    // Create a directory via the router at the root level
+    String dirPath = "/testdir";
+    FsPermission permission = new FsPermission("705");
+    routerProtocol.mkdirs(dirPath, permission, false);
+
+    // Verify the root listing has the item via the router
+    FileStatus[] files = routerFS.listStatus(new Path("/"));
+    assertEquals(Arrays.toString(files) + " should be " +
+        Arrays.toString(filesInitial) + " + " + dirPath,
+        filesInitial.length + 1, files.length);
+    assertTrue(verifyFileExists(routerFS, dirPath));
+
+    // Verify the directory is present in only 1 Namenode
+    int foundCount = 0;
+    for (NamenodeContext n : cluster.getNamenodes()) {
+      if (verifyFileExists(n.getFileSystem(), dirPath)) {
+        foundCount++;
+      }
+    }
+    assertEquals(1, foundCount);
+    assertTrue(deleteFile(routerFS, dirPath));
+
+    // Validate router failure response matches NN failure response.
+    Method m = ClientProtocol.class.getMethod("mkdirs", String.class,
+        FsPermission.class, boolean.class);
+    String badPath = "/unknownlocation/unknowndir";
+    compareResponses(routerProtocol, nnProtocol, m,
+        new Object[] {badPath, permission, false});
+  }
+
+  /** Sets mode 444 via the router and reads the permission back at the NN. */
+  @Test
+  public void testProxyChmodFiles() throws Exception {
+    FsPermission permission = new FsPermission("444");
+
+    // change permissions
+    routerProtocol.setPermission(routerFile, permission);
+
+    // Validate permissions NN
+    FileStatus file = getFileStatus(namenode.getFileSystem(), nnFile);
+    assertEquals(permission, file.getPermission());
+
+    // Validate router failure response matches NN failure response.
+    Method m = ClientProtocol.class.getMethod(
+        "setPermission", String.class, FsPermission.class);
+    String badPath = "/unknownlocation/unknowndir";
+    compareResponses(routerProtocol, nnProtocol, m,
+        new Object[] {badPath, permission});
+  }
+
+  /** Raises replication from 1 to 2 via the router and verifies at the NN. */
+  @Test
+  public void testProxySetReplication() throws Exception {
+    // Check current replication via NN
+    FileStatus file = getFileStatus(nnFS, nnFile);
+    assertEquals(1, file.getReplication());
+
+    // increment replication via router
+    routerProtocol.setReplication(routerFile, (short) 2);
+
+    // Verify via NN
+    file = getFileStatus(nnFS, nnFile);
+    assertEquals(2, file.getReplication());
+
+    // Validate router failure response matches NN failure response.
+    Method m = ClientProtocol.class.getMethod(
+        "setReplication", String.class, short.class);
+    String badPath = "/unknownlocation/unknowndir";
+    compareResponses(routerProtocol, nnProtocol, m,
+        new Object[] {badPath, (short) 2});
+  }
+
+  /** Truncates the test file to 0 bytes via the router and checks the NN. */
+  @Test
+  public void testProxyTruncateFile() throws Exception {
+    // Check file size via NN
+    FileStatus file = getFileStatus(nnFS, nnFile);
+    assertTrue(file.getLen() > 0);
+
+    // Truncate to 0 bytes via router
+    routerProtocol.truncate(routerFile, 0, "testclient");
+
+    // Verify via NN
+    file = getFileStatus(nnFS, nnFile);
+    assertEquals(0, file.getLen());
+
+    // Validate router failure response matches NN failure response.
+    Method m = ClientProtocol.class.getMethod(
+        "truncate", String.class, long.class, String.class);
+    String badPath = "/unknownlocation/unknowndir";
+    compareResponses(routerProtocol, nnProtocol, m,
+        new Object[] {badPath, (long) 0, "testclient"});
+  }
+
+  /** Fetches block locations via the router and expects exactly one block. */
+  @Test
+  public void testProxyGetBlockLocations() throws Exception {
+    // Fetch block locations via router
+    LocatedBlocks locations =
+        routerProtocol.getBlockLocations(routerFile, 0, 1024);
+    assertEquals(1, locations.getLocatedBlocks().size());
+
+    // Validate router failure response matches NN failure response.
+    Method m = ClientProtocol.class.getMethod(
+        "getBlockLocations", String.class, long.class, long.class);
+    String badPath = "/unknownlocation/unknowndir";
+    compareResponses(routerProtocol, nnProtocol,
+        m, new Object[] {badPath, (long) 0, (long) 0});
+  }
+
+  /** Sets a storage policy via the router and reads it back from the NN. */
+  @Test
+  public void testProxyStoragePolicy() throws Exception {
+    // Query initial policy via NN
+    HdfsFileStatus status = namenode.getClient().getFileInfo(nnFile);
+
+    // Pick deterministically: not copy-on-create and not the current policy
+    BlockStoragePolicy[] policies = namenode.getClient().getStoragePolicies();
+    BlockStoragePolicy policy = null;
+    for (BlockStoragePolicy candidate : policies) {
+      if (!candidate.isCopyOnCreateFile() &&
+          candidate.getId() != status.getStoragePolicy()) {
+        policy = candidate;
+        break;
+      }
+    }
+    assertNotNull("No usable policy in " + Arrays.toString(policies), policy);
+    routerProtocol.setStoragePolicy(routerFile, policy.getName());
+    // Verify policy via NN
+    HdfsFileStatus newStatus = namenode.getClient().getFileInfo(nnFile);
+    assertEquals(policy.getId(), newStatus.getStoragePolicy());
+    assertTrue(newStatus.getStoragePolicy() != status.getStoragePolicy());
+
+    // Validate router failure response matches NN failure response.
+    Method m = ClientProtocol.class.getMethod("setStoragePolicy", String.class,
+        String.class);
+    String badPath = "/unknownlocation/unknowndir";
+    compareResponses(routerProtocol, nnProtocol,
+        m, new Object[] {badPath, "badpolicy"});
+  }
+
+  /** The preferred block size reported by the router must match the NN. */
+  @Test
+  public void testProxyGetPreferedBlockSize() throws Exception {
+    // Query via NN and Router and verify
+    long namenodeSize = nnProtocol.getPreferredBlockSize(nnFile);
+    long routerSize = routerProtocol.getPreferredBlockSize(routerFile);
+    assertEquals(namenodeSize, routerSize);
+
+    // Validate router failure response matches NN failure response.
+    Method m = ClientProtocol.class.getMethod(
+        "getPreferredBlockSize", String.class);
+    String badPath = "/unknownlocation/unknowndir";
+    compareResponses(
+        routerProtocol, nnProtocol, m, new Object[] {badPath});
+  }
+
+  private void testConcat(
+      String source, String target, boolean failureExpected) {
+    boolean failure = false;
+    try {
+      // Ask the router to concat source into target; IOException = failure
+      routerProtocol.concat(target, new String[] {source});
+    } catch (IOException ex) {
+      failure = true;
+    }
+    assertEquals(failureExpected, failure);
+  }
+
+  /** Concat must work inside one namespace and fail across namespaces. */
+  @Test
+  public void testProxyConcatFile() throws Exception {
+    // Create a stub file in the primary ns
+    String sameNameservice = ns;
+    String existingFile =
+        cluster.getFederatedTestDirectoryForNS(sameNameservice) +
+        "_concatfile";
+    int existingFileSize = 32;
+    createFile(routerFS, existingFile, existingFileSize);
+
+    // Identify an alternate nameservice that doesn't match the existing file
+    String alternateNameservice = null;
+    for (String n : cluster.getNameservices()) {
+      if (!n.equals(sameNameservice)) {
+        alternateNameservice = n;
+        break;
+      }
+    }
+    assertNotNull("Need a second nameservice", alternateNameservice);
+    // Create new files, must be a full block to use concat. One file is in the
+    // same namespace as the target file, the other is in a different namespace.
+    String altRouterFile =
+        cluster.getFederatedTestDirectoryForNS(alternateNameservice) +
+        "_newfile";
+    String sameRouterFile =
+        cluster.getFederatedTestDirectoryForNS(sameNameservice) +
+        "_newfile";
+    createFile(routerFS, altRouterFile, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+    createFile(routerFS, sameRouterFile, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+
+    // Concat in different namespaces, fails
+    testConcat(existingFile, altRouterFile, true);
+
+    // Concat in same namespaces, succeeds
+    testConcat(existingFile, sameRouterFile, false);
+
+    // Check target file length
+    FileStatus status = getFileStatus(routerFS, sameRouterFile);
+    assertEquals(
+        existingFileSize + DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT,
+        status.getLen());
+
+    // Validate router failure response matches NN failure response.
+    Method m = ClientProtocol.class.getMethod(
+        "concat", String.class, String[].class);
+    String badPath = "/unknownlocation/unknowndir";
+    compareResponses(routerProtocol, nnProtocol, m,
+        new Object[] {badPath, new String[] {routerFile}});
+  }
+
+  /** Appends to the test file via the router and checks it grew at the NN. */
+  @Test
+  public void testProxyAppend() throws Exception {
+    // Append a test string via router; close the stream even if write fails
+    EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.APPEND);
+    DFSClient routerClient = getRouterContext().getClient();
+    try (HdfsDataOutputStream stream =
+        routerClient.append(routerFile, 1024, createFlag, null, null)) {
+      stream.writeBytes(TEST_STRING);
+    }
+
+    // Verify file size via NN
+    FileStatus status = getFileStatus(nnFS, nnFile);
+    assertTrue(status.getLen() > TEST_STRING.length());
+
+    // Validate router failure response matches NN failure response.
+    Method m = ClientProtocol.class.getMethod("append", String.class,
+        String.class, EnumSetWritable.class);
+    String badPath = "/unknownlocation/unknowndir";
+    EnumSetWritable<CreateFlag> createFlagWritable =
+        new EnumSetWritable<CreateFlag>(createFlag);
+    compareResponses(routerProtocol, nnProtocol, m,
+        new Object[] {badPath, "testClient", createFlagWritable});
+  }
+
+  /** Creates a file, adds a block and asks the router for another datanode. */
+  @Test
+  public void testProxyGetAdditionalDatanode()
+      throws IOException, InterruptedException, URISyntaxException {
+    // Use primitive APIs to open a file, add a block, and get datanode location
+    EnumSet<CreateFlag> createFlag = EnumSet.of(CreateFlag.CREATE);
+    String clientName = getRouterContext().getClient().getClientName();
+    String newRouterFile = routerFile + "_additionalDatanode";
+    HdfsFileStatus status = routerProtocol.create(
+        newRouterFile, new FsPermission("777"), clientName,
+        new EnumSetWritable<CreateFlag>(createFlag), true, (short) 1,
+        (long) 1024, CryptoProtocolVersion.supported(), null);
+
+    // Add a block via router (requires client to have same lease)
+    LocatedBlock block = routerProtocol.addBlock(
+        newRouterFile, clientName, null, null,
+        status.getFileId(), null, null);
+
+    DatanodeInfo[] exclusions = new DatanodeInfo[0];
+    LocatedBlock newBlock = routerProtocol.getAdditionalDatanode(
+        newRouterFile, status.getFileId(), block.getBlock(),
+        block.getLocations(), block.getStorageIDs(), exclusions, 1, clientName);
+    assertNotNull(newBlock);
+  }
+
+  /** A file created via the router as another user must be owned by them. */
+  @Test
+  public void testProxyCreateFileAlternateUser()
+      throws IOException, URISyntaxException, InterruptedException {
+    // Create via Router
+    String routerDir = cluster.getFederatedTestDirectoryForNS(ns);
+    String namenodeDir = cluster.getNamenodeTestDirectoryForNS(ns);
+    String newRouterFile = routerDir + "/unknownuser";
+    String newNamenodeFile = namenodeDir + "/unknownuser";
+    String username = "unknownuser";
+
+    // Allow all user access to dir
+    namenode.getFileContext().setPermission(
+        new Path(namenodeDir), new FsPermission("777"));
+    // Close the stream returned by create() instead of leaking it
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(username);
+    DFSClient client = getRouterContext().getClient(ugi);
+    client.create(newRouterFile, true).close();
+
+    // Fetch via NN and check user
+    FileStatus status = getFileStatus(nnFS, newNamenodeFile);
+    assertEquals(username, status.getOwner());
+  }
+
+  /** The router must fail with the same exception class as the namenode. */
+  @Test
+  public void testProxyGetFileInfoAcessException() throws IOException {
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser("unknownuser");
+
+    // Get the located blocks from the NN and trap the exception
+    Exception nnFailure = null;
+    try {
+      String testFile = cluster.getNamenodeTestFileForNS(ns);
+      namenode.getClient(ugi).getLocatedBlocks(testFile, 0);
+    } catch (Exception e) {
+      nnFailure = e;
+    }
+    assertNotNull(nnFailure);
+
+    // Get the located blocks from the router and trap the exception
+    Exception routerFailure = null;
+    try {
+      String testFile = cluster.getFederatedTestFileForNS(ns);
+      getRouterContext().getClient(ugi).getLocatedBlocks(testFile, 0);
+    } catch (Exception e) {
+      routerFailure = e;
+    }
+    assertNotNull(routerFailure);
+
+    assertEquals(nnFailure.getClass(), routerFailure.getClass());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5b52be/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
new file mode 100644
index 0000000..5489691
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+
+/**
+ * Tests the RPC interface of the {@link Router} implemented by
+ * {@link RouterRpcServer} when mount points have multiple destinations.
+ */
+public class TestRouterRpcMultiDestination extends TestRouterRpc {
+
+  @Override
+  public void testSetup() throws Exception {
+
+    RouterDFSCluster cluster = getCluster();
+
+    // Create mock locations
+    getCluster().installMockLocations();
+    List<RouterContext> routers = cluster.getRouters();
+
+    // Add extra location to the root mount / such that the root mount points:
+    // /
+    //   ns0 -> /
+    //   ns1 -> /
+    for (RouterContext rc : routers) {
+      Router router = rc.getRouter();
+      MockResolver resolver = (MockResolver) router.getSubclusterResolver();
+      resolver.addLocation("/", cluster.getNameservices().get(1), "/");
+    }
+
+    // Create a mount that points to 2 dirs in the same ns:
+    // /same
+    //   ns0 -> /
+    //   ns0 -> /target-ns0
+    for (RouterContext rc : routers) {
+      Router router = rc.getRouter();
+      MockResolver resolver = (MockResolver) router.getSubclusterResolver();
+      List<String> nss = cluster.getNameservices();
+      String ns0 = nss.get(0);
+      resolver.addLocation("/same", ns0, "/");
+      resolver.addLocation("/same", ns0, cluster.getNamenodePathForNS(ns0));
+    }
+
+    // Delete all files via the NNs and verify
+    cluster.deleteAllFiles();
+
+    // Create test fixtures on NN
+    cluster.createTestDirectoriesNamenode();
+
+    // Wait to ensure NN has fully created its test directories
+    Thread.sleep(100);
+
+    // Pick a NS, namenode and router for this test
+    RouterContext router = cluster.getRandomRouter();
+    this.setRouter(router);
+
+    String ns = cluster.getRandomNameservice();
+    this.setNs(ns);
+    this.setNamenode(cluster.getNamenode(ns, null));
+
+    // Create a test file on a single NN that is accessed via a router path
+    // with 2 destinations. All tests should failover to the alternate
+    // destination if the wrong NN is attempted first.
+    Random r = new Random();
+    String randomString = "testfile-" + r.nextInt();
+    setNamenodeFile("/" + randomString);
+    setRouterFile("/" + randomString);
+
+    FileSystem nnFs = getNamenodeFileSystem();
+    FileSystem routerFs = getRouterFileSystem();
+    createFile(nnFs, getNamenodeFile(), 32);
+
+    verifyFileExists(nnFs, getNamenodeFile());
+    verifyFileExists(routerFs, getRouterFile());
+  }
+
+  /** Checks the router listing of a path against mount points + NN entries. */
+  private void testListing(String path) throws IOException {
+    // Collect the mount table entries for this path
+    Set<String> requiredPaths = new TreeSet<>();
+    RouterContext rc = getRouterContext();
+    Router router = rc.getRouter();
+    FileSubclusterResolver subclusterResolver = router.getSubclusterResolver();
+    for (String mount : subclusterResolver.getMountPoints(path)) {
+      requiredPaths.add(mount);
+    }
+
+    // Get files/dirs from the Namenodes
+    PathLocation location = subclusterResolver.getDestinationForPath(path);
+    for (RemoteLocation loc : location.getDestinations()) {
+      String nsId = loc.getNameserviceId();
+      String dest = loc.getDest();
+      NamenodeContext nn = getCluster().getNamenode(nsId, null);
+      FileSystem fs = nn.getFileSystem();
+      FileStatus[] files = fs.listStatus(new Path(dest));
+      for (FileStatus file : files) {
+        String pathName = file.getPath().getName();
+        requiredPaths.add(pathName);
+      }
+    }
+
+    // Get files/dirs from the Router
+    DirectoryListing listing =
+        getRouterProtocol().getListing(path, HdfsFileStatus.EMPTY_NAME, false);
+    HdfsFileStatus[] partialListing = listing.getPartialListing();
+
+    // Verify the count first: a mismatch then reports both listings
+    assertEquals(
+        requiredPaths + " doesn't match " + Arrays.toString(partialListing),
+        requiredPaths.size(), partialListing.length);
+
+    // Match each path returned and verify order returned
+    Iterator<String> requiredPathsIterator = requiredPaths.iterator();
+    for (HdfsFileStatus fileStatus : partialListing) {
+      String fileName = requiredPathsIterator.next();
+      String currentFile = fileStatus.getFullPath(new Path(path)).getName();
+      assertEquals(fileName, currentFile);
+    }
+  }
+
+  @Override
+  public void testProxyListFiles() throws IOException, InterruptedException,
+      URISyntaxException, NoSuchMethodException, SecurityException {
+
+    // Verify that the root listing is a union of the mount table destinations
+    // and the files stored at all nameservices mounted at the root (ns0 + ns1)
+    // / -->
+    // /ns0 (from mount table)
+    // /ns1 (from mount table)
+    // /same (from the mount table)
+    // all items in / of ns0 (from mapping of / -> ns0:::/)
+    // all items in / of ns1 (from mapping of / -> ns1:::/)
+    testListing("/");
+
+    // Verify that the "/same" mount point lists the contents of both dirs in
+    // the same ns
+    // /same -->
+    // /target-ns0 (from root of ns0)
+    // /testdir (from contents of /target-ns0)
+    testListing("/same");
+
+    // List a non-existing path and validate error response with NN behavior
+    ClientProtocol namenodeProtocol =
+        getCluster().getRandomNamenode().getClient().getNamenode();
+    Method m = ClientProtocol.class.getMethod(
+        "getListing", String.class,  byte[].class, boolean.class);
+    String badPath = "/unknownlocation/unknowndir";
+    compareResponses(getRouterProtocol(), namenodeProtocol, m,
+        new Object[] {badPath, HdfsFileStatus.EMPTY_NAME, false});
+  }
+
+  @Override
+  public void testProxyRenameFiles() throws IOException, InterruptedException {
+    // Run the rename checks of the parent test first
+    super.testProxyRenameFiles();
+
+    List<String> nss = getCluster().getNameservices();
+    String ns0 = nss.get(0);
+    String ns1 = nss.get(1);
+
+    // Rename a file from ns0 into the root (mapped to both ns0 and ns1)
+    String testDir0 = getCluster().getFederatedTestDirectoryForNS(ns0);
+    String filename0 = testDir0 + "/testrename";
+    String renamedFile = "/testrename";
+    testRename(getRouterContext(), filename0, renamedFile, false);
+    testRename2(getRouterContext(), filename0, renamedFile, false);
+
+    // Rename a file from ns1 into the root (mapped to both ns0 and ns1)
+    String testDir1 = getCluster().getFederatedTestDirectoryForNS(ns1);
+    String filename1 = testDir1 + "/testrename";
+    testRename(getRouterContext(), filename1, renamedFile, false);
+    testRename2(getRouterContext(), filename1, renamedFile, false);
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org