Posted to common-commits@hadoop.apache.org by om...@apache.org on 2022/03/16 00:06:25 UTC

[hadoop] branch branch-2.10 updated (efe515d -> ef8582b)

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from efe515d  HADOOP-18158. Fix failure of create-release script due to releasedocmaker changes in branch-2.10 (#4055)
     new e831f7b  HADOOP-13722. Code cleanup -- ViewFileSystem and InodeTree. Contributed by Manoj Govindassamy.
     new 20ff294  HADOOP-12077. Provide a multi-URI replication Inode for ViewFs. Contributed by Gera Shegalov
     new ef8582b  HADOOP-13055. Implement linkMergeSlash and linkFallback for ViewFileSystem

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/hadoop/fs/viewfs/ConfigUtil.java    |  95 +-
 .../org/apache/hadoop/fs/viewfs/Constants.java     |  24 +-
 .../org/apache/hadoop/fs/viewfs/InodeTree.java     | 573 +++++++++----
 .../org/apache/hadoop/fs/viewfs/NflyFSystem.java   | 951 +++++++++++++++++++++
 .../apache/hadoop/fs/viewfs/ViewFileSystem.java    | 132 +--
 .../java/org/apache/hadoop/fs/viewfs/ViewFs.java   |  21 +-
 .../viewfs/TestViewFileSystemLocalFileSystem.java  |  77 +-
 .../apache/hadoop/fs/viewfs/TestViewFsConfig.java  |  37 +-
 .../hadoop/fs/viewfs/ViewFileSystemBaseTest.java   |   4 +-
 .../hadoop-hdfs/src/site/markdown/ViewFs.md        |  44 +-
 .../hadoop/fs/viewfs/TestViewFileSystemHdfs.java   | 147 +++-
 .../fs/viewfs/TestViewFileSystemLinkFallback.java  | 264 ++++++
 .../viewfs/TestViewFileSystemLinkMergeSlash.java   | 234 +++++
 13 files changed, 2333 insertions(+), 270 deletions(-)
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkFallback.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkMergeSlash.java

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 03/03: HADOOP-13055. Implement linkMergeSlash and linkFallback for ViewFileSystem

Posted by om...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit ef8582bfa87c734f9251dd6d9ed57dd649646134
Author: Manoj Govindassamy <ma...@apache.org>
AuthorDate: Fri Oct 13 17:43:13 2017 -0700

    HADOOP-13055. Implement linkMergeSlash and linkFallback for ViewFileSystem
    
    (cherry picked from commit 133d7ca76e3d4b60292d57429d4259e80bec650a)
    Fixes #4015
---
 .../org/apache/hadoop/fs/viewfs/ConfigUtil.java    |  68 +++-
 .../org/apache/hadoop/fs/viewfs/Constants.java     |  16 +-
 .../org/apache/hadoop/fs/viewfs/InodeTree.java     | 351 ++++++++++++++++++---
 .../apache/hadoop/fs/viewfs/ViewFileSystem.java    |  13 +-
 .../java/org/apache/hadoop/fs/viewfs/ViewFs.java   |  14 +-
 .../hadoop/fs/viewfs/ViewFileSystemBaseTest.java   |   4 +-
 .../hadoop-hdfs/src/site/markdown/ViewFs.md        |  44 ++-
 .../fs/viewfs/TestViewFileSystemLinkFallback.java  | 264 ++++++++++++++++
 .../viewfs/TestViewFileSystemLinkMergeSlash.java   | 234 ++++++++++++++
 9 files changed, 940 insertions(+), 68 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
index 8acd41f..5867f62 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.fs.viewfs;
 
 import java.net.URI;
+import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
@@ -68,7 +69,72 @@ public class ConfigUtil {
     addLink( conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, 
         src, target);   
   }
-  
+
+  /**
+   * Add a LinkMergeSlash to the config for the specified mount table.
+   * @param conf
+   * @param mountTableName
+   * @param target
+   */
+  public static void addLinkMergeSlash(Configuration conf,
+      final String mountTableName, final URI target) {
+    conf.set(getConfigViewFsPrefix(mountTableName) + "." +
+        Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH, target.toString());
+  }
+
+  /**
+   * Add a LinkMergeSlash to the config for the default mount table.
+   * @param conf
+   * @param target
+   */
+  public static void addLinkMergeSlash(Configuration conf, final URI target) {
+    addLinkMergeSlash(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE,
+        target);
+  }
+
+  /**
+   * Add a LinkFallback to the config for the specified mount table.
+   * @param conf
+   * @param mountTableName
+   * @param target
+   */
+  public static void addLinkFallback(Configuration conf,
+      final String mountTableName, final URI target) {
+    conf.set(getConfigViewFsPrefix(mountTableName) + "." +
+        Constants.CONFIG_VIEWFS_LINK_FALLBACK, target.toString());
+  }
+
+  /**
+   * Add a LinkFallback to the config for the default mount table.
+   * @param conf
+   * @param target
+   */
+  public static void addLinkFallback(Configuration conf, final URI target) {
+    addLinkFallback(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE,
+        target);
+  }
+
+  /**
+   * Add a LinkMerge to the config for the specified mount table.
+   * @param conf
+   * @param mountTableName
+   * @param targets
+   */
+  public static void addLinkMerge(Configuration conf,
+      final String mountTableName, final URI[] targets) {
+    conf.set(getConfigViewFsPrefix(mountTableName) + "." +
+        Constants.CONFIG_VIEWFS_LINK_MERGE, Arrays.toString(targets));
+  }
+
+  /**
+   * Add a LinkMerge to the config for the default mount table.
+   * @param conf
+   * @param targets
+   */
+  public static void addLinkMerge(Configuration conf, final URI[] targets) {
+    addLinkMerge(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, targets);
+  }
+
   /**
    *
    * @param conf
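
[Editorial aside, not part of the patch: the new ConfigUtil helpers above simply compose the fs.viewfs.mounttable.* keys named in their javadocs. A minimal usage sketch, assuming hypothetical mount table names and namenode URIs:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.viewfs.ConfigUtil;

    Configuration conf = new Configuration();
    // Sets fs.viewfs.mounttable.ClusterX.linkFallback
    ConfigUtil.addLinkFallback(conf, "ClusterX",
        URI.create("hdfs://nn5.example.com:9820/home"));
    // Sets fs.viewfs.mounttable.ClusterY.linkMergeSlash
    ConfigUtil.addLinkMergeSlash(conf, "ClusterY",
        URI.create("hdfs://nn1.example.com:9820/"));

The overloads without a mount table name write the same keys under the default mount table.]
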
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
index 3f9aae2..7a0a6661 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
@@ -51,12 +51,17 @@ public interface Constants {
   /**
    * Config variable for specifying a simple link
    */
-  public static final String CONFIG_VIEWFS_LINK = "link";
-  
+  String CONFIG_VIEWFS_LINK = "link";
+
+  /**
+   * Config variable for specifying a fallback for link mount points.
+   */
+  String CONFIG_VIEWFS_LINK_FALLBACK = "linkFallback";
+
   /**
    * Config variable for specifying a merge link
    */
-  public static final String CONFIG_VIEWFS_LINK_MERGE = "linkMerge";
+  String CONFIG_VIEWFS_LINK_MERGE = "linkMerge";
 
   /**
    * Config variable for specifying an nfly link. Nfly writes to multiple
@@ -68,10 +73,9 @@ public interface Constants {
    * Config variable for specifying a merge of the root of the mount-table
    *  with the root of another file system. 
    */
-  public static final String CONFIG_VIEWFS_LINK_MERGE_SLASH = "linkMergeSlash";
+  String CONFIG_VIEWFS_LINK_MERGE_SLASH = "linkMergeSlash";
 
-  static public final FsPermission PERMISSION_555 =
-      new FsPermission((short) 0555);
+  FsPermission PERMISSION_555 = new FsPermission((short) 0555);
 
   String CONFIG_VIEWFS_RENAME_STRATEGY = "fs.viewfs.rename.strategy";
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
index 199ccc6..661cc9a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
@@ -18,12 +18,15 @@
 package org.apache.hadoop.fs.viewfs;
 
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -62,8 +65,12 @@ abstract class InodeTree<T> {
   }
 
   static final Path SlashPath = new Path("/");
-  private final INodeDir<T> root;     // the root of the mount table
-  private final String homedirPrefix; // the homedir for this mount table
+  // the root of the mount table
+  private final INode<T> root;
+  // the fallback filesystem
+  private final INodeLink<T> rootFallbackLink;
+  // the homedir for this mount table
+  private final String homedirPrefix;
   private List<MountPoint<T>> mountPoints = new ArrayList<MountPoint<T>>();
 
   static class MountPoint<T> {
@@ -86,7 +93,7 @@ abstract class InodeTree<T> {
   }
 
   /**
-   * Internal class for inode tree
+   * Internal class for INode tree.
    * @param <T>
    */
   abstract static class INode<T> {
@@ -95,21 +102,58 @@ abstract class InodeTree<T> {
     public INode(String pathToNode, UserGroupInformation aUgi) {
       fullPath = pathToNode;
     }
+
+    // INode forming the internal mount table directory tree
+    // for ViewFileSystem. This internal directory tree is
+    // constructed based on the mount table config entries
+    // and is read only.
+    abstract boolean isInternalDir();
+
+    // INode linking to another filesystem. Represented
+    // via mount table link config entries.
+    boolean isLink() {
+      return !isInternalDir();
+    }
   }
 
   /**
-   * Internal class to represent an internal dir of the mount table
+   * Internal class to represent an internal dir of the mount table.
    * @param <T>
    */
   static class INodeDir<T> extends INode<T> {
-    final Map<String,INode<T>> children = new HashMap<String,INode<T>>();
-    T InodeDirFs =  null; // file system of this internal directory of mountT
-    boolean isRoot = false;
+    private final Map<String, INode<T>> children = new HashMap<>();
+    private T internalDirFs =  null; //filesystem of this internal directory
+    private boolean isRoot = false;
 
     INodeDir(final String pathToNode, final UserGroupInformation aUgi) {
       super(pathToNode, aUgi);
     }
 
+    @Override
+    boolean isInternalDir() {
+      return true;
+    }
+
+    T getInternalDirFs() {
+      return internalDirFs;
+    }
+
+    void setInternalDirFs(T internalDirFs) {
+      this.internalDirFs = internalDirFs;
+    }
+
+    void setRoot(boolean root) {
+      isRoot = root;
+    }
+
+    boolean isRoot() {
+      return isRoot;
+    }
+
+    Map<String, INode<T>> getChildren() {
+      return Collections.unmodifiableMap(children);
+    }
+
     INode<T> resolveInternal(final String pathComponent) {
       return children.get(pathComponent);
     }
@@ -120,7 +164,7 @@ abstract class InodeTree<T> {
         throw new FileAlreadyExistsException();
       }
       final INodeDir<T> newDir = new INodeDir<T>(fullPath +
-          (isRoot ? "" : "/") + pathComponent, aUgi);
+          (isRoot() ? "" : "/") + pathComponent, aUgi);
       children.put(pathComponent, newDir);
       return newDir;
     }
@@ -134,10 +178,43 @@ abstract class InodeTree<T> {
     }
   }
 
+  /**
+   * Mount table link type.
+   */
   enum LinkType {
+    /**
+     * Link entry pointing to a single filesystem uri.
+     * Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.link.<link_name>
+     * Refer: {@link Constants#CONFIG_VIEWFS_LINK}
+     */
     SINGLE,
+    /**
+     * Fallback filesystem for paths that are not matched by
+     * any single link entry.
+     * Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkFallback
+     * Refer: {@link Constants#CONFIG_VIEWFS_LINK_FALLBACK}
+     */
+    SINGLE_FALLBACK,
+    /**
+     * Link entry pointing to a union of two or more filesystem uris.
+     * Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkMerge.<link_name>
+     * Refer: {@link Constants#CONFIG_VIEWFS_LINK_MERGE}
+     */
     MERGE,
-    NFLY
+    /**
+     * Link entry for merging the mount table's root with the
+     * root of another filesystem.
+     * Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkMergeSlash
+     * Refer: {@link Constants#CONFIG_VIEWFS_LINK_MERGE_SLASH}
+     */
+    MERGE_SLASH,
+    /**
+     * Link entry to write to multiple filesystems and read
+     * from the closest filesystem.
+     * Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkNfly
+     * Refer: {@link Constants#CONFIG_VIEWFS_LINK_NFLY}
+     */
+    NFLY;
   }
 
   /**
@@ -195,6 +272,11 @@ abstract class InodeTree<T> {
       return new Path(result.toString());
     }
 
+    @Override
+    boolean isInternalDir() {
+      return false;
+    }
+
     /**
      * Get the instance of FileSystem to use, creating one if needed.
      * @return An Initialized instance of T
@@ -236,7 +318,10 @@ abstract class InodeTree<T> {
     }
 
     final String[] srcPaths = breakIntoPathComponents(src);
-    INodeDir<T> curInode = root;
+    // Make sure root is of INodeDir type before
+    // adding any regular links to it.
+    Preconditions.checkState(root.isInternalDir());
+    INodeDir<T> curInode = getRootDir();
     int i;
     // Ignore first initial slash, process all except last component
     for (i = 1; i < srcPaths.length - 1; i++) {
@@ -244,15 +329,15 @@ abstract class InodeTree<T> {
       INode<T> nextInode = curInode.resolveInternal(iPath);
       if (nextInode == null) {
         INodeDir<T> newDir = curInode.addDir(iPath, aUgi);
-        newDir.InodeDirFs = getTargetFileSystem(newDir);
+        newDir.setInternalDirFs(getTargetFileSystem(newDir));
         nextInode = newDir;
       }
-      if (nextInode instanceof INodeLink) {
+      if (nextInode.isLink()) {
         // Error - expected a dir but got a link
         throw new FileAlreadyExistsException("Path " + nextInode.fullPath +
             " already exists as link");
       } else {
-        assert (nextInode instanceof INodeDir);
+        assert(nextInode.isInternalDir());
         curInode = (INodeDir<T>) nextInode;
       }
     }
@@ -278,6 +363,11 @@ abstract class InodeTree<T> {
       newLink = new INodeLink<T>(fullPath, aUgi,
           initAndGetTargetFs(), new URI(target));
       break;
+    case SINGLE_FALLBACK:
+    case MERGE_SLASH:
+      // Link fallback and link merge slash configuration
+      // are handled specially in InodeTree.
+      throw new IllegalArgumentException("Unexpected linkType: " + linkType);
     case MERGE:
     case NFLY:
       final URI[] targetUris = StringUtils.stringToURI(
@@ -305,6 +395,77 @@ abstract class InodeTree<T> {
   protected abstract T getTargetFileSystem(String settings, URI[] mergeFsURIs)
       throws UnsupportedFileSystemException, URISyntaxException, IOException;
 
+  private INodeDir<T> getRootDir() {
+    Preconditions.checkState(root.isInternalDir());
+    return (INodeDir<T>)root;
+  }
+
+  private INodeLink<T> getRootLink() {
+    Preconditions.checkState(root.isLink());
+    return (INodeLink<T>)root;
+  }
+
+  private boolean hasFallbackLink() {
+    return rootFallbackLink != null;
+  }
+
+  private INodeLink<T> getRootFallbackLink() {
+    Preconditions.checkState(root.isInternalDir());
+    return rootFallbackLink;
+  }
+
+  /**
+   * An internal class representing the ViewFileSystem mount table
+   * link entries and their attributes.
+   * @see LinkType
+   */
+  private static class LinkEntry {
+    private final String src;
+    private final String target;
+    private final LinkType linkType;
+    private final String settings;
+    private final UserGroupInformation ugi;
+    private final Configuration config;
+
+    LinkEntry(String src, String target, LinkType linkType, String settings,
+        UserGroupInformation ugi, Configuration config) {
+      this.src = src;
+      this.target = target;
+      this.linkType = linkType;
+      this.settings = settings;
+      this.ugi = ugi;
+      this.config = config;
+    }
+
+    String getSrc() {
+      return src;
+    }
+
+    String getTarget() {
+      return target;
+    }
+
+    LinkType getLinkType() {
+      return linkType;
+    }
+
+    boolean isLinkType(LinkType type) {
+      return this.linkType == type;
+    }
+
+    String getSettings() {
+      return settings;
+    }
+
+    UserGroupInformation getUgi() {
+      return ugi;
+    }
+
+    Configuration getConfig() {
+      return config;
+    }
+  }
+
   /**
    * Create Inode Tree from the specified mount-table specified in Config
    * @param config - the mount table keys are prefixed with 
@@ -318,39 +479,59 @@ abstract class InodeTree<T> {
   protected InodeTree(final Configuration config, final String viewName)
       throws UnsupportedFileSystemException, URISyntaxException,
       FileAlreadyExistsException, IOException {
-    String vName = viewName;
-    if (vName == null) {
-      vName = Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
+    String mountTableName = viewName;
+    if (mountTableName == null) {
+      mountTableName = Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
     }
-    homedirPrefix = ConfigUtil.getHomeDirValue(config, vName);
-    root = new INodeDir<T>("/", UserGroupInformation.getCurrentUser());
-    root.InodeDirFs = getTargetFileSystem(root);
-    root.isRoot = true;
+    homedirPrefix = ConfigUtil.getHomeDirValue(config, mountTableName);
+
+    boolean isMergeSlashConfigured = false;
+    String mergeSlashTarget = null;
+    List<LinkEntry> linkEntries = new LinkedList<>();
 
-    final String mtPrefix = Constants.CONFIG_VIEWFS_PREFIX + "." +
-        vName + ".";
+    final String mountTablePrefix =
+        Constants.CONFIG_VIEWFS_PREFIX + "." + mountTableName + ".";
     final String linkPrefix = Constants.CONFIG_VIEWFS_LINK + ".";
+    final String linkFallbackPrefix = Constants.CONFIG_VIEWFS_LINK_FALLBACK;
     final String linkMergePrefix = Constants.CONFIG_VIEWFS_LINK_MERGE + ".";
+    final String linkMergeSlashPrefix =
+        Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH;
     boolean gotMountTableEntry = false;
     final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     for (Entry<String, String> si : config) {
       final String key = si.getKey();
-      if (key.startsWith(mtPrefix)) {
+      if (key.startsWith(mountTablePrefix)) {
         gotMountTableEntry = true;
-        LinkType linkType = LinkType.SINGLE;
-        String src = key.substring(mtPrefix.length());
+        LinkType linkType;
+        String src = key.substring(mountTablePrefix.length());
         String settings = null;
         if (src.startsWith(linkPrefix)) {
           src = src.substring(linkPrefix.length());
           if (src.equals(SlashPath.toString())) {
             throw new UnsupportedFileSystemException("Unexpected mount table "
-                + "link entry '" + key + "'. "
-                + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH  + " is not "
-                + "supported yet.");
+                + "link entry '" + key + "'. Use "
+                + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH  + " instead!");
+          }
+          linkType = LinkType.SINGLE;
+        } else if (src.startsWith(linkFallbackPrefix)) {
+          if (src.length() != linkFallbackPrefix.length()) {
+            throw new IOException("ViewFs: Mount points initialization error." +
+                " Invalid " + Constants.CONFIG_VIEWFS_LINK_FALLBACK +
+                " entry in config: " + src);
           }
+          linkType = LinkType.SINGLE_FALLBACK;
         } else if (src.startsWith(linkMergePrefix)) { // A merge link
-          linkType = LinkType.MERGE;
           src = src.substring(linkMergePrefix.length());
+          linkType = LinkType.MERGE;
+        } else if (src.startsWith(linkMergeSlashPrefix)) {
+          // This is a LinkMergeSlash entry. This entry should
+          // not have any additional source path.
+          if (src.length() != linkMergeSlashPrefix.length()) {
+            throw new IOException("ViewFs: Mount points initialization error." +
+                " Invalid " + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH +
+                " entry in config: " + src);
+          }
+          linkType = LinkType.MERGE_SLASH;
         } else if (src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY)) {
           // prefix.settings.src
           src = src.substring(Constants.CONFIG_VIEWFS_LINK_NFLY.length() + 1);
@@ -370,14 +551,69 @@ abstract class InodeTree<T> {
           throw new IOException("ViewFs: Cannot initialize: Invalid entry in " +
               "Mount table in config: " + src);
         }
-        final String target = si.getValue(); // link or merge link
-        createLink(src, target, linkType, settings, ugi, config);
+
+        final String target = si.getValue();
+        if (linkType != LinkType.MERGE_SLASH) {
+          if (isMergeSlashConfigured) {
+            throw new IOException("Mount table " + mountTableName
+                + " has already been configured with a merge slash link. "
+                + "A regular link should not be added.");
+          }
+          linkEntries.add(
+              new LinkEntry(src, target, linkType, settings, ugi, config));
+        } else {
+          if (!linkEntries.isEmpty()) {
+            throw new IOException("Mount table " + mountTableName
+                + " has already been configured with regular links. "
+                + "A merge slash link should not be configured.");
+          }
+          if (isMergeSlashConfigured) {
+            throw new IOException("Mount table " + mountTableName
+                + " has already been configured with a merge slash link. "
+                + "Multiple merge slash links for the same mount table is "
+                + "not allowed.");
+          }
+          isMergeSlashConfigured = true;
+          mergeSlashTarget = target;
+        }
+      }
+    }
+
+    if (isMergeSlashConfigured) {
+      Preconditions.checkNotNull(mergeSlashTarget);
+      root = new INodeLink<T>(mountTableName, ugi,
+          initAndGetTargetFs(),
+          new URI(mergeSlashTarget));
+      mountPoints.add(new MountPoint<T>("/", (INodeLink<T>) root));
+      rootFallbackLink = null;
+    } else {
+      root = new INodeDir<T>("/", UserGroupInformation.getCurrentUser());
+      getRootDir().setInternalDirFs(getTargetFileSystem(getRootDir()));
+      getRootDir().setRoot(true);
+      INodeLink<T> fallbackLink = null;
+      for (LinkEntry le : linkEntries) {
+        if (le.isLinkType(LinkType.SINGLE_FALLBACK)) {
+          if (fallbackLink != null) {
+            throw new IOException("Mount table " + mountTableName
+                + " has already been configured with a link fallback. "
+                + "Multiple fallback links for the same mount table is "
+                + "not allowed.");
+          }
+          fallbackLink = new INodeLink<T>(mountTableName, ugi,
+              initAndGetTargetFs(),
+              new URI(le.getTarget()));
+        } else {
+          createLink(le.getSrc(), le.getTarget(), le.getLinkType(),
+              le.getSettings(), le.getUgi(), le.getConfig());
+        }
       }
+      rootFallbackLink = fallbackLink;
     }
+
     if (!gotMountTableEntry) {
       throw new IOException(
           "ViewFs: Cannot initialize: Empty Mount table in config for " +
-              "viewfs://" + vName + "/");
+              "viewfs://" + mountTableName + "/");
     }
   }
 
@@ -414,7 +650,7 @@ abstract class InodeTree<T> {
 
   /**
    * Resolve the pathname p relative to root InodeDir
-   * @param p - inout path
+   * @param p - input path
    * @param resolveLastComponent
    * @return ResolveResult which allows further resolution of the remaining path
    * @throws FileNotFoundException
@@ -424,26 +660,53 @@ abstract class InodeTree<T> {
     // TO DO: - more efficient to not split the path, but simply compare
     String[] path = breakIntoPathComponents(p); 
     if (path.length <= 1) { // special case for when path is "/"
-      ResolveResult<T> res =
-          new ResolveResult<T>(ResultKind.INTERNAL_DIR,
-              root.InodeDirFs, root.fullPath, SlashPath);
+      T targetFs = root.isInternalDir() ?
+          getRootDir().getInternalDirFs() : getRootLink().getTargetFileSystem();
+      ResolveResult<T> res = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
+          targetFs, root.fullPath, SlashPath);
+      return res;
+    }
+
+    /**
+     * linkMergeSlash has been configured. The root of this mount table has
+     * been linked to the root directory of a file system.
+     * The first non-slash path component should be name of the mount table.
+     */
+    if (root.isLink()) {
+      Path remainingPath;
+      StringBuilder remainingPathStr = new StringBuilder();
+      // ignore first slash
+      for (int i = 1; i < path.length; i++) {
+        remainingPathStr.append("/").append(path[i]);
+      }
+      remainingPath = new Path(remainingPathStr.toString());
+      ResolveResult<T> res = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+          getRootLink().getTargetFileSystem(), root.fullPath, remainingPath);
       return res;
     }
+    Preconditions.checkState(root.isInternalDir());
+    INodeDir<T> curInode = getRootDir();
 
-    INodeDir<T> curInode = root;
     int i;
     // ignore first slash
     for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
       INode<T> nextInode = curInode.resolveInternal(path[i]);
       if (nextInode == null) {
-        StringBuilder failedAt = new StringBuilder(path[0]);
-        for (int j = 1; j <= i; ++j) {
-          failedAt.append('/').append(path[j]);
+        if (hasFallbackLink()) {
+          return new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+              getRootFallbackLink().getTargetFileSystem(),
+              root.fullPath, new Path(p));
+        } else {
+          StringBuilder failedAt = new StringBuilder(path[0]);
+          for (int j = 1; j <= i; ++j) {
+            failedAt.append('/').append(path[j]);
+          }
+          throw (new FileNotFoundException(
+              "File/Directory does not exist: " + failedAt.toString()));
         }
-        throw (new FileNotFoundException(failedAt.toString()));
       }
 
-      if (nextInode instanceof INodeLink) {
+      if (nextInode.isLink()) {
         final INodeLink<T> link = (INodeLink<T>) nextInode;
         final Path remainingPath;
         if (i >= path.length - 1) {
@@ -459,7 +722,7 @@ abstract class InodeTree<T> {
             new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
                 link.getTargetFileSystem(), nextInode.fullPath, remainingPath);
         return res;
-      } else if (nextInode instanceof INodeDir) {
+      } else if (nextInode.isInternalDir()) {
         curInode = (INodeDir<T>) nextInode;
       }
     }
@@ -481,7 +744,7 @@ abstract class InodeTree<T> {
     }
     final ResolveResult<T> res =
         new ResolveResult<T>(ResultKind.INTERNAL_DIR,
-            curInode.InodeDirFs, curInode.fullPath, remainingPath);
+            curInode.getInternalDirFs(), curInode.fullPath, remainingPath);
     return res;
   }
 
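[Editorial aside, not part of the patch: to make the new resolve() behavior above concrete. When no mount point matches a path and a linkFallback is configured, the entire path is handed to the fallback filesystem instead of raising FileNotFoundException. A sketch with a hypothetical table name and hypothetical paths:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Assume /data is a configured link on mount table ClusterX; /logs is not.
    Configuration conf = new Configuration(); // mount table entries set elsewhere
    FileSystem viewFs = FileSystem.get(URI.create("viewfs://ClusterX/"), conf);
    viewFs.getFileStatus(new Path("/data/part-0"));  // resolved via the /data link
    viewFs.getFileStatus(new Path("/logs/app.log")); // no link matches: the whole
                                                     // path is served by the fallback fs
]
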
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
index 9726100..672839b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
@@ -1167,12 +1167,12 @@ public class ViewFileSystem extends FileSystem {
     public FileStatus[] listStatus(Path f) throws AccessControlException,
         FileNotFoundException, IOException {
       checkPathIsSlash(f);
-      FileStatus[] result = new FileStatus[theInternalDir.children.size()];
+      FileStatus[] result = new FileStatus[theInternalDir.getChildren().size()];
       int i = 0;
-      for (Entry<String, INode<FileSystem>> iEntry : 
-                                          theInternalDir.children.entrySet()) {
+      for (Entry<String, INode<FileSystem>> iEntry :
+          theInternalDir.getChildren().entrySet()) {
         INode<FileSystem> inode = iEntry.getValue();
-        if (inode instanceof INodeLink ) {
+        if (inode.isLink()) {
           INodeLink<FileSystem> link = (INodeLink<FileSystem>) inode;
 
           result[i++] = new FileStatus(0, false, 0, 0,
@@ -1195,11 +1195,12 @@ public class ViewFileSystem extends FileSystem {
     @Override
     public boolean mkdirs(Path dir, FsPermission permission)
         throws AccessControlException, FileAlreadyExistsException {
-      if (theInternalDir.isRoot && dir == null) {
+      if (theInternalDir.isRoot() && dir == null) {
         throw new FileAlreadyExistsException("/ already exits");
       }
       // Note dir starts with /
-      if (theInternalDir.children.containsKey(dir.toString().substring(1))) {
+      if (theInternalDir.getChildren().containsKey(
+          dir.toString().substring(1))) {
         return true; // this is the stupid semantics of FileSystem
       }
       throw readOnlyMountTable("mkdirs",  dir);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
index 26f3a15..9c12baa 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
@@ -922,13 +922,13 @@ public class ViewFs extends AbstractFileSystem {
         throws IOException {
       // look up i internalDirs children - ignore first Slash
       INode<AbstractFileSystem> inode =
-        theInternalDir.children.get(f.toUri().toString().substring(1)); 
+          theInternalDir.getChildren().get(f.toUri().toString().substring(1));
       if (inode == null) {
         throw new FileNotFoundException(
             "viewFs internal mount table - missing entry:" + f);
       }
       FileStatus result;
-      if (inode instanceof INodeLink) {
+      if (inode.isLink()) {
         INodeLink<AbstractFileSystem> inodelink = 
           (INodeLink<AbstractFileSystem>) inode;
         result = new FileStatus(0, false, 0, 0, creationTime, creationTime,
@@ -970,14 +970,14 @@ public class ViewFs extends AbstractFileSystem {
     public FileStatus[] listStatus(final Path f) throws AccessControlException,
         IOException {
       checkPathIsSlash(f);
-      FileStatus[] result = new FileStatus[theInternalDir.children.size()];
+      FileStatus[] result = new FileStatus[theInternalDir.getChildren().size()];
       int i = 0;
-      for (Entry<String, INode<AbstractFileSystem>> iEntry : 
-                                          theInternalDir.children.entrySet()) {
+      for (Entry<String, INode<AbstractFileSystem>> iEntry :
+          theInternalDir.getChildren().entrySet()) {
         INode<AbstractFileSystem> inode = iEntry.getValue();
 
         
-        if (inode instanceof INodeLink ) {
+        if (inode.isLink()) {
           INodeLink<AbstractFileSystem> link = 
             (INodeLink<AbstractFileSystem>) inode;
 
@@ -1002,7 +1002,7 @@ public class ViewFs extends AbstractFileSystem {
     public void mkdir(final Path dir, final FsPermission permission,
         final boolean createParent) throws AccessControlException,
         FileAlreadyExistsException {
-      if (theInternalDir.isRoot && dir == null) {
+      if (theInternalDir.isRoot() && dir == null) {
         throw new FileAlreadyExistsException("/ already exits");
       }
       throw readOnlyMountTable("mkdir", dir);
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
index 6b398bc..c33a180 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
@@ -1300,8 +1300,8 @@ abstract public class ViewFileSystemBaseTest {
           + mtPrefix + Constants.CONFIG_VIEWFS_LINK + "." + "/");
     } catch (Exception e) {
       if (e instanceof UnsupportedFileSystemException) {
-        String msg = Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH
-            + " is not supported yet.";
+        String msg = " Use " + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH +
+            " instead";
         assertThat(e.getMessage(), containsString(msg));
       } else {
         fail("Unexpected exception: " + e.getMessage());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ViewFs.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ViewFs.md
index 14d998f..b2f7e57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ViewFs.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ViewFs.md
@@ -91,7 +91,7 @@ In order to provide transparency with the old world, the ViewFs file system (i.e
 
 ViewFs implements the Hadoop file system interface just like HDFS and the local file system. It is a trivial file system in the sense that it only allows linking to other file systems. Because ViewFs implements the Hadoop file system interface, it works transparently with Hadoop tools. For example, all the shell commands work with ViewFs as with HDFS and the local file system.
 
-The mount points of a mount table are specified in the standard Hadoop configuration files. In the configuration of each cluster, the default file system is set to the mount table for that cluster as shown below (compare it with the configuration in [Single Namenode Clusters](#Single_Namenode_Clusters)).
+In the configuration of each cluster, the default file system is set to the mount table for that cluster as shown below (compare it with the configuration in [Single Namenode Clusters](#Single_Namenode_Clusters)).
 
 ```xml
 <property>
@@ -100,7 +100,47 @@ The mount points of a mount table are specified in the standard Hadoop configura
 </property>
 ```
 
-The authority following the `viewfs://` scheme in the URI is the mount table name. It is recommanded that the mount table of a cluster should be named by the cluster name. Then Hadoop system will look for a mount table with the name "clusterX" in the Hadoop configuration files. Operations arrange all gateways and service machines to contain the mount tables for ALL clusters such that, for each cluster, the default file system is set to the ViewFs mount table for that cluster as described above.
+The authority following the `viewfs://` scheme in the URI is the mount table name. It is recommended that the mount table of a cluster be named after the cluster. The Hadoop system will then look for a mount table with the name "clusterX" in the Hadoop configuration files. Operations arrange all gateways and service machines to contain the mount tables for ALL clusters such that, for each cluster, the default file system is set to the ViewFs mount table for that cluster as described above.
+
+The mount points of a mount table are specified in the standard Hadoop configuration files. All mount table config entries for `viewfs` are prefixed by `fs.viewfs.mounttable.`. Mount points that link to other filesystems are specified with `link` entries. The recommendation is to name each mount point the same as its target location in the linked filesystem. Any namespace not configured in the mount table can fall back to a default filesystem via `linkFallback`.
+
+In the below mount table configuration, the namespace `/data` is linked to the filesystem `hdfs://nn1-clusterx.example.com:9820/data`, and `/project` is linked to `hdfs://nn2-clusterx.example.com:9820/project`. Any namespace not configured in the mount table, such as `/logs`, falls back to the filesystem `hdfs://nn5-clusterx.example.com:9820/home`.
+
+```xml
+<configuration>
+  <property>
+    <name>fs.viewfs.mounttable.ClusterX.link./data</name>
+    <value>hdfs://nn1-clusterx.example.com:9820/data</value>
+  </property>
+  <property>
+    <name>fs.viewfs.mounttable.ClusterX.link./project</name>
+    <value>hdfs://nn2-clusterx.example.com:9820/project</value>
+  </property>
+  <property>
+    <name>fs.viewfs.mounttable.ClusterX.link./user</name>
+    <value>hdfs://nn3-clusterx.example.com:9820/user</value>
+  </property>
+  <property>
+    <name>fs.viewfs.mounttable.ClusterX.link./tmp</name>
+    <value>hdfs://nn4-clusterx.example.com:9820/tmp</value>
+  </property>
+  <property>
+    <name>fs.viewfs.mounttable.ClusterX.linkFallback</name>
+    <value>hdfs://nn5-clusterx.example.com:9820/home</value>
+  </property>
+</configuration>
+```
+
+Alternatively, we can merge the mount table's root with the root of another filesystem via `linkMergeSlash`. In the below mount table configuration, ClusterY's root is merged with the root of the filesystem at `hdfs://nn1-clustery.example.com:9820`.
+
+```xml
+<configuration>
+  <property>
+    <name>fs.viewfs.mounttable.ClusterY.linkMergeSlash</name>
+    <value>hdfs://nn1-clustery.example.com:9820/</value>
+  </property>
+</configuration>
+```
 
 ### Pathname Usage Patterns
 
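[Editorial aside, not part of the patch: one constraint worth noting, mirroring the InodeTree constructor checks in this commit. linkMergeSlash is mutually exclusive with every other link type on the same mount table, and initialization fails fast when both are present. A sketch with a hypothetical table name:

    Configuration conf = new Configuration();
    ConfigUtil.addLinkMergeSlash(conf, "ClusterZ",
        URI.create("hdfs://nn1.example.com:9820/"));
    ConfigUtil.addLink(conf, "ClusterZ", "/data",
        URI.create("hdfs://nn2.example.com:9820/data"));
    // FileSystem.get(URI.create("viewfs://ClusterZ/"), conf) now throws an
    // IOException: "... has already been configured with a merge slash link"
    // or "... with regular links", depending on config iteration order.
]
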
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkFallback.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkFallback.java
new file mode 100644
index 0000000..5fb7c3b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkFallback.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for viewfs with LinkFallback mount table entries.
+ */
+public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
+
+  private static FileSystem fsDefault;
+  private static MiniDFSCluster cluster;
+  private static final int NAME_SPACES_COUNT = 3;
+  private static final int DATA_NODES_COUNT = 3;
+  private static final int FS_INDEX_DEFAULT = 0;
+  private static final String LINK_FALLBACK_CLUSTER_1_NAME = "Cluster1";
+  private static final FileSystem[] FS_HDFS = new FileSystem[NAME_SPACES_COUNT];
+  private static final Configuration CONF = new Configuration();
+  private static final File TEST_DIR = GenericTestUtils.getTestDir(
+      TestViewFileSystemLinkFallback.class.getSimpleName());
+  private static final String TEST_BASE_PATH =
+      "/tmp/TestViewFileSystemLinkFallback";
+  private final static Logger LOG = LoggerFactory.getLogger(
+      TestViewFileSystemLinkFallback.class);
+
+
+  @Override
+  protected FileSystemTestHelper createFileSystemHelper() {
+    return new FileSystemTestHelper(TEST_BASE_PATH);
+  }
+
+  @BeforeClass
+  public static void clusterSetupAtBeginning() throws IOException,
+      LoginException, URISyntaxException {
+    SupportsBlocks = true;
+    CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+        true);
+    cluster = new MiniDFSCluster.Builder(CONF)
+        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(
+            NAME_SPACES_COUNT))
+        .numDataNodes(DATA_NODES_COUNT)
+        .build();
+    cluster.waitClusterUp();
+
+    for (int i = 0; i < NAME_SPACES_COUNT; i++) {
+      FS_HDFS[i] = cluster.getFileSystem(i);
+    }
+    fsDefault = FS_HDFS[FS_INDEX_DEFAULT];
+  }
+
+  @AfterClass
+  public static void clusterShutdownAtEnd() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    fsTarget = fsDefault;
+    super.setUp();
+  }
+
+  /**
+   * Override this so that we don't set the targetTestRoot to any path under the
+   * root of the FS, and so that we don't try to delete the test dir, but rather
+   * only its contents.
+   */
+  @Override
+  void initializeTargetTestRoot() throws IOException {
+    targetTestRoot = fsDefault.makeQualified(new Path("/"));
+    for (FileStatus status : fsDefault.listStatus(targetTestRoot)) {
+      fsDefault.delete(status.getPath(), true);
+    }
+  }
+
+  @Override
+  void setupMountPoints() {
+    super.setupMountPoints();
+    ConfigUtil.addLinkFallback(conf, LINK_FALLBACK_CLUSTER_1_NAME,
+        targetTestRoot.toUri());
+  }
+
+  @Override
+  int getExpectedDelegationTokenCount() {
+    return 1; // all point to the same fs so 1 unique token
+  }
+
+  @Override
+  int getExpectedDelegationTokenCountWithCredentials() {
+    return 1;
+  }
+
+  @Test
+  public void testConfLinkFallback() throws Exception {
+    Path testBasePath = new Path(TEST_BASE_PATH);
+    Path testLevel2Dir = new Path(TEST_BASE_PATH, "dir1/dirA");
+    Path testBaseFile = new Path(testBasePath, "testBaseFile.log");
+    Path testBaseFileRelative = new Path(testLevel2Dir,
+        "../../testBaseFile.log");
+    Path testLevel2File = new Path(testLevel2Dir, "testLevel2File.log");
+    fsTarget.mkdirs(testLevel2Dir);
+
+    fsTarget.createNewFile(testBaseFile);
+    FSDataOutputStream dataOutputStream = fsTarget.append(testBaseFile);
+    dataOutputStream.write(1);
+    dataOutputStream.close();
+
+    fsTarget.createNewFile(testLevel2File);
+    dataOutputStream = fsTarget.append(testLevel2File);
+    dataOutputStream.write("test link fallback".toString().getBytes());
+    dataOutputStream.close();
+
+    String clusterName = "ClusterFallback";
+    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
+        "/", null, null);
+
+    Configuration conf = new Configuration();
+    ConfigUtil.addLinkFallback(conf, clusterName, fsTarget.getUri());
+
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    assertEquals(ViewFileSystem.class, vfs.getClass());
+    FileStatus baseFileStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+        + testBaseFile.toUri().toString()));
+    LOG.info("BaseFileStat: " + baseFileStat);
+    FileStatus baseFileRelStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+        + testBaseFileRelative.toUri().toString()));
+    LOG.info("BaseFileRelStat: " + baseFileRelStat);
+    Assert.assertEquals("Unexpected file length for " + testBaseFile,
+        1, baseFileStat.getLen());
+    Assert.assertEquals("Unexpected file length for " + testBaseFileRelative,
+        baseFileStat.getLen(), baseFileRelStat.getLen());
+    FileStatus level2FileStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+        + testLevel2File.toUri().toString()));
+    LOG.info("Level2FileStat: " + level2FileStat);
+    vfs.close();
+  }
+
+  @Test
+  public void testConfLinkFallbackWithRegularLinks() throws Exception {
+    Path testBasePath = new Path(TEST_BASE_PATH);
+    Path testLevel2Dir = new Path(TEST_BASE_PATH, "dir1/dirA");
+    Path testBaseFile = new Path(testBasePath, "testBaseFile.log");
+    Path testLevel2File = new Path(testLevel2Dir, "testLevel2File.log");
+    fsTarget.mkdirs(testLevel2Dir);
+
+    fsTarget.createNewFile(testBaseFile);
+    fsTarget.createNewFile(testLevel2File);
+    FSDataOutputStream dataOutputStream = fsTarget.append(testLevel2File);
+    dataOutputStream.write("test link fallback".toString().getBytes());
+    dataOutputStream.close();
+
+    String clusterName = "ClusterFallback";
+    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
+        "/", null, null);
+
+    Configuration conf = new Configuration();
+    ConfigUtil.addLink(conf, clusterName,
+        "/internalDir/linkToDir2",
+        new Path(targetTestRoot, "dir2").toUri());
+    ConfigUtil.addLink(conf, clusterName,
+        "/internalDir/internalDirB/linkToDir3",
+        new Path(targetTestRoot, "dir3").toUri());
+    ConfigUtil.addLink(conf, clusterName,
+        "/danglingLink",
+        new Path(targetTestRoot, "missingTarget").toUri());
+    ConfigUtil.addLink(conf, clusterName,
+        "/linkToAFile",
+        new Path(targetTestRoot, "aFile").toUri());
+    System.out.println("ViewFs link fallback " + fsTarget.getUri());
+    ConfigUtil.addLinkFallback(conf, clusterName, targetTestRoot.toUri());
+
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    assertEquals(ViewFileSystem.class, vfs.getClass());
+    FileStatus baseFileStat = vfs.getFileStatus(
+        new Path(viewFsUri.toString() + testBaseFile.toUri().toString()));
+    LOG.info("BaseFileStat: " + baseFileStat);
+    Assert.assertEquals("Unexpected file length for " + testBaseFile,
+        0, baseFileStat.getLen());
+    FileStatus level2FileStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+        + testLevel2File.toUri().toString()));
+    LOG.info("Level2FileStat: " + level2FileStat);
+
+    dataOutputStream = vfs.append(testLevel2File);
+    dataOutputStream.write("Writing via viewfs fallback path".getBytes());
+    dataOutputStream.close();
+
+    FileStatus level2FileStatAfterWrite = vfs.getFileStatus(
+        new Path(viewFsUri.toString() + testLevel2File.toUri().toString()));
+    Assert.assertTrue("Unexpected file length for " + testLevel2File,
+        level2FileStatAfterWrite.getLen() > level2FileStat.getLen());
+
+    vfs.close();
+  }
+
+  @Test
+  public void testConfLinkFallbackWithMountPoint() throws Exception {
+    TEST_DIR.mkdirs();
+    Configuration conf = new Configuration();
+    String clusterName = "ClusterX";
+    String mountPoint = "/user";
+    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
+        "/", null, null);
+    String expectedErrorMsg =  "Invalid linkFallback entry in config: " +
+        "linkFallback./user";
+    String mountTableEntry = Constants.CONFIG_VIEWFS_PREFIX + "."
+        + clusterName + "." + Constants.CONFIG_VIEWFS_LINK_FALLBACK
+        + "." + mountPoint;
+    conf.set(mountTableEntry, TEST_DIR.toURI().toString());
+
+    try {
+      FileSystem.get(viewFsUri, conf);
+      fail("Shouldn't allow linkMergeSlash to take extra mount points!");
+    } catch (IOException e) {
+      assertTrue("Unexpected error: " + e.getMessage(),
+          e.getMessage().contains(expectedErrorMsg));
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkMergeSlash.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkMergeSlash.java
new file mode 100644
index 0000000..606743f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkMergeSlash.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import javax.security.auth.login.LoginException;
+
+/**
+ * Test for viewfs with LinkMergeSlash mount table entries.
+ */
+public class TestViewFileSystemLinkMergeSlash extends ViewFileSystemBaseTest {
+
+  private static FileSystem fsDefault;
+  private static MiniDFSCluster cluster;
+  private static final int NAME_SPACES_COUNT = 3;
+  private static final int DATA_NODES_COUNT = 3;
+  private static final int FS_INDEX_DEFAULT = 0;
+  private static final String LINK_MERGE_SLASH_CLUSTER_1_NAME = "ClusterLMS1";
+  private static final String LINK_MERGE_SLASH_CLUSTER_2_NAME = "ClusterLMS2";
+  private static final FileSystem[] FS_HDFS = new FileSystem[NAME_SPACES_COUNT];
+  private static final Configuration CONF = new Configuration();
+  private static final File TEST_DIR = GenericTestUtils.getTestDir(
+      TestViewFileSystemLinkMergeSlash.class.getSimpleName());
+  private static final String TEST_TEMP_PATH =
+      "/tmp/TestViewFileSystemLinkMergeSlash";
+  private final static Logger LOG = LoggerFactory.getLogger(
+      TestViewFileSystemLinkMergeSlash.class);
+
+  @Override
+  protected FileSystemTestHelper createFileSystemHelper() {
+    return new FileSystemTestHelper(TEST_TEMP_PATH);
+  }
+
+  @BeforeClass
+  public static void clusterSetupAtBeginning() throws IOException,
+      LoginException, URISyntaxException {
+    SupportsBlocks = true;
+    CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+        true);
+    cluster = new MiniDFSCluster.Builder(CONF)
+        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(
+            NAME_SPACES_COUNT))
+        .numDataNodes(DATA_NODES_COUNT)
+        .build();
+    cluster.waitClusterUp();
+
+    for (int i = 0; i < NAME_SPACES_COUNT; i++) {
+      FS_HDFS[i] = cluster.getFileSystem(i);
+    }
+    fsDefault = FS_HDFS[FS_INDEX_DEFAULT];
+  }
+
+  @AfterClass
+  public static void clusterShutdownAtEnd() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    fsTarget = fsDefault;
+    super.setUp();
+  }
+
+  /**
+   * Override this so that we don't set the targetTestRoot to any path under the
+   * root of the FS, and so that we don't try to delete the test dir, but rather
+   * only its contents.
+   */
+  @Override
+  void initializeTargetTestRoot() throws IOException {
+    targetTestRoot = fsDefault.makeQualified(new Path("/"));
+    for (FileStatus status : fsDefault.listStatus(targetTestRoot)) {
+      fsDefault.delete(status.getPath(), true);
+    }
+  }
+
+  @Override
+  void setupMountPoints() {
+    super.setupMountPoints();
+    ConfigUtil.addLinkMergeSlash(conf, LINK_MERGE_SLASH_CLUSTER_1_NAME,
+        targetTestRoot.toUri());
+    ConfigUtil.addLinkMergeSlash(conf, LINK_MERGE_SLASH_CLUSTER_2_NAME,
+        targetTestRoot.toUri());
+  }
+
+  @Override
+  int getExpectedDelegationTokenCount() {
+    return 1; // all point to the same fs so 1 unique token
+  }
+
+  @Override
+  int getExpectedDelegationTokenCountWithCredentials() {
+    return 1;
+  }
+
+  @Test
+  public void testConfLinkMergeSlash() throws Exception {
+    TEST_DIR.mkdirs();
+    String clusterName = "ClusterMerge";
+    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
+        "/", null, null);
+    String testFileName = "testLinkMergeSlash";
+
+    File infile = new File(TEST_DIR, testFileName);
+    final byte[] content = "HelloWorld".getBytes();
+    FileOutputStream fos = null;
+    try {
+      fos = new FileOutputStream(infile);
+      fos.write(content);
+    } finally {
+      if (fos != null) {
+        fos.close();
+      }
+    }
+    assertEquals((long)content.length, infile.length());
+
+    Configuration conf = new Configuration();
+    ConfigUtil.addLinkMergeSlash(conf, clusterName, TEST_DIR.toURI());
+
+    FileSystem vfs = FileSystem.get(viewFsUri, conf);
+    assertEquals(ViewFileSystem.class, vfs.getClass());
+    FileStatus stat = vfs.getFileStatus(new Path(viewFsUri.toString() +
+        testFileName));
+
+    LOG.info("File stat: " + stat);
+    vfs.close();
+  }
+
+  @Test
+  public void testConfLinkMergeSlashWithRegularLinks() throws Exception {
+    TEST_DIR.mkdirs();
+    String clusterName = "ClusterMerge";
+    String expectedErrorMsg1 = "Mount table ClusterMerge has already been " +
+        "configured with a merge slash link";
+    String expectedErrorMsg2 = "Mount table ClusterMerge has already been " +
+        "configured with regular links";
+    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
+        "/", null, null);
+    Configuration conf = new Configuration();
+    ConfigUtil.addLinkMergeSlash(conf, clusterName, TEST_DIR.toURI());
+    ConfigUtil.addLink(conf, clusterName, "testDir", TEST_DIR.toURI());
+
+    try {
+      FileSystem.get(viewFsUri, conf);
+      fail("Shouldn't allow both merge slash link and regular link on same "
+          + "mount table.");
+    } catch (IOException e) {
+      assertTrue("Unexpected error message: " + e.getMessage(),
+          e.getMessage().contains(expectedErrorMsg1) || e.getMessage()
+              .contains(expectedErrorMsg2));
+    }
+  }
+
+  @Test
+  public void testConfLinkMergeSlashWithMountPoint() throws Exception {
+    TEST_DIR.mkdirs();
+    Configuration conf = new Configuration();
+    String clusterName = "ClusterX";
+    String mountPoint = "/user";
+    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
+        "/", null, null);
+    String expectedErrorMsg =  "Invalid linkMergeSlash entry in config: " +
+        "linkMergeSlash./user";
+    String mountTableEntry = Constants.CONFIG_VIEWFS_PREFIX + "."
+        + clusterName + "." + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH
+        + "." + mountPoint;
+    conf.set(mountTableEntry, TEST_DIR.toURI().toString());
+
+    try {
+      FileSystem.get(viewFsUri, conf);
+      fail("Shouldn't allow linkMergeSlash to take extra mount points!");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains(expectedErrorMsg));
+    }
+  }
+
+  @Test
+  public void testChildFileSystems() throws Exception {
+    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME,
+        LINK_MERGE_SLASH_CLUSTER_1_NAME, "/", null, null);
+    FileSystem fs = FileSystem.get(viewFsUri, conf);
+    FileSystem[] childFs = fs.getChildFileSystems();
+    Assert.assertEquals("Unexpected number of child filesystems!",
+        1, childFs.length);
+    Assert.assertEquals("Unexpected child filesystem!",
+        DistributedFileSystem.class, childFs[0].getClass());
+  }
+}
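
The merge-slash entry exercised above can also be wired up through the raw
property key, mirroring the mountTableEntry string built in
testConfLinkMergeSlashWithMountPoint. A minimal sketch, assuming the usual
fs.viewfs.mounttable value for Constants.CONFIG_VIEWFS_PREFIX; the cluster
name and target URI below are illustrative:

    Configuration conf = new Configuration();
    // fs.viewfs.mounttable.<mount-table-name>.linkMergeSlash = <target URI>
    conf.set("fs.viewfs.mounttable.ClusterMerge.linkMergeSlash",
        "hdfs://namenode1:8020/");
    FileSystem vfs = FileSystem.get(new URI("viewfs://ClusterMerge/"), conf);

Because the mount table root is merged with the root of a single target file
system, the view exposes exactly one child file system, which is why the
delegation-token-count overrides above return 1.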



[hadoop] 01/03: HADOOP-13722. Code cleanup -- ViewFileSystem and InodeTree. Contributed by Manoj Govindassamy.

Posted by om...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit e831f7b22895608125c9a9b14723112686a989aa
Author: Andrew Wang <wa...@apache.org>
AuthorDate: Fri Feb 18 18:34:11 2022 -0800

    HADOOP-13722. Code cleanup -- ViewFileSystem and InodeTree. Contributed by Manoj Govindassamy.
    
    (cherry picked from commit 0f4afc81009129bbee89d5b6cf22c8dda612d223)
---
 .../org/apache/hadoop/fs/viewfs/InodeTree.java     | 198 ++++++++++-----------
 .../apache/hadoop/fs/viewfs/ViewFileSystem.java    |  85 +++++----
 .../apache/hadoop/fs/viewfs/TestViewFsConfig.java  |  35 ++--
 3 files changed, 146 insertions(+), 172 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
index 779cec8..c9bdf63 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -37,47 +37,45 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 
-
 /**
  * InodeTree implements a mount-table as a tree of inodes.
  * It is used to implement ViewFs and ViewFileSystem.
  * In order to use it the caller must subclass it and implement
  * the abstract methods {@link #getTargetFileSystem(INodeDir)}, etc.
- * 
+ *
  * The mountable is initialized from the config variables as 
  * specified in {@link ViewFs}
  *
  * @param <T> is AbstractFileSystem or FileSystem
- * 
- * The three main methods are
- * {@link #InodeTreel(Configuration)} // constructor
+ *
+ * The two main methods are
  * {@link #InodeTree(Configuration, String)} // constructor
  * {@link #resolve(String, boolean)} 
  */
 
 @InterfaceAudience.Private
-@InterfaceStability.Unstable 
+@InterfaceStability.Unstable
 abstract class InodeTree<T> {
-  static enum ResultKind {isInternalDir, isExternalDir;};
+  enum ResultKind {
+    INTERNAL_DIR,
+    EXTERNAL_DIR
+  }
+
   static final Path SlashPath = new Path("/");
-  
-  final INodeDir<T> root; // the root of the mount table
-  
-  final String homedirPrefix; // the homedir config value for this mount table
-  
-  List<MountPoint<T>> mountPoints = new ArrayList<MountPoint<T>>();
-  
-  
+  private final INodeDir<T> root;     // the root of the mount table
+  private final String homedirPrefix; // the homedir for this mount table
+  private List<MountPoint<T>> mountPoints = new ArrayList<MountPoint<T>>();
+
   static class MountPoint<T> {
     String src;
     INodeLink<T> target;
+
     MountPoint(String srcPath, INodeLink<T> mountLink) {
       src = srcPath;
       target = mountLink;
     }
-
   }
-  
+
   /**
    * Breaks file path into component names.
    * @param path
@@ -85,18 +83,19 @@ abstract class InodeTree<T> {
    */
   static String[] breakIntoPathComponents(final String path) {
     return path == null ? null : path.split(Path.SEPARATOR);
-  } 
-  
+  }
+
   /**
    * Internal class for inode tree
    * @param <T>
    */
   abstract static class INode<T> {
     final String fullPath; // the full path to the root
+
     public INode(String pathToNode, UserGroupInformation aUgi) {
       fullPath = pathToNode;
     }
-  };
+  }
 
   /**
    * Internal class to represent an internal dir of the mount table
@@ -106,37 +105,28 @@ abstract class InodeTree<T> {
     final Map<String,INode<T>> children = new HashMap<String,INode<T>>();
     T InodeDirFs =  null; // file system of this internal directory of mountT
     boolean isRoot = false;
-    
+
     INodeDir(final String pathToNode, final UserGroupInformation aUgi) {
       super(pathToNode, aUgi);
     }
 
-    INode<T> resolve(final String pathComponent) throws FileNotFoundException {
-      final INode<T> result = resolveInternal(pathComponent);
-      if (result == null) {
-        throw new FileNotFoundException();
-      }
-      return result;
-    }
-    
     INode<T> resolveInternal(final String pathComponent) {
       return children.get(pathComponent);
     }
-    
+
     INodeDir<T> addDir(final String pathComponent,
-        final UserGroupInformation aUgi)
-      throws FileAlreadyExistsException {
+        final UserGroupInformation aUgi) throws FileAlreadyExistsException {
       if (children.containsKey(pathComponent)) {
         throw new FileAlreadyExistsException();
       }
-      final INodeDir<T> newDir = new INodeDir<T>(fullPath+ (isRoot ? "" : "/") + 
-          pathComponent, aUgi);
+      final INodeDir<T> newDir = new INodeDir<T>(fullPath +
+          (isRoot ? "" : "/") + pathComponent, aUgi);
       children.put(pathComponent, newDir);
       return newDir;
     }
-    
+
     void addLink(final String pathComponent, final INodeLink<T> link)
-      throws FileAlreadyExistsException {
+        throws FileAlreadyExistsException {
       if (children.containsKey(pathComponent)) {
         throw new FileAlreadyExistsException();
       }
@@ -145,14 +135,14 @@ abstract class InodeTree<T> {
   }
 
   /**
-   * In internal class to represent a mount link
+   * An internal class to represent a mount link.
    * A mount link can be single dir link or a merge dir link.
 
    * A merge dir link is  a merge (junction) of links to dirs:
-   * example : <merge of 2 dirs
+   * example : merge of 2 dirs
    *     /users -> hdfs:nn1//users
    *     /users -> hdfs:nn2//users
-   * 
+   *
    * For a merge, each target is checked to be dir when created but if target
    * is changed later it is then ignored (a dir with null entries)
    */
@@ -163,8 +153,9 @@ abstract class InodeTree<T> {
     // Function to initialize file system. Only applicable for simple links
     private Function<URI, T> fileSystemInitMethod;
     private final Object lock = new Object();
+
     /**
-     * Construct a mergeLink
+     * Construct a mergeLink.
      */
     INodeLink(final String pathToNode, final UserGroupInformation aUgi,
         final T targetMergeFs, final URI[] aTargetDirLinkList) {
@@ -173,9 +164,9 @@ abstract class InodeTree<T> {
       targetDirLinkList = aTargetDirLinkList;
       isMergeLink = true;
     }
-    
+
     /**
-     * Construct a simple link (i.e. not a mergeLink)
+     * Construct a simple link (i.e. not a mergeLink).
      */
     INodeLink(final String pathToNode, final UserGroupInformation aUgi,
         Function<URI, T> createFileSystemMethod,
@@ -187,16 +178,15 @@ abstract class InodeTree<T> {
       isMergeLink = false;
       this.fileSystemInitMethod = createFileSystemMethod;
     }
-    
+
     /**
-     * Get the target of the link
-     * If a merge link then it returned as "," separated URI list.
+     * Get the target of the link. If a merge link, the target is
+     * returned as a "," separated URI list.
      */
     Path getTargetLink() {
-      // is merge link - use "," as separator between the merged URIs
-      //String result = targetDirLinkList[0].toString();
       StringBuilder result = new StringBuilder(targetDirLinkList[0].toString());
-      for (int i=1; i < targetDirLinkList.length; ++i) { 
+      // If merge link, use "," as separator between the merged URIs
+      for (int i = 1; i < targetDirLinkList.length; ++i) {
         result.append(',').append(targetDirLinkList[i].toString());
       }
       return new Path(result.toString());
@@ -230,22 +220,21 @@ abstract class InodeTree<T> {
     }
   }
 
-
   private void createLink(final String src, final String target,
       final boolean isLinkMerge, final UserGroupInformation aUgi)
       throws URISyntaxException, IOException,
-    FileAlreadyExistsException, UnsupportedFileSystemException {
+      FileAlreadyExistsException, UnsupportedFileSystemException {
     // Validate that src is valid absolute path
-    final Path srcPath = new Path(src); 
+    final Path srcPath = new Path(src);
     if (!srcPath.isAbsoluteAndSchemeAuthorityNull()) {
-      throw new IOException("ViewFs:Non absolute mount name in config:" + src);
+      throw new IOException("ViewFs: Non absolute mount name in config:" + src);
     }
- 
+
     final String[] srcPaths = breakIntoPathComponents(src);
     INodeDir<T> curInode = root;
     int i;
     // Ignore first initial slash, process all except last component
-    for (i = 1; i < srcPaths.length-1; i++) {
+    for (i = 1; i < srcPaths.length - 1; i++) {
       final String iPath = srcPaths[i];
       INode<T> nextInode = curInode.resolveInternal(iPath);
       if (nextInode == null) {
@@ -258,11 +247,11 @@ abstract class InodeTree<T> {
         throw new FileAlreadyExistsException("Path " + nextInode.fullPath +
             " already exists as link");
       } else {
-        assert(nextInode instanceof INodeDir);
+        assert (nextInode instanceof INodeDir);
         curInode = (INodeDir<T>) nextInode;
       }
     }
-    
+
     // Now process the last component
     // Add the link in 2 cases: does not exist or a link exists
     String iPath = srcPaths[i];// last component
@@ -273,9 +262,9 @@ abstract class InodeTree<T> {
         strB.append('/').append(srcPaths[j]);
       }
       throw new FileAlreadyExistsException("Path " + strB +
-            " already exists as dir; cannot create link here");
+          " already exists as dir; cannot create link here");
     }
-    
+
     final INodeLink<T> newLink;
     final String fullPath = curInode.fullPath + (curInode == root ? "" : "/")
         + iPath;
@@ -295,24 +284,20 @@ abstract class InodeTree<T> {
     curInode.addLink(iPath, newLink);
     mountPoints.add(new MountPoint<T>(src, newLink));
   }
-  
-  /**
-   * Below the "public" methods of InodeTree
-   */
 
   /**
    * The user of this class must subclass and implement the following
    * 3 abstract methods.
-   * @throws IOException 
+   * @throws IOException
    */
   protected abstract Function<URI, T> initAndGetTargetFs();
-  
+
   protected abstract T getTargetFileSystem(final INodeDir<T> dir)
-    throws URISyntaxException;
-  
+      throws URISyntaxException;
+
   protected abstract T getTargetFileSystem(final URI[] mergeFsURIList)
-  throws UnsupportedFileSystemException, URISyntaxException;
-  
+      throws UnsupportedFileSystemException, URISyntaxException;
+
   /**
    * Create Inode Tree from the specified mount-table specified in Config
    * @param config - the mount table keys are prefixed with 
@@ -325,7 +310,7 @@ abstract class InodeTree<T> {
    */
   protected InodeTree(final Configuration config, final String viewName)
       throws UnsupportedFileSystemException, URISyntaxException,
-    FileAlreadyExistsException, IOException { 
+      FileAlreadyExistsException, IOException {
     String vName = viewName;
     if (vName == null) {
       vName = Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
@@ -334,9 +319,9 @@ abstract class InodeTree<T> {
     root = new INodeDir<T>("/", UserGroupInformation.getCurrentUser());
     root.InodeDirFs = getTargetFileSystem(root);
     root.isRoot = true;
-    
-    final String mtPrefix = Constants.CONFIG_VIEWFS_PREFIX + "." + 
-                            vName + ".";
+
+    final String mtPrefix = Constants.CONFIG_VIEWFS_PREFIX + "." +
+        vName + ".";
     final String linkPrefix = Constants.CONFIG_VIEWFS_LINK + ".";
     final String linkMergePrefix = Constants.CONFIG_VIEWFS_LINK_MERGE + ".";
     boolean gotMountTableEntry = false;
@@ -362,18 +347,17 @@ abstract class InodeTree<T> {
           // ignore - we set home dir from config
           continue;
         } else {
-          throw new IOException(
-          "ViewFs: Cannot initialize: Invalid entry in Mount table in config: "+ 
-          src);
+          throw new IOException("ViewFs: Cannot initialize: Invalid entry in " +
+              "Mount table in config: " + src);
         }
         final String target = si.getValue(); // link or merge link
-        createLink(src, target, isMergeLink, ugi); 
+        createLink(src, target, isMergeLink, ugi);
       }
     }
     if (!gotMountTableEntry) {
       throw new IOException(
           "ViewFs: Cannot initialize: Empty Mount table in config for " +
-             "viewfs://" + vName + "/");
+              "viewfs://" + vName + "/");
     }
   }
 
@@ -381,7 +365,7 @@ abstract class InodeTree<T> {
    * Resolve returns ResolveResult.
    * The caller can continue the resolution of the remainingPath
    * in the targetFileSystem.
-   * 
+   *
    * If the input pathname leads to link to another file system then
    * the targetFileSystem is the one denoted by the link (except it is
    * file system chrooted to link target.
@@ -393,7 +377,7 @@ abstract class InodeTree<T> {
     final T targetFileSystem;
     final String resolvedPath;
     final Path remainingPath;   // to resolve in the target FileSystem
-    
+
     ResolveResult(final ResultKind k, final T targetFs, final String resolveP,
         final Path remainingP) {
       kind = k;
@@ -401,31 +385,31 @@ abstract class InodeTree<T> {
       resolvedPath = resolveP;
       remainingPath = remainingP;
     }
-    
-    // isInternalDir of path resolution completed within the mount table 
+
+    // Internal dir path resolution completed within the mount table
     boolean isInternalDir() {
-      return (kind == ResultKind.isInternalDir);
+      return (kind == ResultKind.INTERNAL_DIR);
     }
   }
-  
+
   /**
    * Resolve the pathname p relative to root InodeDir
    * @param p - inout path
-   * @param resolveLastComponent 
+   * @param resolveLastComponent
    * @return ResolveResult which allows further resolution of the remaining path
    * @throws FileNotFoundException
    */
   ResolveResult<T> resolve(final String p, final boolean resolveLastComponent)
-    throws IOException {
+      throws IOException {
     // TO DO: - more efficient to not split the path, but simply compare
     String[] path = breakIntoPathComponents(p); 
     if (path.length <= 1) { // special case for when path is "/"
-      ResolveResult<T> res = 
-        new ResolveResult<T>(ResultKind.isInternalDir, 
+      ResolveResult<T> res =
+          new ResolveResult<T>(ResultKind.INTERNAL_DIR,
               root.InodeDirFs, root.fullPath, SlashPath);
       return res;
     }
-    
+
     INodeDir<T> curInode = root;
     int i;
     // ignore first slash
@@ -433,27 +417,27 @@ abstract class InodeTree<T> {
       INode<T> nextInode = curInode.resolveInternal(path[i]);
       if (nextInode == null) {
         StringBuilder failedAt = new StringBuilder(path[0]);
-        for ( int j = 1; j <=i; ++j) {
+        for (int j = 1; j <= i; ++j) {
           failedAt.append('/').append(path[j]);
         }
-        throw (new FileNotFoundException(failedAt.toString()));      
+        throw (new FileNotFoundException(failedAt.toString()));
       }
 
       if (nextInode instanceof INodeLink) {
         final INodeLink<T> link = (INodeLink<T>) nextInode;
         final Path remainingPath;
-        if (i >= path.length-1) {
+        if (i >= path.length - 1) {
           remainingPath = SlashPath;
         } else {
-          StringBuilder remainingPathStr = new StringBuilder("/" + path[i+1]);
-          for (int j = i+2; j< path.length; ++j) {
+          StringBuilder remainingPathStr = new StringBuilder("/" + path[i + 1]);
+          for (int j = i + 2; j < path.length; ++j) {
             remainingPathStr.append('/').append(path[j]);
           }
           remainingPath = new Path(remainingPathStr.toString());
         }
-        final ResolveResult<T> res = 
-          new ResolveResult<T>(ResultKind.isExternalDir,
-              link.getTargetFileSystem(), nextInode.fullPath, remainingPath);
+        final ResolveResult<T> res =
+            new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
+                link.getTargetFileSystem(), nextInode.fullPath, remainingPath);
         return res;
       } else if (nextInode instanceof INodeDir) {
         curInode = (INodeDir<T>) nextInode;
@@ -470,23 +454,23 @@ abstract class InodeTree<T> {
       // that follows will do a children.get(remaningPath) and will have to
       // strip-out the initial /
       StringBuilder remainingPathStr = new StringBuilder("/" + path[i]);
-      for (int j = i+1; j< path.length; ++j) {
+      for (int j = i + 1; j < path.length; ++j) {
         remainingPathStr.append('/').append(path[j]);
       }
       remainingPath = new Path(remainingPathStr.toString());
     }
-    final ResolveResult<T> res = 
-       new ResolveResult<T>(ResultKind.isInternalDir,
-           curInode.InodeDirFs, curInode.fullPath, remainingPath); 
+    final ResolveResult<T> res =
+        new ResolveResult<T>(ResultKind.INTERNAL_DIR,
+            curInode.InodeDirFs, curInode.fullPath, remainingPath);
     return res;
   }
-  
-  List<MountPoint<T>> getMountPoints() { 
+
+  List<MountPoint<T>> getMountPoints() {
     return mountPoints;
   }
-  
+
   /**
-   * 
+   *
    * @return home dir value from mount table; null if no config value
    * was found.
    */
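
To make the merge-link shape described in the INodeLink class comment above
concrete: a linkMerge entry carries a comma-separated list of target URIs,
which createLink() splits via StringUtils.getStrings() and hands to
getTargetFileSystem(URI[]). A hedged sketch of such an entry (mount table
name and namenode URIs are illustrative):

    // fs.viewfs.mounttable.<table>.linkMerge.<src> = <uri1>,<uri2>,...
    conf.set("fs.viewfs.mounttable.default.linkMerge./users",
        "hdfs://nn1:8020/users,hdfs://nn2:8020/users");
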
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
index a1b9c9e..0a3b65d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
@@ -164,8 +164,17 @@ public class ViewFileSystem extends FileSystem {
   }
 
   static public class MountPoint {
-    private Path src;       // the src of the mount
-    private URI[] targets; //  target of the mount; Multiple targets imply mergeMount
+    /**
+     *  The source of the mount.
+     */
+    private Path src;
+
+    /**
+     * One or more targets of the mount.
+     * Multiple targets imply MergeMount.
+     */
+    private URI[] targets;
+
     MountPoint(Path srcPath, URI[] targetURIs) {
       src = srcPath;
       targets = targetURIs;
@@ -220,19 +229,18 @@ public class ViewFileSystem extends FileSystem {
 
   /**
    * Return the protocol scheme for the FileSystem.
-   * <p/>
    *
    * @return <code>viewfs</code>
    */
   @Override
   public String getScheme() {
-    return "viewfs";
+    return FsConstants.VIEWFS_SCHEME;
   }
 
   /**
    * Called after a new FileSystem instance is constructed.
    * @param theUri a uri whose authority section names the host, port, etc. for
-   *          this FileSystem
+   *        this FileSystem
    * @param conf the configuration
    */
   @Override
@@ -304,8 +312,7 @@ public class ViewFileSystem extends FileSystem {
     }
 
   }
-  
-  
+
   /**
    * Convenience Constructor for apps to call directly
    * @param theUri which must be that of ViewFileSystem
@@ -313,7 +320,7 @@ public class ViewFileSystem extends FileSystem {
    * @throws IOException
    */
   ViewFileSystem(final URI theUri, final Configuration conf)
-    throws IOException {
+      throws IOException {
     this();
     initialize(theUri, conf);
   }
@@ -339,8 +346,7 @@ public class ViewFileSystem extends FileSystem {
   }
   
   @Override
-  public Path resolvePath(final Path f)
-      throws IOException {
+  public Path resolvePath(final Path f) throws IOException {
     final InodeTree.ResolveResult<FileSystem> res;
       res = fsState.resolve(getUriPath(f), true);
     if (res.isInternalDir()) {
@@ -384,8 +390,8 @@ public class ViewFileSystem extends FileSystem {
   
   @Override
   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
-      EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
-      Progressable progress) throws IOException {
+      EnumSet<CreateFlag> flags, int bufferSize, short replication,
+      long blockSize, Progressable progress) throws IOException {
     InodeTree.ResolveResult<FileSystem> res;
     try {
       res = fsState.resolve(getUriPath(f), false);
@@ -393,8 +399,8 @@ public class ViewFileSystem extends FileSystem {
         throw readOnlyMountTable("create", f);
     }
     assert(res.remainingPath != null);
-    return res.targetFileSystem.createNonRecursive(res.remainingPath, permission,
-        flags, bufferSize, replication, blockSize, progress);
+    return res.targetFileSystem.createNonRecursive(res.remainingPath,
+        permission, flags, bufferSize, replication, blockSize, progress);
   }
   
   @Override
@@ -415,10 +421,9 @@ public class ViewFileSystem extends FileSystem {
   
   @Override
   public boolean delete(final Path f, final boolean recursive)
-      throws AccessControlException, FileNotFoundException,
-      IOException {
+      throws AccessControlException, FileNotFoundException, IOException {
     InodeTree.ResolveResult<FileSystem> res = 
-      fsState.resolve(getUriPath(f), true);
+        fsState.resolve(getUriPath(f), true);
     // If internal dir or target is a mount link (ie remainingPath is Slash)
     if (res.isInternalDir() || res.remainingPath == InodeTree.SlashPath) {
       throw readOnlyMountTable("delete", f);
@@ -429,9 +434,8 @@ public class ViewFileSystem extends FileSystem {
   @Override
   @SuppressWarnings("deprecation")
   public boolean delete(final Path f)
-      throws AccessControlException, FileNotFoundException,
-      IOException {
-      return delete(f, true);
+      throws AccessControlException, FileNotFoundException, IOException {
+    return delete(f, true);
   }
   
   @Override
@@ -452,7 +456,6 @@ public class ViewFileSystem extends FileSystem {
     return res.targetFileSystem.getFileChecksum(res.remainingPath);
   }
 
-
   private static FileStatus fixFileStatus(FileStatus orig,
       Path qualified) throws IOException {
     // FileStatus#getPath is a fully qualified path relative to the root of
@@ -480,7 +483,6 @@ public class ViewFileSystem extends FileSystem {
         : new ViewFsFileStatus(orig, qualified);
   }
 
-
   @Override
   public FileStatus getFileStatus(final Path f) throws AccessControlException,
       FileNotFoundException, IOException {
@@ -520,10 +522,10 @@ public class ViewFileSystem extends FileSystem {
   @Override
   public RemoteIterator<LocatedFileStatus>listLocatedStatus(final Path f,
       final PathFilter filter) throws FileNotFoundException, IOException {
-    final InodeTree.ResolveResult<FileSystem> res = fsState
-        .resolve(getUriPath(f), true);
-    final RemoteIterator<LocatedFileStatus> statusIter = res.targetFileSystem
-        .listLocatedStatus(res.remainingPath);
+    final InodeTree.ResolveResult<FileSystem> res =
+        fsState.resolve(getUriPath(f), true);
+    final RemoteIterator<LocatedFileStatus> statusIter =
+        res.targetFileSystem.listLocatedStatus(res.remainingPath);
 
     if (res.isInternalDir()) {
       return statusIter;
@@ -569,8 +571,7 @@ public class ViewFileSystem extends FileSystem {
 
   @Override
   public FSDataInputStream open(final Path f, final int bufferSize)
-      throws AccessControlException, FileNotFoundException,
-      IOException {
+      throws AccessControlException, FileNotFoundException, IOException {
     InodeTree.ResolveResult<FileSystem> res = 
         fsState.resolve(getUriPath(f), true);
     return res.targetFileSystem.open(res.remainingPath, bufferSize);
@@ -655,8 +656,7 @@ public class ViewFileSystem extends FileSystem {
   @Override
   public void setOwner(final Path f, final String username,
       final String groupname) throws AccessControlException,
-      FileNotFoundException,
-      IOException {
+      FileNotFoundException, IOException {
     InodeTree.ResolveResult<FileSystem> res = 
       fsState.resolve(getUriPath(f), true);
     res.targetFileSystem.setOwner(res.remainingPath, username, groupname); 
@@ -664,8 +664,7 @@ public class ViewFileSystem extends FileSystem {
 
   @Override
   public void setPermission(final Path f, final FsPermission permission)
-      throws AccessControlException, FileNotFoundException,
-      IOException {
+      throws AccessControlException, FileNotFoundException, IOException {
     InodeTree.ResolveResult<FileSystem> res = 
       fsState.resolve(getUriPath(f), true);
     res.targetFileSystem.setPermission(res.remainingPath, permission); 
@@ -673,8 +672,7 @@ public class ViewFileSystem extends FileSystem {
 
   @Override
   public boolean setReplication(final Path f, final short replication)
-      throws AccessControlException, FileNotFoundException,
-      IOException {
+      throws AccessControlException, FileNotFoundException, IOException {
     InodeTree.ResolveResult<FileSystem> res = 
       fsState.resolve(getUriPath(f), true);
     return res.targetFileSystem.setReplication(res.remainingPath, replication);
@@ -682,8 +680,7 @@ public class ViewFileSystem extends FileSystem {
 
   @Override
   public void setTimes(final Path f, final long mtime, final long atime)
-      throws AccessControlException, FileNotFoundException,
-      IOException {
+      throws AccessControlException, FileNotFoundException, IOException {
     InodeTree.ResolveResult<FileSystem> res = 
       fsState.resolve(getUriPath(f), true);
     res.targetFileSystem.setTimes(res.remainingPath, mtime, atime); 
@@ -1084,8 +1081,8 @@ public class ViewFileSystem extends FileSystem {
 
     static private void checkPathIsSlash(final Path f) throws IOException {
       if (f != InodeTree.SlashPath) {
-        throw new IOException (
-        "Internal implementation error: expected file name to be /" );
+        throw new IOException(
+            "Internal implementation error: expected file name to be /");
       }
     }
     
@@ -1096,14 +1093,14 @@ public class ViewFileSystem extends FileSystem {
 
     @Override
     public Path getWorkingDirectory() {
-      throw new RuntimeException (
-      "Internal impl error: getWorkingDir should not have been called" );
+      throw new RuntimeException(
+          "Internal impl error: getWorkingDir should not have been called");
     }
 
     @Override
     public void setWorkingDirectory(final Path new_dir) {
-      throw new RuntimeException (
-      "Internal impl error: getWorkingDir should not have been called" ); 
+      throw new RuntimeException(
+          "Internal impl error: getWorkingDir should not have been called");
     }
 
     @Override
@@ -1136,7 +1133,7 @@ public class ViewFileSystem extends FileSystem {
 
     @Override
     public BlockLocation[] getFileBlockLocations(final FileStatus fs,
-        final long start, final long len) throws 
+        final long start, final long len) throws
         FileNotFoundException, IOException {
       checkPathIsSlash(fs.getPath());
       throw new FileNotFoundException("Path points to dir not a file");
@@ -1319,7 +1316,7 @@ public class ViewFileSystem extends FileSystem {
 
     @Override
     public void setXAttr(Path path, String name, byte[] value,
-                         EnumSet<XAttrSetFlag> flag) throws IOException {
+        EnumSet<XAttrSetFlag> flag) throws IOException {
       checkPathIsSlash(path);
       throw readOnlyMountTable("setXAttr", path);
     }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
index daf68c8..2c0a6e1 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,29 +23,25 @@ import java.net.URI;
 import java.net.URISyntaxException;
 
 import org.apache.hadoop.conf.Configuration;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.fs.viewfs.ConfigUtil;
-import org.apache.hadoop.fs.viewfs.InodeTree;
 import org.junit.Test;
 
-
 public class TestViewFsConfig {
-  
-  
-  @Test(expected=FileAlreadyExistsException.class)
+
+  @Test(expected = FileAlreadyExistsException.class)
   public void testInvalidConfig() throws IOException, URISyntaxException {
     Configuration conf = new Configuration();
     ConfigUtil.addLink(conf, "/internalDir/linkToDir2",
         new Path("file:///dir2").toUri());
     ConfigUtil.addLink(conf, "/internalDir/linkToDir2/linkToDir3",
         new Path("file:///dir3").toUri());
-    
-    class Foo { };
-    
-     new InodeTree<Foo>(conf, null) {
+
+    class Foo {
+    }
+
+    new InodeTree<Foo>(conf, null) {
 
       @Override
       protected Function<URI, Foo> initAndGetTargetFs() {
@@ -53,17 +49,14 @@ public class TestViewFsConfig {
       }
 
       @Override
-      protected
-      Foo getTargetFileSystem(
-          org.apache.hadoop.fs.viewfs.InodeTree.INodeDir<Foo>
-                                          dir)
-        throws URISyntaxException {
+      protected Foo getTargetFileSystem(
+          org.apache.hadoop.fs.viewfs.InodeTree.INodeDir<Foo> dir)
+          throws URISyntaxException {
         return null;
       }
 
       @Override
-      protected
-      Foo getTargetFileSystem(URI[] mergeFsURIList)
+      protected Foo getTargetFileSystem(URI[] mergeFsURIList)
           throws URISyntaxException, UnsupportedFileSystemException {
         return null;
       }



[hadoop] 02/03: HADOOP-12077. Provide a multi-URI replication Inode for ViewFs. Contributed by Gera Shegalov

Posted by om...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 20ff29471480fc9c74e6cf78a0ad152a6ba0772f
Author: Chris Douglas <cd...@apache.org>
AuthorDate: Tue Sep 5 23:30:18 2017 -0700

    HADOOP-12077. Provide a multi-URI replication Inode for ViewFs. Contributed by Gera Shegalov
    
    (cherry picked from commit 1f3bc63e6772be81bc9a6a7d93ed81d2a9e066c0)
---
 .../org/apache/hadoop/fs/viewfs/ConfigUtil.java    |  27 +
 .../org/apache/hadoop/fs/viewfs/Constants.java     |   8 +-
 .../org/apache/hadoop/fs/viewfs/InodeTree.java     |  62 +-
 .../org/apache/hadoop/fs/viewfs/NflyFSystem.java   | 951 +++++++++++++++++++++
 .../apache/hadoop/fs/viewfs/ViewFileSystem.java    |  34 +-
 .../java/org/apache/hadoop/fs/viewfs/ViewFs.java   |   7 +-
 .../viewfs/TestViewFileSystemLocalFileSystem.java  |  77 +-
 .../apache/hadoop/fs/viewfs/TestViewFsConfig.java  |  10 +-
 .../hadoop/fs/viewfs/TestViewFileSystemHdfs.java   | 147 +++-
 9 files changed, 1270 insertions(+), 53 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
index bb941c7..8acd41f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.viewfs;
 import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Utilities for config variables of the viewFs See {@link ViewFs}
@@ -69,6 +70,32 @@ public class ConfigUtil {
   }
   
   /**
+   * Add a config variable for an nfly link to the given mount table.
+   * @param conf - add the link to this conf
+   * @param mountTableName - the name of the mount table to add the link to
+   * @param src - the src path name in the mount table
+   * @param settings - comma-separated nfly settings; null selects the default
+   * @param targets - the target URIs to replicate across
+   */
+  public static void addLinkNfly(Configuration conf, String mountTableName,
+      String src, String settings, final URI ... targets) {
+
+    settings = settings == null
+        ? "minReplication=2,repairOnRead=true"
+        : settings;
+
+    conf.set(getConfigViewFsPrefix(mountTableName) + "." +
+            Constants.CONFIG_VIEWFS_LINK_NFLY + "." + settings + "." + src,
+        StringUtils.uriToString(targets));
+  }
+
+  public static void addLinkNfly(final Configuration conf, final String src,
+      final URI ... targets) {
+    addLinkNfly(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, src, null,
+        targets);
+  }
+
+  /**
    * Add config variable for homedir for default mount table
    * @param conf - add to this conf
    * @param homedir - the home dir path starting with slash
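
A short usage sketch for the new overloads; the mount table name, settings
string, and target URIs below are chosen purely for illustration, and the
settings string passed here matches the "minReplication=2,repairOnRead=true"
default applied when null is given:

    Configuration conf = new Configuration();
    ConfigUtil.addLinkNfly(conf, "ClusterX", "/data",
        "minReplication=2,repairOnRead=true",
        new URI("hdfs://nn1:8020/data"),
        new URI("hdfs://nn2:8020/data"));

The shorter overload targets the default mount table and passes null
settings, falling back to that same default string.
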
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
index 0c0e8a3..3f9aae2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
@@ -57,7 +57,13 @@ public interface Constants {
    * Config variable for specifying a merge link
    */
   public static final String CONFIG_VIEWFS_LINK_MERGE = "linkMerge";
-  
+
+  /**
+   * Config variable for specifying an nfly link. Nfly writes to multiple
+   * locations, and allows reads from the closest one.
+   */
+  String CONFIG_VIEWFS_LINK_NFLY = "linkNfly";
+
   /**
    * Config variable for specifying a merge of the root of the mount-table
    *  with the root of another file system. 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
index c9bdf63..199ccc6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
@@ -134,6 +134,12 @@ abstract class InodeTree<T> {
     }
   }
 
+  enum LinkType {
+    SINGLE,
+    MERGE,
+    NFLY
+  }
+
   /**
    * An internal class to represent a mount link.
    * A mount link can be single dir link or a merge dir link.
@@ -147,7 +153,6 @@ abstract class InodeTree<T> {
    * is changed later it is then ignored (a dir with null entries)
    */
   static class INodeLink<T> extends INode<T> {
-    final boolean isMergeLink; // true if MergeLink
     final URI[] targetDirLinkList;
     private T targetFileSystem;   // file system object created from the link.
     // Function to initialize file system. Only applicable for simple links
@@ -155,14 +160,13 @@ abstract class InodeTree<T> {
     private final Object lock = new Object();
 
     /**
-     * Construct a mergeLink.
+     * Construct a mergeLink or nfly.
      */
     INodeLink(final String pathToNode, final UserGroupInformation aUgi,
         final T targetMergeFs, final URI[] aTargetDirLinkList) {
       super(pathToNode, aUgi);
       targetFileSystem = targetMergeFs;
       targetDirLinkList = aTargetDirLinkList;
-      isMergeLink = true;
     }
 
     /**
@@ -175,7 +179,6 @@ abstract class InodeTree<T> {
       targetFileSystem = null;
       targetDirLinkList = new URI[1];
       targetDirLinkList[0] = aTargetDirLink;
-      isMergeLink = false;
       this.fileSystemInitMethod = createFileSystemMethod;
     }
 
@@ -221,7 +224,9 @@ abstract class InodeTree<T> {
   }
 
   private void createLink(final String src, final String target,
-      final boolean isLinkMerge, final UserGroupInformation aUgi)
+      final LinkType linkType, final String settings,
+      final UserGroupInformation aUgi,
+      final Configuration config)
       throws URISyntaxException, IOException,
       FileAlreadyExistsException, UnsupportedFileSystemException {
     // Validate that src is valid absolute path
@@ -268,18 +273,20 @@ abstract class InodeTree<T> {
     final INodeLink<T> newLink;
     final String fullPath = curInode.fullPath + (curInode == root ? "" : "/")
         + iPath;
-    if (isLinkMerge) { // Target is list of URIs
-      String[] targetsList = StringUtils.getStrings(target);
-      URI[] targetsListURI = new URI[targetsList.length];
-      int k = 0;
-      for (String itarget : targetsList) {
-        targetsListURI[k++] = new URI(itarget);
-      }
-      newLink = new INodeLink<T>(fullPath, aUgi,
-          getTargetFileSystem(targetsListURI), targetsListURI);
-    } else {
+    switch (linkType) {
+    case SINGLE:
       newLink = new INodeLink<T>(fullPath, aUgi,
           initAndGetTargetFs(), new URI(target));
+      break;
+    case MERGE:
+    case NFLY:
+      final URI[] targetUris = StringUtils.stringToURI(
+          StringUtils.getStrings(target));
+      newLink = new INodeLink<T>(fullPath, aUgi,
+            getTargetFileSystem(settings, targetUris), targetUris);
+      break;
+    default:
+      throw new IllegalArgumentException(linkType + ": Infeasible linkType");
     }
     curInode.addLink(iPath, newLink);
     mountPoints.add(new MountPoint<T>(src, newLink));
@@ -292,11 +299,11 @@ abstract class InodeTree<T> {
    */
   protected abstract Function<URI, T> initAndGetTargetFs();
 
-  protected abstract T getTargetFileSystem(final INodeDir<T> dir)
+  protected abstract T getTargetFileSystem(INodeDir<T> dir)
       throws URISyntaxException;
 
-  protected abstract T getTargetFileSystem(final URI[] mergeFsURIList)
-      throws UnsupportedFileSystemException, URISyntaxException;
+  protected abstract T getTargetFileSystem(String settings, URI[] mergeFsURIs)
+      throws UnsupportedFileSystemException, URISyntaxException, IOException;
 
   /**
    * Create Inode Tree from the specified mount-table specified in Config
@@ -330,8 +337,9 @@ abstract class InodeTree<T> {
       final String key = si.getKey();
       if (key.startsWith(mtPrefix)) {
         gotMountTableEntry = true;
-        boolean isMergeLink = false;
+        LinkType linkType = LinkType.SINGLE;
         String src = key.substring(mtPrefix.length());
+        String settings = null;
         if (src.startsWith(linkPrefix)) {
           src = src.substring(linkPrefix.length());
           if (src.equals(SlashPath.toString())) {
@@ -341,8 +349,20 @@ abstract class InodeTree<T> {
                 + "supported yet.");
           }
         } else if (src.startsWith(linkMergePrefix)) { // A merge link
-          isMergeLink = true;
+          linkType = LinkType.MERGE;
           src = src.substring(linkMergePrefix.length());
+        } else if (src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY)) {
+          // prefix.settings.src
+          src = src.substring(Constants.CONFIG_VIEWFS_LINK_NFLY.length() + 1);
+          // settings.src
+          settings = src.substring(0, src.indexOf('.'));
+          // settings
+
+          // settings.src
+          src = src.substring(settings.length() + 1);
+          // src
+
+          linkType = LinkType.NFLY;
         } else if (src.startsWith(Constants.CONFIG_VIEWFS_HOMEDIR)) {
           // ignore - we set home dir from config
           continue;
@@ -351,7 +371,7 @@ abstract class InodeTree<T> {
               "Mount table in config: " + src);
         }
         final String target = si.getValue(); // link or merge link
-        createLink(src, target, isMergeLink, ugi);
+        createLink(src, target, linkType, settings, ugi, config);
       }
     }
     if (!gotMountTableEntry) {
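
The parsing above implies a raw key layout of the linkNfly prefix, then the
settings segment, then the src path, where the settings segment is terminated
by the first '.' that follows it (so settings values themselves must not
contain dots). A hedged sketch of the resulting property, assuming the usual
fs.viewfs.mounttable value for the mount table prefix (table name and URIs
are illustrative):

    // fs.viewfs.mounttable.<table>.linkNfly.<settings>.<src> = <uri>,<uri>
    conf.set("fs.viewfs.mounttable.ClusterX.linkNfly"
            + ".minReplication=2,repairOnRead=true./data",
        "hdfs://nn1:8020/data,hdfs://nn2:8020/data");
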
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java
new file mode 100644
index 0000000..53966b8
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java
@@ -0,0 +1,951 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Nfly is a multi filesystem mount point.
+ */
+@Private
+final class NflyFSystem extends FileSystem {
+  private static final Log LOG = LogFactory.getLog(NflyFSystem.class);
+  private static final String NFLY_TMP_PREFIX = "_nfly_tmp_";
+
+  enum NflyKey {
+    // minimum replication; if the local filesystem is included, +1 is recommended
+    minReplication,
+
+    // forces a check of all the replicas and fetches the one with the most
+    // recent timestamp
+    //
+    readMostRecent,
+
+    // create missing replica from far to near, including local?
+    repairOnRead
+  }
+
+  private static final int DEFAULT_MIN_REPLICATION = 2;
+  private static URI nflyURI = URI.create("nfly:///");
+
+  private final NflyNode[] nodes;
+  private final int minReplication;
+  private final EnumSet<NflyKey> nflyFlags;
+  private final Node myNode;
+  private final NetworkTopology topology;
+
+  /**
+   * The URI's authority is used as an approximation of the distance from
+   * the client. It's sufficient at the datacenter level, but not accurate
+   * because worker nodes can be closer.
+   */
+  private static class NflyNode extends NodeBase {
+    private final ChRootedFileSystem fs;
+    NflyNode(String hostName, String rackName, URI uri,
+        Configuration conf) throws IOException {
+      this(hostName, rackName, new ChRootedFileSystem(uri, conf));
+    }
+
+    NflyNode(String hostName, String rackName, ChRootedFileSystem fs) {
+      super(hostName, rackName);
+      this.fs = fs;
+    }
+
+    ChRootedFileSystem getFs() {
+      return fs;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      // satisfy findbugs
+      return super.equals(o);
+    }
+
+    @Override
+    public int hashCode() {
+      // satisfy findbugs
+      return super.hashCode();
+    }
+
+  }
+
+  private static final class MRNflyNode
+      extends NflyNode implements Comparable<MRNflyNode> {
+
+    private FileStatus status;
+
+    private MRNflyNode(NflyNode n) {
+      super(n.getName(), n.getNetworkLocation(), n.fs);
+    }
+
+    private void updateFileStatus(Path f) throws IOException {
+      final FileStatus tmpStatus = getFs().getFileStatus(f);
+      status = tmpStatus == null
+          ? notFoundStatus(f)
+          : tmpStatus;
+    }
+
+    // TODO allow configurable error margin for FileSystems with different
+    // timestamp precisions
+    @Override
+    public int compareTo(MRNflyNode other) {
+      if (status == null) {
+        return other.status == null ? 0 : 1; // move non-null towards head
+      } else if (other.status == null) {
+        return -1; // move this towards head
+      } else {
+        final long mtime = status.getModificationTime();
+        final long their = other.status.getModificationTime();
+        return Long.compare(their, mtime); // move more recent towards head
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof MRNflyNode)) {
+        return false;
+      }
+      MRNflyNode other = (MRNflyNode) o;
+      return 0 == compareTo(other);
+    }
+
+    @Override
+    public int hashCode() {
+      // satisfy findbugs
+      return super.hashCode();
+    }
+
+    private FileStatus nflyStatus() throws IOException {
+      return new NflyStatus(getFs(), status);
+    }
+
+    private FileStatus cloneStatus() throws IOException {
+      return new FileStatus(status.getLen(),
+          status.isDirectory(),
+          status.getReplication(),
+          status.getBlockSize(),
+          status.getModificationTime(),
+          status.getAccessTime(),
+          null, null, null,
+          status.isSymlink() ? status.getSymlink() : null,
+          status.getPath());
+    }
+  }
+
+  private MRNflyNode[] workSet() {
+    final MRNflyNode[] res = new MRNflyNode[nodes.length];
+    for (int i = 0; i < res.length; i++) {
+      res[i] = new MRNflyNode(nodes[i]);
+    }
+    return res;
+  }
+
+
+  /**
+   * Utility to replace null with DEFAULT_RACK.
+   *
+   * @param rackString rack value, can be null
+   * @return non-null rack string
+   */
+  private static String getRack(String rackString) {
+    return rackString == null ? NetworkTopology.DEFAULT_RACK : rackString;
+  }
+
+  /**
+   * Creates a new Nfly instance.
+   *
+   * @param uris the list of uris in the mount point
+   * @param conf configuration object
+   * @param minReplication minimum copies to commit a write op
+   * @param nflyFlags modes such readMostRecent
+   * @throws IOException
+   */
+  private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
+      EnumSet<NflyKey> nflyFlags) throws IOException {
+    if (uris.length < minReplication) {
+      throw new IOException(uris.length + " < " + minReplication
+          + ": #destinations < minimum replication");
+    }
+    setConf(conf);
+    final String localHostName = InetAddress.getLocalHost().getHostName();
+
+    // build a list for topology resolution
+    final List<String> hostStrings = new ArrayList<String>(uris.length + 1);
+    for (URI uri : uris) {
+      final String uriHost = uri.getHost();
+      // assume local file system or another closest filesystem if no authority
+      hostStrings.add(uriHost == null ? localHostName : uriHost);
+    }
+    // resolve the client node
+    hostStrings.add(localHostName);
+
+    final DNSToSwitchMapping tmpDns = ReflectionUtils.newInstance(conf.getClass(
+        CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
+
+    // this is an ArrayList
+    final List<String> rackStrings = tmpDns.resolve(hostStrings);
+    nodes = new NflyNode[uris.length];
+    final Iterator<String> rackIter = rackStrings.iterator();
+    for (int i = 0; i < nodes.length; i++) {
+      nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(), uris[i],
+          conf);
+    }
+    // sort all the uri's by distance from myNode, the local file system will
+    // automatically be the first one.
+    //
+    myNode = new NodeBase(localHostName, getRack(rackIter.next()));
+    topology = NetworkTopology.getInstance(conf);
+    topology.sortByDistance(myNode, nodes, nodes.length);
+
+    this.minReplication = minReplication;
+    this.nflyFlags = nflyFlags;
+    statistics = getStatistics(nflyURI.getScheme(), getClass());
+  }
+
+  /**
+   * Transactional output stream. When creating path /dir/file
+   * 1) create invisible /real/dir_i/_nfly_tmp_file
+   * 2) when more than min replication was written, write is committed by
+   *   renaming all successfully written files to /real/dir_i/file
+   */
+  private final class NflyOutputStream extends OutputStream {
+    // actual path
+    private final Path nflyPath;
+    // tmp path before commit
+    private final Path tmpPath;
+    // broadcast set
+    private final FSDataOutputStream[] outputStreams;
+    // status set: 1 working, 0 problem
+    private final BitSet opSet;
+    private final boolean useOverwrite;
+
+    private NflyOutputStream(Path f, FsPermission permission, boolean overwrite,
+        int bufferSize, short replication, long blockSize,
+        Progressable progress) throws IOException {
+      nflyPath = f;
+      tmpPath = getNflyTmpPath(f);
+      outputStreams = new FSDataOutputStream[nodes.length];
+      for (int i = 0; i < outputStreams.length; i++) {
+        outputStreams[i] = nodes[i].fs.create(tmpPath, permission, true,
+            bufferSize, replication, blockSize, progress);
+      }
+      opSet = new BitSet(outputStreams.length);
+      opSet.set(0, outputStreams.length);
+      useOverwrite = false;
+    }
+
+    //
+    // TODO consider how to clean up and throw an exception early when the
+    // number of working (set) bits drops under min replication
+    //
+
+    private void mayThrow(List<IOException> ioExceptions) throws IOException {
+      final IOException ioe = MultipleIOException
+          .createIOException(ioExceptions);
+      if (opSet.cardinality() < minReplication) {
+        throw ioe;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Exceptions occurred: " + ioe);
+        }
+      }
+    }
+
+
+    @Override
+    public void write(int d) throws IOException {
+      final List<IOException> ioExceptions = new ArrayList<IOException>();
+      for (int i = opSet.nextSetBit(0);
+           i >=0;
+           i = opSet.nextSetBit(i + 1)) {
+        try {
+          outputStreams[i].write(d);
+        } catch (Throwable t) {
+          osException(i, "write", t, ioExceptions);
+        }
+      }
+      mayThrow(ioExceptions);
+    }
+
+    private void osException(int i, String op, Throwable t,
+        List<IOException> ioExceptions) {
+      opSet.clear(i);
+      processThrowable(nodes[i], op, t, ioExceptions, tmpPath, nflyPath);
+    }
+
+    @Override
+    public void write(byte[] bytes, int offset, int len) throws IOException {
+      final List<IOException> ioExceptions = new ArrayList<IOException>();
+      for (int i = opSet.nextSetBit(0);
+           i >= 0;
+           i = opSet.nextSetBit(i + 1)) {
+        try {
+          outputStreams[i].write(bytes, offset, len);
+        } catch (Throwable t) {
+          osException(i, "write", t, ioExceptions);
+        }
+      }
+      mayThrow(ioExceptions);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      final List<IOException> ioExceptions = new ArrayList<IOException>();
+      for (int i = opSet.nextSetBit(0);
+           i >= 0;
+           i = opSet.nextSetBit(i + 1)) {
+        try {
+          outputStreams[i].flush();
+        } catch (Throwable t) {
+          osException(i, "flush", t, ioExceptions);
+        }
+      }
+      mayThrow(ioExceptions);
+    }
+
+    @Override
+    public void close() throws IOException {
+      final List<IOException> ioExceptions = new ArrayList<IOException>();
+      for (int i = opSet.nextSetBit(0);
+           i >= 0;
+           i = opSet.nextSetBit(i + 1)) {
+        try {
+          outputStreams[i].close();
+        } catch (Throwable t) {
+          osException(i, "close", t, ioExceptions);
+        }
+      }
+      if (opSet.cardinality() < minReplication) {
+        cleanupAllTmpFiles();
+        throw new IOException("Failed to sufficiently replicate: min="
+            + minReplication + " actual=" + opSet.cardinality());
+      } else {
+        commit();
+      }
+    }
+
+    private void cleanupAllTmpFiles() throws IOException {
+      for (int i = 0; i < outputStreams.length; i++) {
+        try {
+          nodes[i].fs.delete(tmpPath);
+        } catch (Throwable t) {
+          processThrowable(nodes[i], "delete", t, null, tmpPath);
+        }
+      }
+    }
+
+    private void commit() throws IOException {
+      final List<IOException> ioExceptions = new ArrayList<IOException>();
+      for (int i = opSet.nextSetBit(0);
+           i >= 0;
+           i = opSet.nextSetBit(i + 1)) {
+        final NflyNode nflyNode = nodes[i];
+        try {
+          if (useOverwrite) {
+            nflyNode.fs.delete(nflyPath);
+          }
+          nflyNode.fs.rename(tmpPath, nflyPath);
+
+        } catch (Throwable t) {
+          osException(i, "commit", t, ioExceptions);
+        }
+      }
+
+      if (opSet.cardinality() < minReplication) {
+        // cleanup should be done outside. If rename failed, it's unlikely that
+        // delete will work either. It's the same kind of metadata-only op
+        //
+        throw MultipleIOException.createIOException(ioExceptions);
+      }
+
+      // best effort to have a consistent timestamp
+      final long commitTime = System.currentTimeMillis();
+      for (int i = opSet.nextSetBit(0);
+          i >= 0;
+          i = opSet.nextSetBit(i + 1)) {
+        try {
+          nodes[i].fs.setTimes(nflyPath, commitTime, commitTime);
+        } catch (Throwable t) {
+          LOG.info("Failed to set timestamp: " + nodes[i] + " " + nflyPath);
+        }
+      }
+    }
+  }
+
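+  // Temp-file naming sketch: /dir/part-0 becomes /dir/_nfly_tmp_part-0,
+  // assuming NFLY_TMP_PREFIX (defined earlier in this file) is "_nfly_tmp_".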
+  private Path getNflyTmpPath(Path f) {
+    return new Path(f.getParent(), NFLY_TMP_PREFIX + f.getName());
+  }
+
+  /**
+   * Some file status implementations have expensive deserialization or
+   * metadata retrieval; this probably does not go beyond RawLocalFileSystem.
+   * Wrapping the real file status preserves that lazy behavior. Calling the
+   * realStatus getters in the constructor would defeat this design.
+   */
+  static final class NflyStatus extends FileStatus {
+    private static final long serialVersionUID = 0x21f276d8;
+
+    private final FileStatus realStatus;
+    private final String strippedRoot;
+
+    private NflyStatus(ChRootedFileSystem realFs, FileStatus realStatus)
+        throws IOException {
+      this.realStatus = realStatus;
+      this.strippedRoot = realFs.stripOutRoot(realStatus.getPath());
+    }
+
+    String stripRoot() throws IOException {
+      return strippedRoot;
+    }
+
+    @Override
+    public long getLen() {
+      return realStatus.getLen();
+    }
+
+    @Override
+    public boolean isFile() {
+      return realStatus.isFile();
+    }
+
+    @Override
+    public boolean isDirectory() {
+      return realStatus.isDirectory();
+    }
+
+    @Override
+    public boolean isSymlink() {
+      return realStatus.isSymlink();
+    }
+
+    @Override
+    public long getBlockSize() {
+      return realStatus.getBlockSize();
+    }
+
+    @Override
+    public short getReplication() {
+      return realStatus.getReplication();
+    }
+
+    @Override
+    public long getModificationTime() {
+      return realStatus.getModificationTime();
+    }
+
+    @Override
+    public long getAccessTime() {
+      return realStatus.getAccessTime();
+    }
+
+    @Override
+    public FsPermission getPermission() {
+      return realStatus.getPermission();
+    }
+
+    @Override
+    public String getOwner() {
+      return realStatus.getOwner();
+    }
+
+    @Override
+    public String getGroup() {
+      return realStatus.getGroup();
+    }
+
+    @Override
+    public Path getPath() {
+      return realStatus.getPath();
+    }
+
+    @Override
+    public void setPath(Path p) {
+      realStatus.setPath(p);
+    }
+
+    @Override
+    public Path getSymlink() throws IOException {
+      return realStatus.getSymlink();
+    }
+
+    @Override
+    public void setSymlink(Path p) {
+      realStatus.setSymlink(p);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return realStatus.equals(o);
+    }
+
+    @Override
+    public int hashCode() {
+      return realStatus.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return realStatus.toString();
+    }
+  }
+
+  @Override
+  public URI getUri() {
+    return nflyURI;
+  }
+
+  /**
+   * Category: READ.
+   *
+   * @param f the file name to open
+   * @param bufferSize the size of the buffer to be used.
+   * @return input stream according to nfly flags (closest, most recent)
+   * @throws IOException if no destination could be opened
+   * @throws FileNotFoundException iff all destinations generate this exception
+   */
+  @Override
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    // TODO proxy stream for reads
+    final List<IOException> ioExceptions =
+        new ArrayList<IOException>(nodes.length);
+    int numNotFounds = 0;
+    final MRNflyNode[] mrNodes = workSet();
+
+    // naively iterate until one can be opened
+    //
+    for (final MRNflyNode nflyNode : mrNodes) {
+      try {
+        if (nflyFlags.contains(NflyKey.repairOnRead)
+            || nflyFlags.contains(NflyKey.readMostRecent)) {
+          // calling file status to avoid pulling bytes prematurely
+          nflyNode.updateFileStatus(f);
+        } else {
+          return nflyNode.getFs().open(f, bufferSize);
+        }
+      } catch (FileNotFoundException fnfe) {
+        nflyNode.status = notFoundStatus(f);
+        numNotFounds++;
+        processThrowable(nflyNode, "open", fnfe, ioExceptions, f);
+      } catch (Throwable t) {
+        processThrowable(nflyNode, "open", t, ioExceptions, f);
+      }
+    }
+
+    if (nflyFlags.contains(NflyKey.readMostRecent)) {
+      // sort from most recent to least recent
+      Arrays.sort(mrNodes);
+    }
+
+    final FSDataInputStream fsdisAfterRepair = repairAndOpen(mrNodes, f,
+        bufferSize);
+
+    if (fsdisAfterRepair != null) {
+      return fsdisAfterRepair;
+    }
+
+    mayThrowFileNotFound(ioExceptions, numNotFounds);
+    throw MultipleIOException.createIOException(ioExceptions);
+  }
+
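+  // Builds a sentinel status whose negative length marks "not found";
+  // repairAndOpen() below tests this via status.getLen() < 0L.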
+  private static FileStatus notFoundStatus(Path f) {
+    return new FileStatus(-1, false, 0, 0, 0, f);
+  }
+
+  /**
+   * Iterates all available nodes in proximity order and attempts to repair
+   * all FileNotFound nodes.
+   *
+   * @param mrNodes work set copy of nodes
+   * @param f path to repair and open
+   * @param bufferSize buffer size for read RPC
+   * @return the closest/most recent replica stream AFTER repair
+   */
+  private FSDataInputStream repairAndOpen(MRNflyNode[] mrNodes, Path f,
+      int bufferSize) {
+    long maxMtime = 0L;
+    for (final MRNflyNode srcNode : mrNodes) {
+      if (srcNode.status == null  // not available
+          || srcNode.status.getLen() < 0L) { // not found
+        continue; // nothing to replicate from this node
+      }
+      if (srcNode.status.getModificationTime() > maxMtime) {
+        maxMtime = srcNode.status.getModificationTime();
+      }
+
+      // attempt to repair all notFound nodes with srcNode
+      //
+      for (final MRNflyNode dstNode : mrNodes) {
+        if (dstNode.status == null // not available
+            || srcNode.compareTo(dstNode) == 0) { // same mtime
+          continue;
+        }
+
+        try {
+          // the status path is absolute within the underlying mount, so
+          // re-root it at the requested path f
+          //
+          final FileStatus srcStatus = srcNode.cloneStatus();
+          srcStatus.setPath(f);
+          final Path tmpPath = getNflyTmpPath(f);
+          FileUtil.copy(srcNode.getFs(), srcStatus, dstNode.getFs(), tmpPath,
+              false,                // don't delete
+              true,                 // overwrite
+              getConf());
+          dstNode.getFs().delete(f, false);
+          if (dstNode.getFs().rename(tmpPath, f)) {
+            try {
+              dstNode.getFs().setTimes(f, srcNode.status.getModificationTime(),
+                  srcNode.status.getAccessTime());
+            } finally {
+              // save getFileStatus rpc
+              srcStatus.setPath(dstNode.getFs().makeQualified(f));
+              dstNode.status = srcStatus;
+            }
+          }
+        } catch (IOException ioe) {
+          // We could blame the source node here, but that would cost an
+          // extra RPC, so just rely on the loop below, which attempts an
+          // open anyhow.
+          //
+          LOG.info(f + " " + srcNode + "->" + dstNode + ": Failed to repair",
+                ioe);
+        }
+      }
+    }
+
+    // The mtime sort above reorders the nodes, so equally recent nodes may
+    // no longer appear in NetworkTopology order; re-sort the most recent
+    // candidates by network distance before opening.
+    //
+    if (maxMtime > 0) {
+      final List<MRNflyNode> mrList = new ArrayList<MRNflyNode>();
+      for (final MRNflyNode openNode : mrNodes) {
+        if (openNode.status != null && openNode.status.getLen() >= 0L) {
+          if (openNode.status.getModificationTime() == maxMtime) {
+            mrList.add(openNode);
+          }
+        }
+      }
+      // assert mrList.size > 0
+      final MRNflyNode[] readNodes = mrList.toArray(new MRNflyNode[0]);
+      topology.sortByDistance(myNode, readNodes, readNodes.length);
+      for (final MRNflyNode rNode : readNodes) {
+        try {
+          return rNode.getFs().open(f, bufferSize);
+        } catch (IOException e) {
+          LOG.info(f + ": Failed to open at " + rNode.getFs().getUri());
+        }
+      }
+    }
+    return null;
+  }
+
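+  // Surfaces FileNotFoundException only when every node reported it; a
+  // partial miss falls through to the caller's aggregate handling.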
+  private void mayThrowFileNotFound(List<IOException> ioExceptions,
+      int numNotFounds) throws FileNotFoundException {
+    if (numNotFounds == nodes.length) {
+      throw (FileNotFoundException)ioExceptions.get(nodes.length - 1);
+    }
+  }
+
+  // WRITE
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+    return new FSDataOutputStream(new NflyOutputStream(f, permission, overwrite,
+        bufferSize, replication, blockSize, progress), statistics);
+  }
+
+  // WRITE
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    // Appending consistently to all replicas is not supported; fail fast
+    // rather than returning null to the caller.
+    throw new UnsupportedOperationException("nfly does not support append");
+  }
+
+  // WRITE
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    final List<IOException> ioExceptions = new ArrayList<IOException>();
+    int numNotFounds = 0;
+    boolean succ = true;
+    for (final NflyNode nflyNode : nodes) {
+      try {
+        succ &= nflyNode.fs.rename(src, dst);
+      } catch (FileNotFoundException fnfe) {
+        numNotFounds++;
+        processThrowable(nflyNode, "rename", fnfe, ioExceptions, src, dst);
+      } catch (Throwable t) {
+        processThrowable(nflyNode, "rename", t, ioExceptions, src, dst);
+        succ = false;
+      }
+    }
+
+    mayThrowFileNotFound(ioExceptions, numNotFounds);
+
+    // if all destinations threw exceptions, throw them as one;
+    // otherwise return the aggregate result
+    //
+    if (ioExceptions.size() == nodes.length) {
+      throw MultipleIOException.createIOException(ioExceptions);
+    }
+
+    return succ;
+  }
+
+  // WRITE
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    final List<IOException> ioExceptions = new ArrayList<IOException>();
+    int numNotFounds = 0;
+    boolean succ = true;
+    for (final NflyNode nflyNode : nodes) {
+      try {
+        succ &= nflyNode.fs.delete(f);
+      } catch (FileNotFoundException fnfe) {
+        numNotFounds++;
+        processThrowable(nflyNode, "delete", fnfe, ioExceptions, f);
+      } catch (Throwable t) {
+        processThrowable(nflyNode, "delete", t, ioExceptions, f);
+        succ = false;
+      }
+    }
+    mayThrowFileNotFound(ioExceptions, numNotFounds);
+
+    // if all destinations threw exceptions, throw them as one;
+    // otherwise return the aggregate result
+    //
+    if (ioExceptions.size() == nodes.length) {
+      throw MultipleIOException.createIOException(ioExceptions);
+    }
+
+    return succ;
+  }
+
+  /**
+   * Returns the closest non-failing destination's result.
+   *
+   * @param f given path
+   * @return array of file statuses according to nfly modes
+   * @throws FileNotFoundException
+   * @throws IOException
+   */
+  @Override
+  public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+      IOException {
+    final List<IOException> ioExceptions =
+        new ArrayList<IOException>(nodes.length);
+
+    final MRNflyNode[] mrNodes = workSet();
+    if (nflyFlags.contains(NflyKey.readMostRecent)) {
+      int numNotFounds = 0;
+      for (final MRNflyNode nflyNode : mrNodes) {
+        try {
+          nflyNode.updateFileStatus(f);
+        } catch (FileNotFoundException fnfe) {
+          numNotFounds++;
+          processThrowable(nflyNode, "listStatus", fnfe, ioExceptions, f);
+        } catch (Throwable t) {
+          processThrowable(nflyNode, "listStatus", t, ioExceptions, f);
+        }
+      }
+      mayThrowFileNotFound(ioExceptions, numNotFounds);
+      Arrays.sort(mrNodes);
+    }
+
+    int numNotFounds = 0;
+    for (final MRNflyNode nflyNode : mrNodes) {
+      try {
+        final FileStatus[] realStats = nflyNode.getFs().listStatus(f);
+        final FileStatus[] nflyStats = new FileStatus[realStats.length];
+        for (int i = 0; i < realStats.length; i++) {
+          nflyStats[i] = new NflyStatus(nflyNode.getFs(), realStats[i]);
+        }
+        return nflyStats;
+      } catch (FileNotFoundException fnfe) {
+        numNotFounds++;
+        processThrowable(nflyNode, "listStatus", fnfe, ioExceptions, f);
+      } catch (Throwable t) {
+        processThrowable(nflyNode, "listStatus", t, ioExceptions, f);
+      }
+    }
+    mayThrowFileNotFound(ioExceptions, numNotFounds);
+    throw MultipleIOException.createIOException(ioExceptions);
+  }
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
+      throws FileNotFoundException, IOException {
+    // TODO important for splits
+    return super.listLocatedStatus(f);
+  }
+
+  @Override
+  public void setWorkingDirectory(Path newDir) {
+    for (final NflyNode nflyNode : nodes) {
+      nflyNode.fs.setWorkingDirectory(newDir);
+    }
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return nodes[0].fs.getWorkingDirectory(); // 0 is as good as any
+  }
+
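+  // Note: unlike rename/delete above, mkdirs does not aggregate per-node
+  // failures; an exception from any node propagates immediately.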
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    boolean succ = true;
+    for (final NflyNode nflyNode : nodes) {
+      succ &= nflyNode.fs.mkdirs(f, permission);
+    }
+    return succ;
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    // pick a status according to nfly flags (closest or most recent)
+    final List<IOException> ioExceptions =
+        new ArrayList<IOException>(nodes.length);
+    int numNotFounds = 0;
+    final MRNflyNode[] mrNodes = workSet();
+
+    long maxMtime = Long.MIN_VALUE;
+    int maxMtimeIdx = Integer.MIN_VALUE;
+
+    // naively iterate until one can be returned
+    //
+    for (int i = 0; i < mrNodes.length; i++) {
+      MRNflyNode nflyNode = mrNodes[i];
+      try {
+        nflyNode.updateFileStatus(f);
+        if (nflyFlags.contains(NflyKey.readMostRecent)) {
+          final long nflyTime = nflyNode.status.getModificationTime();
+          if (nflyTime > maxMtime) {
+            maxMtime = nflyTime;
+            maxMtimeIdx = i;
+          }
+        } else {
+          return nflyNode.nflyStatus();
+        }
+      } catch (FileNotFoundException fnfe) {
+        numNotFounds++;
+        processThrowable(nflyNode, "getFileStatus", fnfe, ioExceptions, f);
+      } catch (Throwable t) {
+        processThrowable(nflyNode, "getFileStatus", t, ioExceptions, f);
+      }
+    }
+
+    if (maxMtimeIdx >= 0) {
+      return mrNodes[maxMtimeIdx].nflyStatus();
+    }
+
+    mayThrowFileNotFound(ioExceptions, numNotFounds);
+    throw MultipleIOException.createIOException(ioExceptions);
+  }
+
+  private static void processThrowable(NflyNode nflyNode, String op,
+      Throwable t, List<IOException> ioExceptions,
+      Path... f) {
+    final String errMsg = Arrays.toString(f)
+        + ": failed to " + op + " " + nflyNode.fs.getUri();
+    final IOException ioex;
+    if (t instanceof FileNotFoundException) {
+      ioex = new FileNotFoundException(errMsg);
+      ioex.initCause(t);
+    } else {
+      ioex = new IOException(errMsg, t);
+    }
+
+    if (ioExceptions != null) {
+      ioExceptions.add(ioex);
+    }
+  }
+
+  /**
+   * Initializes an nfly mountpoint in viewfs.
+   *
+   * @param uris destinations to replicate writes to
+   * @param conf file system configuration
+   * @param settings comma-separated list of k=v pairs.
+   * @return an Nfly filesystem
+   * @throws IOException
+   */
+  static FileSystem createFileSystem(URI[] uris, Configuration conf,
+      String settings) throws IOException {
+    // assert settings != null
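+    // Example settings string (comma-separated k=v pairs, as passed via
+    // ConfigUtil.addLinkNfly): "minReplication=2,readMostRecent=true"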
+    int minRepl = DEFAULT_MIN_REPLICATION;
+    EnumSet<NflyKey> nflyFlags = EnumSet.noneOf(NflyKey.class);
+    final String[] kvPairs = StringUtils.split(settings);
+    for (String kv : kvPairs) {
+      final String[] kvPair = StringUtils.split(kv, '=');
+      if (kvPair.length != 2) {
+        throw new IllegalArgumentException(kv);
+      }
+      NflyKey nflyKey = NflyKey.valueOf(kvPair[0]);
+      switch (nflyKey) {
+      case minReplication:
+        minRepl = Integer.parseInt(kvPair[1]);
+        break;
+      case repairOnRead:
+      case readMostRecent:
+        if (Boolean.valueOf(kvPair[1])) {
+          nflyFlags.add(nflyKey);
+        }
+        break;
+      default:
+        throw new IllegalArgumentException(nflyKey + ": Infeasible");
+      }
+    }
+    return new NflyFSystem(uris, conf, minRepl, nflyFlags);
+  }
+}
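
For reference, a minimal usage sketch of the new nfly mount type. It is
illustrative only: the URIs are made up, and it relies on the
ConfigUtil.addLinkNfly(Configuration, String, URI...) overload exercised
by the tests further below.

    final Configuration conf = new Configuration();
    // replicate /data across two namespaces via the default mount table
    ConfigUtil.addLinkNfly(conf, "/data",
        URI.create("hdfs://nn1/data"), URI.create("hdfs://nn2/data"));
    final FileSystem fs = FileSystem.get(URI.create("viewfs:///"), conf);
    // creates, renames and deletes under /data now fan out to both
    // namespaces, subject to the minReplication setting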
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
index 0a3b65d..9726100 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
@@ -60,7 +60,6 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
@@ -288,18 +287,15 @@ public class ViewFileSystem extends FileSystem {
         }
 
         @Override
-        protected
-        FileSystem getTargetFileSystem(final INodeDir<FileSystem> dir)
+        protected FileSystem getTargetFileSystem(final INodeDir<FileSystem> dir)
           throws URISyntaxException {
           return new InternalDirOfViewFs(dir, creationTime, ugi, myUri, config);
         }
 
         @Override
-        protected
-        FileSystem getTargetFileSystem(URI[] mergeFsURIList)
-            throws URISyntaxException, UnsupportedFileSystemException {
-          throw new UnsupportedFileSystemException("mergefs not implemented");
-          // return MergeFs.createMergeFs(mergeFsURIList, config);
+        protected FileSystem getTargetFileSystem(final String settings,
+            final URI[] uris) throws URISyntaxException, IOException {
+          return NflyFSystem.createFileSystem(uris, config, settings);
         }
       };
 
@@ -548,8 +544,13 @@ public class ViewFileSystem extends FileSystem {
 
   private Path getChrootedPath(InodeTree.ResolveResult<FileSystem> res,
       FileStatus status, Path f) throws IOException {
-    final String suffix = ((ChRootedFileSystem)res.targetFileSystem)
-        .stripOutRoot(status.getPath());
+    final String suffix;
+    if (res.targetFileSystem instanceof ChRootedFileSystem) {
+      suffix = ((ChRootedFileSystem)res.targetFileSystem)
+          .stripOutRoot(status.getPath());
+    } else { // nfly
+      suffix = ((NflyFSystem.NflyStatus)status).stripRoot();
+    }
     return this.makeQualified(
         suffix.length() == 0 ? f : new Path(res.resolvedPath, suffix));
   }
@@ -601,10 +602,15 @@ public class ViewFileSystem extends FileSystem {
     verifyRenameStrategy(srcUri, dstUri,
         resSrc.targetFileSystem == resDst.targetFileSystem, renameStrategy);
 
-    ChRootedFileSystem srcFS = (ChRootedFileSystem) resSrc.targetFileSystem;
-    ChRootedFileSystem dstFS = (ChRootedFileSystem) resDst.targetFileSystem;
-    return srcFS.getMyFs().rename(srcFS.fullPath(resSrc.remainingPath),
-        dstFS.fullPath(resDst.remainingPath));
+    if (resSrc.targetFileSystem instanceof ChRootedFileSystem &&
+        resDst.targetFileSystem instanceof ChRootedFileSystem) {
+      ChRootedFileSystem srcFS = (ChRootedFileSystem) resSrc.targetFileSystem;
+      ChRootedFileSystem dstFS = (ChRootedFileSystem) resDst.targetFileSystem;
+      return srcFS.getMyFs().rename(srcFS.fullPath(resSrc.remainingPath),
+          dstFS.fullPath(resDst.remainingPath));
+    } else {
+      return resSrc.targetFileSystem.rename(resSrc.remainingPath,
+          resDst.remainingPath);
+    }
   }
 
   static void verifyRenameStrategy(URI srcUri, URI dstUri,
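
For an nfly mount, both source and destination resolve to the same
NflyFSystem instance, so the else branch above delegates to
NflyFSystem#rename (shown earlier), which fans the operation out to every
replica. A sketch, assuming an nfly mount at /nflyroot as in the tests
below:

    final FileSystem vfs = FileSystem.get(URI.create("viewfs:///"), conf);
    vfs.rename(new Path("/nflyroot/a.txt"), new Path("/nflyroot/b.txt"));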
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
index 1440a4b..26f3a15 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
@@ -246,15 +246,14 @@ public class ViewFs extends AbstractFileSystem {
       }
 
       @Override
-      protected
-      AbstractFileSystem getTargetFileSystem(
+      protected AbstractFileSystem getTargetFileSystem(
           final INodeDir<AbstractFileSystem> dir) throws URISyntaxException {
         return new InternalDirOfViewFs(dir, creationTime, ugi, getUri());
       }
 
       @Override
-      protected
-      AbstractFileSystem getTargetFileSystem(URI[] mergeFsURIList)
+      protected AbstractFileSystem getTargetFileSystem(final String settings,
+          final URI[] mergeFsURIList)
           throws URISyntaxException, UnsupportedFileSystemException {
         throw new UnsupportedFileSystemException("mergefs not implemented yet");
         // return MergeFs.createMergeFs(mergeFsURIList, config);
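
Note that the AbstractFileSystem side (ViewFs) still rejects multi-URI
links, so nfly mounts remain reachable only through the FileSystem API.
A sketch of the expected behavior (assumption, not verified here):

    // FileSystem-based access works:
    final FileSystem fs = FileSystem.get(URI.create("viewfs:///"), conf);
    // FileContext/ViewFs-based access over a mount table containing an
    // nfly link is expected to fail while building the InodeTree:
    //   FileContext.getFileContext(URI.create("viewfs:///"), conf)
    //   -> UnsupportedFileSystemException("mergefs not implemented yet")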
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java
index 4943792..808d8b0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java
@@ -18,13 +18,25 @@
 package org.apache.hadoop.fs.viewfs;
 
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
 
 import org.junit.After;
 import org.junit.Before;
-
+import org.junit.Test;
 
 
 /**
@@ -37,6 +49,8 @@ import org.junit.Before;
  */
 
 public class TestViewFileSystemLocalFileSystem extends ViewFileSystemBaseTest {
+  private static final Log LOG =
+      LogFactory.getLog(TestViewFileSystemLocalFileSystem.class);
 
   @Override
   @Before
@@ -47,6 +61,65 @@ public class TestViewFileSystemLocalFileSystem extends ViewFileSystemBaseTest {
     
   }
 
+  @Test
+  public void testNflyWriteSimple() throws IOException {
+    LOG.info("Starting testNflyWriteSimple");
+    final URI[] testUris = new URI[] {
+        URI.create(targetTestRoot + "/nfwd1"),
+        URI.create(targetTestRoot + "/nfwd2")
+    };
+    final String testFileName = "test.txt";
+    final Configuration testConf = new Configuration(conf);
+    final String testString = "Hello Nfly!";
+    final Path nflyRoot = new Path("/nflyroot");
+    ConfigUtil.addLinkNfly(testConf, nflyRoot.toString(), testUris);
+    final FileSystem nfly = FileSystem.get(URI.create("viewfs:///"), testConf);
+
+    final FSDataOutputStream fsDos = nfly.create(
+        new Path(nflyRoot, testFileName));
+    try {
+      fsDos.writeUTF(testString);
+    } finally {
+      fsDos.close();
+    }
+
+    // exercise listStatus through the nfly mount (result not asserted here)
+    FileStatus[] statuses = nfly.listStatus(nflyRoot);
+
+    FileSystem lfs = FileSystem.getLocal(testConf);
+    for (final URI testUri : testUris) {
+      final Path testFile = new Path(new Path(testUri), testFileName);
+      assertTrue(testFile + " should exist!", lfs.exists(testFile));
+      final FSDataInputStream fsdis = lfs.open(testFile);
+      try {
+        assertEquals("Wrong file content", testString, fsdis.readUTF());
+      } finally {
+        fsdis.close();
+      }
+    }
+  }
+
+  @Test
+  public void testNflyInvalidMinReplication() throws Exception {
+    LOG.info("Starting testNflyInvalidMinReplication");
+    final URI[] testUris = new URI[] {
+        URI.create(targetTestRoot + "/nfwd1"),
+        URI.create(targetTestRoot + "/nfwd2")
+    };
+
+    final Configuration conf = new Configuration();
+    ConfigUtil.addLinkNfly(conf, "mt", "/nflyroot", "minReplication=4",
+        testUris);
+    try {
+      FileSystem.get(URI.create("viewfs://mt/"), conf);
+      fail("Expected bad minReplication exception.");
+    } catch (IOException ioe) {
+      assertTrue("No minReplication message",
+          ioe.getMessage().contains("Minimum replication"));
+    }
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
index 2c0a6e1..1ee86ab 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
@@ -25,7 +25,6 @@ import java.net.URISyntaxException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.junit.Test;
 
 public class TestViewFsConfig {
@@ -49,17 +48,16 @@ public class TestViewFsConfig {
       }
 
       @Override
-      protected Foo getTargetFileSystem(
-          org.apache.hadoop.fs.viewfs.InodeTree.INodeDir<Foo> dir)
-          throws URISyntaxException {
+      protected Foo getTargetFileSystem(final INodeDir<Foo> dir) {
         return null;
       }
 
       @Override
-      protected Foo getTargetFileSystem(URI[] mergeFsURIList)
-          throws URISyntaxException, UnsupportedFileSystemException {
+      protected Foo getTargetFileSystem(final String settings,
+          final URI[] mergeFsURIList) {
         return null;
       }
+
     };
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
index b8da0c5..83c2a58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.fs.viewfs;
 
-
 import java.io.IOException;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 
@@ -28,6 +28,8 @@ import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.FsConstants;
@@ -37,15 +39,25 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.*;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestViewFileSystemHdfs.class);
 
   private static MiniDFSCluster cluster;
   private static Path defaultWorkingDirectory;
@@ -231,4 +243,129 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
     assertEquals("The owner did not match ", owner, userUgi.getShortUserName());
     otherfs.delete(user1Path, false);
   }
+
+  @Test
+  public void testNflyClosestRepair() throws Exception {
+    testNflyRepair(NflyFSystem.NflyKey.repairOnRead);
+  }
+
+  @Test
+  public void testNflyMostRecentRepair() throws Exception {
+    testNflyRepair(NflyFSystem.NflyKey.readMostRecent);
+  }
+
+  private void testNflyRepair(NflyFSystem.NflyKey repairKey)
+      throws Exception {
+    LOG.info("Starting testNflyWriteSimpleFailover");
+    final URI uri1 = targetTestRoot.toUri();
+    final URI uri2 = targetTestRoot2.toUri();
+    final URI[] testUris = new URI[] {
+        new URI(uri1.getScheme(), uri1.getAuthority(), "/", null, null),
+        new URI(uri2.getScheme(), uri2.getAuthority(), "/", null, null)
+    };
+
+    final Configuration testConf = new Configuration(conf);
+    testConf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+
+    final String testString = "Hello Nfly!";
+    final Path nflyRoot = new Path("/nflyroot");
+
+    ConfigUtil.addLinkNfly(testConf,
+        Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE,
+        nflyRoot.toString(),
+        "minReplication=2," + repairKey + "=true", testUris);
+
+    final FileSystem nfly = FileSystem.get(URI.create("viewfs:///"), testConf);
+    // wd = /nflyroot/user/<user>
+    nfly.setWorkingDirectory(new Path(nflyRoot
+        + nfly.getWorkingDirectory().toUri().getPath()));
+
+    // 1. test mkdirs
+    final Path testDir = new Path("testdir1/sub1/sub3");
+    final Path testDirTmp = new Path("testdir1/sub1/sub3_temp");
+    assertTrue(testDir + ": Failed to create!", nfly.mkdirs(testDir));
+
+    // test renames
+    assertTrue(nfly.rename(testDir, testDirTmp));
+    assertTrue(nfly.rename(testDirTmp, testDir));
+
+    for (final URI testUri : testUris) {
+      final FileSystem fs = FileSystem.get(testUri, testConf);
+      assertTrue(testDir + " should exist!", fs.exists(testDir));
+    }
+
+    // 2. test write
+    final Path testFile = new Path("test.txt");
+    final FSDataOutputStream fsDos = nfly.create(testFile);
+    try {
+      fsDos.writeUTF(testString);
+    } finally {
+      fsDos.close();
+    }
+
+    for (final URI testUri : testUris) {
+      final FileSystem fs = FileSystem.get(testUri, testConf);
+      final FSDataInputStream fsdis = fs.open(testFile);
+      try {
+        assertEquals("Wrong file content", testString, fsdis.readUTF());
+      } finally {
+        fsdis.close();
+      }
+    }
+
+    // 3. test reads when one unavailable
+    //
+    // bring one NN down and read through nfly should still work
+    //
+    for (int i = 0; i < cluster.getNumNameNodes(); i++) {
+      cluster.shutdownNameNode(i);
+      FSDataInputStream fsDis = null;
+      try {
+        fsDis = nfly.open(testFile);
+        assertEquals("Wrong file content", testString, fsDis.readUTF());
+      } finally {
+        IOUtils.cleanupWithLogger(LOG, fsDis);
+        cluster.restartNameNode(i);
+      }
+    }
+
+    // both nodes are up again, test repair
+    final FileSystem fs1 = FileSystem.get(testUris[0], conf);
+    assertTrue(fs1.delete(testFile, false));
+    assertFalse(fs1.exists(testFile));
+    FSDataInputStream fsDis = null;
+    try {
+      fsDis = nfly.open(testFile);
+      assertEquals("Wrong file content", testString, fsDis.readUTF());
+      assertTrue(fs1.exists(testFile));
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, fsDis);
+    }
+
+    // test most recent repair
+    if (repairKey == NflyFSystem.NflyKey.readMostRecent) {
+      final FileSystem fs2 = FileSystem.get(testUris[0], conf);
+      final long expectedMtime = fs2.getFileStatus(testFile)
+          .getModificationTime();
+
+      for (final URI testUri : testUris) {
+        final FileSystem fs = FileSystem.get(testUri, conf);
+        fs.setTimes(testFile, 1L, 1L);
+        assertEquals(testUri + "Set mtime failed!", 1L,
+            fs.getFileStatus(testFile).getModificationTime());
+        assertEquals("nfly file status wrong", expectedMtime,
+            nfly.getFileStatus(testFile).getModificationTime());
+        FSDataInputStream fsDis2 = null;
+        try {
+          fsDis2 = nfly.open(testFile);
+          assertEquals("Wrong file content", testString, fsDis2.readUTF());
+          // repair is done, now trying via normal fs
+          //
+          assertEquals("Repair most recent failed!", expectedMtime,
+              fs.getFileStatus(testFile).getModificationTime());
+        } finally {
+          IOUtils.cleanupWithLogger(LOG, fsDis2);
+        }
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org