Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/09/11 06:48:27 UTC

[02/50] [abbrv] hadoop git commit: HADOOP-12077. Provide a multi-URI replication Inode for ViewFs. Contributed by Gera Shegalov

HADOOP-12077. Provide a multi-URI replication Inode for ViewFs. Contributed by Gera Shegalov


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1f3bc63e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1f3bc63e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1f3bc63e

Branch: refs/heads/YARN-5972
Commit: 1f3bc63e6772be81bc9a6a7d93ed81d2a9e066c0
Parents: 63720ef
Author: Chris Douglas <cd...@apache.org>
Authored: Tue Sep 5 23:30:18 2017 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Tue Sep 5 23:51:51 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/viewfs/ConfigUtil.java |  27 +
 .../org/apache/hadoop/fs/viewfs/Constants.java  |   8 +-
 .../org/apache/hadoop/fs/viewfs/InodeTree.java  |  64 +-
 .../apache/hadoop/fs/viewfs/NflyFSystem.java    | 951 +++++++++++++++++++
 .../apache/hadoop/fs/viewfs/ViewFileSystem.java |  37 +-
 .../org/apache/hadoop/fs/viewfs/ViewFs.java     |  10 +-
 .../TestViewFileSystemLocalFileSystem.java      |  77 +-
 .../hadoop/fs/viewfs/TestViewFsConfig.java      |  13 +-
 .../fs/viewfs/TestViewFileSystemHdfs.java       | 151 ++-
 9 files changed, 1275 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
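
For orientation: this patch teaches viewfs to recognize mount entries of the form
fs.viewfs.mounttable.<table>.linkNfly.<settings>.<src>, whose value is a
comma-separated list of replica URIs. A minimal sketch of such an entry set by
hand (the nn1/nn2 authorities and the /data path are made up; the property
prefix and the "default" table name follow the existing viewfs Constants):

    import org.apache.hadoop.conf.Configuration;

    public class NflyConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // linkNfly.<settings>.<src> -> comma-separated replica URIs
        conf.set("fs.viewfs.mounttable.default"
                + ".linkNfly.minReplication=2,readMostRecent=true./data",
            "hdfs://nn1/data,hdfs://nn2/data");
      }
    }

ConfigUtil.addLinkNfly, added below, builds the same key programmatically.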


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
index 6900df2..a5fc62e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.viewfs;
 import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Utilities for config variables of the viewFs See {@link ViewFs}
@@ -69,6 +70,32 @@ public class ConfigUtil {
   }
   
   /**
+   * Add a multi-URI (nfly) link to the mount table, so that writes are
+   * replicated to all target file systems.
+   *
+   * @param conf the configuration to add the mount point to
+   * @param mountTableName the name of the mount table
+   * @param src the mount point source path
+   * @param settings comma-separated nfly options; null picks the default
+   * @param targets the replica target URIs
+   */
+  public static void addLinkNfly(Configuration conf, String mountTableName,
+      String src, String settings, final URI... targets) {
+
+    settings = settings == null
+        ? "minReplication=2,repairOnRead=true"
+        : settings;
+
+    conf.set(getConfigViewFsPrefix(mountTableName) + "." +
+            Constants.CONFIG_VIEWFS_LINK_NFLY + "." + settings + "." + src,
+        StringUtils.uriToString(targets));
+  }
+
+  public static void addLinkNfly(final Configuration conf, final String src,
+      final URI... targets) {
+    addLinkNfly(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, src, null,
+        targets);
+  }
+
+  /**
    * Add config variable for homedir for default mount table
    * @param conf - add to this conf
    * @param homedir - the home dir path starting with slash
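
A quick usage sketch of the helper added above (the URIs and the "mt" table
name are hypothetical; passing null settings falls back to the default
"minReplication=2,repairOnRead=true"):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.viewfs.ConfigUtil;

    public class AddLinkNflySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // default mount table, default settings
        ConfigUtil.addLinkNfly(conf, "/data",
            URI.create("hdfs://nn1/data"), URI.create("hdfs://nn2/data"));
        // explicit mount table and explicit settings
        ConfigUtil.addLinkNfly(conf, "mt", "/logs",
            "minReplication=2,readMostRecent=true",
            URI.create("hdfs://nn1/logs"), URI.create("hdfs://nn2/logs"));
      }
    }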

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
index 9882a8e..1a07c10 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
@@ -57,7 +57,13 @@ public interface Constants {
    * Config variable for specifying a merge link
    */
   public static final String CONFIG_VIEWFS_LINK_MERGE = "linkMerge";
-  
+
+  /**
+   * Config variable for specifying an nfly link. Nfly writes to multiple
+   * locations, and allows reads from the closest one.
+   */
+  String CONFIG_VIEWFS_LINK_NFLY = "linkNfly";
+
   /**
    * Config variable for specifying a merge of the root of the mount-table
    *  with the root of another file system. 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
index c62d5cc..665c9c9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
@@ -133,6 +133,12 @@ abstract class InodeTree<T> {
     }
   }
 
+  enum LinkType {
+    SINGLE,
+    MERGE,
+    NFLY
+  }
+
   /**
    * An internal class to represent a mount link.
    * A mount link can be single dir link or a merge dir link.
@@ -146,19 +152,17 @@ abstract class InodeTree<T> {
    * is changed later it is then ignored (a dir with null entries)
    */
   static class INodeLink<T> extends INode<T> {
-    final boolean isMergeLink; // true if MergeLink
     final URI[] targetDirLinkList;
     final T targetFileSystem;   // file system object created from the link.
 
     /**
-     * Construct a mergeLink.
+     * Construct a mergeLink or nfly.
      */
     INodeLink(final String pathToNode, final UserGroupInformation aUgi,
         final T targetMergeFs, final URI[] aTargetDirLinkList) {
       super(pathToNode, aUgi);
       targetFileSystem = targetMergeFs;
       targetDirLinkList = aTargetDirLinkList;
-      isMergeLink = true;
     }
 
     /**
@@ -170,7 +174,6 @@ abstract class InodeTree<T> {
       targetFileSystem = targetFs;
       targetDirLinkList = new URI[1];
       targetDirLinkList[0] = aTargetDirLink;
-      isMergeLink = false;
     }
 
     /**
@@ -188,7 +191,9 @@ abstract class InodeTree<T> {
   }
 
   private void createLink(final String src, final String target,
-      final boolean isLinkMerge, final UserGroupInformation aUgi)
+      final LinkType linkType, final String settings,
+      final UserGroupInformation aUgi,
+      final Configuration config)
       throws URISyntaxException, IOException,
       FileAlreadyExistsException, UnsupportedFileSystemException {
     // Validate that src is valid absolute path
@@ -235,18 +240,20 @@ abstract class InodeTree<T> {
     final INodeLink<T> newLink;
     final String fullPath = curInode.fullPath + (curInode == root ? "" : "/")
         + iPath;
-    if (isLinkMerge) { // Target is list of URIs
-      String[] targetsList = StringUtils.getStrings(target);
-      URI[] targetsListURI = new URI[targetsList.length];
-      int k = 0;
-      for (String itarget : targetsList) {
-        targetsListURI[k++] = new URI(itarget);
-      }
-      newLink = new INodeLink<T>(fullPath, aUgi,
-          getTargetFileSystem(targetsListURI), targetsListURI);
-    } else {
+    switch (linkType) {
+    case SINGLE:
       newLink = new INodeLink<T>(fullPath, aUgi,
           getTargetFileSystem(new URI(target)), new URI(target));
+      break;
+    case MERGE:
+    case NFLY:
+      final URI[] targetUris = StringUtils.stringToURI(
+          StringUtils.getStrings(target));
+      newLink = new INodeLink<T>(fullPath, aUgi,
+            getTargetFileSystem(settings, targetUris), targetUris);
+      break;
+    default:
+      throw new IllegalArgumentException(linkType + ": Infeasible linkType");
     }
     curInode.addLink(iPath, newLink);
     mountPoints.add(new MountPoint<T>(src, newLink));
@@ -257,14 +264,14 @@ abstract class InodeTree<T> {
    * 3 abstract methods.
    * @throws IOException
    */
-  protected abstract T getTargetFileSystem(final URI uri)
+  protected abstract T getTargetFileSystem(URI uri)
       throws UnsupportedFileSystemException, URISyntaxException, IOException;
 
-  protected abstract T getTargetFileSystem(final INodeDir<T> dir)
+  protected abstract T getTargetFileSystem(INodeDir<T> dir)
       throws URISyntaxException;
 
-  protected abstract T getTargetFileSystem(final URI[] mergeFsURIList)
-      throws UnsupportedFileSystemException, URISyntaxException;
+  protected abstract T getTargetFileSystem(String settings, URI[] mergeFsURIs)
+      throws UnsupportedFileSystemException, URISyntaxException, IOException;
 
   /**
    * Create Inode Tree from the specified mount-table specified in Config
@@ -298,8 +305,9 @@ abstract class InodeTree<T> {
       final String key = si.getKey();
       if (key.startsWith(mtPrefix)) {
         gotMountTableEntry = true;
-        boolean isMergeLink = false;
+        LinkType linkType = LinkType.SINGLE;
         String src = key.substring(mtPrefix.length());
+        String settings = null;
         if (src.startsWith(linkPrefix)) {
           src = src.substring(linkPrefix.length());
           if (src.equals(SlashPath.toString())) {
@@ -309,8 +317,20 @@ abstract class InodeTree<T> {
                 + "supported yet.");
           }
         } else if (src.startsWith(linkMergePrefix)) { // A merge link
-          isMergeLink = true;
+          linkType = LinkType.MERGE;
           src = src.substring(linkMergePrefix.length());
+        } else if (src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY)) {
+          // prefix.settings.src
+          src = src.substring(Constants.CONFIG_VIEWFS_LINK_NFLY.length() + 1);
+          // settings.src
+          settings = src.substring(0, src.indexOf('.'));
+          // settings
+
+          // settings.src
+          src = src.substring(settings.length() + 1);
+          // src
+
+          linkType = LinkType.NFLY;
         } else if (src.startsWith(Constants.CONFIG_VIEWFS_HOMEDIR)) {
           // ignore - we set home dir from config
           continue;
@@ -319,7 +339,7 @@ abstract class InodeTree<T> {
               "Mount table in config: " + src);
         }
         final String target = si.getValue(); // link or merge link
-        createLink(src, target, isMergeLink, ugi);
+        createLink(src, target, linkType, settings, ugi, config);
       }
     }
     if (!gotMountTableEntry) {
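
To make the substring steps above concrete, here is how a full linkNfly key
suffix decomposes, using a made-up mount point (note the settings segment may
not contain a '.'):

    public class NflyKeyParseSketch {
      public static void main(String[] args) {
        // suffix after the mount-table prefix has been stripped:
        String src = "linkNfly.minReplication=3,readMostRecent=true./data";
        src = src.substring("linkNfly".length() + 1);
        // src = "minReplication=3,readMostRecent=true./data"
        String settings = src.substring(0, src.indexOf('.'));
        // settings = "minReplication=3,readMostRecent=true"
        src = src.substring(settings.length() + 1);
        // src = "/data"
        System.out.println(settings + " -> " + src);
      }
    }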

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java
new file mode 100644
index 0000000..53966b8
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java
@@ -0,0 +1,951 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Nfly is a multi filesystem mount point.
+ */
+@Private
+final class NflyFSystem extends FileSystem {
+  private static final Log LOG = LogFactory.getLog(NflyFSystem.class);
+  private static final String NFLY_TMP_PREFIX = "_nfly_tmp_";
+
+  enum NflyKey {
+    // minimum replication; if the local filesystem is included,
+    // +1 is recommended
+    minReplication,
+
+    // forces a check of all replicas and fetches the one with the most
+    // recent timestamp
+    //
+    readMostRecent,
+
+    // create missing replicas on read, from far to near, including local
+    repairOnRead
+  }
+
+  private static final int DEFAULT_MIN_REPLICATION = 2;
+  private static final URI nflyURI = URI.create("nfly:///");
+
+  private final NflyNode[] nodes;
+  private final int minReplication;
+  private final EnumSet<NflyKey> nflyFlags;
+  private final Node myNode;
+  private final NetworkTopology topology;
+
+  /**
+   * A URI's authority is used as an approximation of the node's distance
+   * from the client. This is sufficient at the datacenter level, but not
+   * precise, because worker nodes can be closer.
+   */
+  private static class NflyNode extends NodeBase {
+    private final ChRootedFileSystem fs;
+    NflyNode(String hostName, String rackName, URI uri,
+        Configuration conf) throws IOException {
+      this(hostName, rackName, new ChRootedFileSystem(uri, conf));
+    }
+
+    NflyNode(String hostName, String rackName, ChRootedFileSystem fs) {
+      super(hostName, rackName);
+      this.fs = fs;
+    }
+
+    ChRootedFileSystem getFs() {
+      return fs;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      // satisfy findbugs
+      return super.equals(o);
+    }
+
+    @Override
+    public int hashCode() {
+      // satisfy findbugs
+      return super.hashCode();
+    }
+
+  }
+
+  private static final class MRNflyNode
+      extends NflyNode implements Comparable<MRNflyNode> {
+
+    private FileStatus status;
+
+    private MRNflyNode(NflyNode n) {
+      super(n.getName(), n.getNetworkLocation(), n.fs);
+    }
+
+    private void updateFileStatus(Path f) throws IOException {
+      final FileStatus tmpStatus = getFs().getFileStatus(f);
+      status = tmpStatus == null
+          ? notFoundStatus(f)
+          : tmpStatus;
+    }
+
+    // TODO allow configurable error margin for FileSystems with different
+    // timestamp precisions
+    @Override
+    public int compareTo(MRNflyNode other) {
+      if (status == null) {
+        return other.status == null ? 0 : 1; // move non-null towards head
+      } else if (other.status == null) {
+        return -1; // move this towards head
+      } else {
+        final long mtime = status.getModificationTime();
+        final long their = other.status.getModificationTime();
+        return Long.compare(their, mtime); // move more recent towards head
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof MRNflyNode)) {
+        return false;
+      }
+      MRNflyNode other = (MRNflyNode) o;
+      return 0 == compareTo(other);
+    }
+
+    @Override
+    public int hashCode() {
+      // satisfy findbugs
+      return super.hashCode();
+    }
+
+    private FileStatus nflyStatus() throws IOException {
+      return new NflyStatus(getFs(), status);
+    }
+
+    private FileStatus cloneStatus() throws IOException {
+      return new FileStatus(status.getLen(),
+          status.isDirectory(),
+          status.getReplication(),
+          status.getBlockSize(),
+          status.getModificationTime(),
+          status.getAccessTime(),
+          null, null, null,
+          status.isSymlink() ? status.getSymlink() : null,
+          status.getPath());
+    }
+  }
+
+  private MRNflyNode[] workSet() {
+    final MRNflyNode[] res = new MRNflyNode[nodes.length];
+    for (int i = 0; i < res.length; i++) {
+      res[i] = new MRNflyNode(nodes[i]);
+    }
+    return res;
+  }
+
+
+  /**
+   * Utility to replace null with DEFAULT_RACK.
+   *
+   * @param rackString rack value, can be null
+   * @return non-null rack string
+   */
+  private static String getRack(String rackString) {
+    return rackString == null ? NetworkTopology.DEFAULT_RACK : rackString;
+  }
+
+  /**
+   * Creates a new Nfly instance.
+   *
+   * @param uris the list of uris in the mount point
+   * @param conf configuration object
+   * @param minReplication minimum copies to commit a write op
+   * @param nflyFlags modes such as readMostRecent
+   * @throws IOException
+   */
+  private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
+      EnumSet<NflyKey> nflyFlags) throws IOException {
+    if (uris.length < minReplication) {
+      throw new IOException(uris.length + " < " + minReplication
+          + ": #destinations < Minimum replication");
+    }
+    setConf(conf);
+    final String localHostName = InetAddress.getLocalHost().getHostName();
+
+    // build a list for topology resolution
+    final List<String> hostStrings = new ArrayList<String>(uris.length + 1);
+    for (URI uri : uris) {
+      final String uriHost = uri.getHost();
+      // assume local file system or another closest filesystem if no authority
+      hostStrings.add(uriHost == null ? localHostName : uriHost);
+    }
+    // resolve the client node
+    hostStrings.add(localHostName);
+
+    final DNSToSwitchMapping tmpDns = ReflectionUtils.newInstance(conf.getClass(
+        CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+        ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
+
+    // this is an ArrayList
+    final List<String> rackStrings = tmpDns.resolve(hostStrings);
+    nodes = new NflyNode[uris.length];
+    final Iterator<String> rackIter = rackStrings.iterator();
+    for (int i = 0; i < nodes.length; i++) {
+      nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(), uris[i],
+          conf);
+    }
+    // sort all the URIs by distance from myNode; the local file system
+    // will automatically be the first one
+    //
+    myNode = new NodeBase(localHostName, getRack(rackIter.next()));
+    topology = NetworkTopology.getInstance(conf);
+    topology.sortByDistance(myNode, nodes, nodes.length);
+
+    this.minReplication = minReplication;
+    this.nflyFlags = nflyFlags;
+    statistics = getStatistics(nflyURI.getScheme(), getClass());
+  }
+
+  /**
+   * Transactional output stream. When creating path /dir/file
+   * 1) create invisible /real/dir_i/_nfly_tmp_file
+   * 2) when more than min replication was written, write is committed by
+   *   renaming all successfully written files to /real/dir_i/file
+   */
+  private final class NflyOutputStream extends OutputStream {
+    // actual path
+    private final Path nflyPath;
+    // tmp path before commit
+    private final Path tmpPath;
+    // broadcast set
+    private final FSDataOutputStream[] outputStreams;
+    // status set: 1 working, 0 problem
+    private final BitSet opSet;
+    private final boolean useOverwrite;
+
+    private NflyOutputStream(Path f, FsPermission permission, boolean overwrite,
+        int bufferSize, short replication, long blockSize,
+        Progressable progress) throws IOException {
+      nflyPath = f;
+      tmpPath = getNflyTmpPath(f);
+      outputStreams = new FSDataOutputStream[nodes.length];
+      for (int i = 0; i < outputStreams.length; i++) {
+        outputStreams[i] = nodes[i].fs.create(tmpPath, permission, true,
+            bufferSize, replication, blockSize, progress);
+      }
+      opSet = new BitSet(outputStreams.length);
+      opSet.set(0, outputStreams.length);
+      useOverwrite = false;
+    }
+
+    //
+    // TODO consider how to clean up and throw an exception early when the
+    // number of set bits drops below min replication
+    //
+
+    private void mayThrow(List<IOException> ioExceptions) throws IOException {
+      final IOException ioe = MultipleIOException
+          .createIOException(ioExceptions);
+      if (opSet.cardinality() < minReplication) {
+        throw ioe;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Exceptions occurred: " + ioe);
+        }
+      }
+    }
+
+
+    @Override
+    public void write(int d) throws IOException {
+      final List<IOException> ioExceptions = new ArrayList<IOException>();
+      for (int i = opSet.nextSetBit(0);
+           i >= 0;
+           i = opSet.nextSetBit(i + 1)) {
+        try {
+          outputStreams[i].write(d);
+        } catch (Throwable t) {
+          osException(i, "write", t, ioExceptions);
+        }
+      }
+      mayThrow(ioExceptions);
+    }
+
+    private void osException(int i, String op, Throwable t,
+        List<IOException> ioExceptions) {
+      opSet.clear(i);
+      processThrowable(nodes[i], op, t, ioExceptions, tmpPath, nflyPath);
+    }
+
+    @Override
+    public void write(byte[] bytes, int offset, int len) throws IOException {
+      final List<IOException> ioExceptions = new ArrayList<IOException>();
+      for (int i = opSet.nextSetBit(0);
+           i >= 0;
+           i = opSet.nextSetBit(i + 1)) {
+        try {
+          outputStreams[i].write(bytes, offset, len);
+        } catch (Throwable t) {
+          osException(i, "write", t, ioExceptions);
+        }
+      }
+      mayThrow(ioExceptions);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      final List<IOException> ioExceptions = new ArrayList<IOException>();
+      for (int i = opSet.nextSetBit(0);
+           i >= 0;
+           i = opSet.nextSetBit(i + 1)) {
+        try {
+          outputStreams[i].flush();
+        } catch (Throwable t) {
+          osException(i, "flush", t, ioExceptions);
+        }
+      }
+      mayThrow(ioExceptions);
+    }
+
+    @Override
+    public void close() throws IOException {
+      final List<IOException> ioExceptions = new ArrayList<IOException>();
+      for (int i = opSet.nextSetBit(0);
+           i >= 0;
+           i = opSet.nextSetBit(i + 1)) {
+        try {
+          outputStreams[i].close();
+        } catch (Throwable t) {
+          osException(i, "close", t, ioExceptions);
+        }
+      }
+      if (opSet.cardinality() < minReplication) {
+        cleanupAllTmpFiles();
+        throw new IOException("Failed to sufficiently replicate: min="
+            + minReplication + " actual=" + opSet.cardinality());
+      } else {
+        commit();
+      }
+    }
+
+    private void cleanupAllTmpFiles() throws IOException {
+      for (int i = 0; i < outputStreams.length; i++) {
+        try {
+          nodes[i].fs.delete(tmpPath);
+        } catch (Throwable t) {
+          processThrowable(nodes[i], "delete", t, null, tmpPath);
+        }
+      }
+    }
+
+    private void commit() throws IOException {
+      final List<IOException> ioExceptions = new ArrayList<IOException>();
+      for (int i = opSet.nextSetBit(0);
+           i >= 0;
+           i = opSet.nextSetBit(i + 1)) {
+        final NflyNode nflyNode = nodes[i];
+        try {
+          if (useOverwrite) {
+            nflyNode.fs.delete(nflyPath);
+          }
+          nflyNode.fs.rename(tmpPath, nflyPath);
+
+        } catch (Throwable t) {
+          osException(i, "commit", t, ioExceptions);
+        }
+      }
+
+      if (opSet.cardinality() < minReplication) {
+        // cleanup should be done outside. If rename failed, it's unlikely that
+        // delete will work either. It's the same kind of metadata-only op
+        //
+        throw MultipleIOException.createIOException(ioExceptions);
+      }
+
+      // best effort to have a consistent timestamp
+      final long commitTime = System.currentTimeMillis();
+      for (int i = opSet.nextSetBit(0);
+          i >= 0;
+          i = opSet.nextSetBit(i + 1)) {
+        try {
+          nodes[i].fs.setTimes(nflyPath, commitTime, commitTime);
+        } catch (Throwable t) {
+          LOG.info("Failed to set timestamp: " + nodes[i] + " " + nflyPath);
+        }
+      }
+    }
+  }
+
+  private Path getNflyTmpPath(Path f) {
+    return new Path(f.getParent(), NFLY_TMP_PREFIX + f.getName());
+  }
+
+  /**
+   * Some file status implementations have expensive deserialization or
+   * metadata retrieval. This probably does not go beyond RawLocalFileSystem.
+   * Wrapping the real file status preserves that laziness; calling the
+   * realStatus getters in the constructor would defeat it.
+   */
+  static final class NflyStatus extends FileStatus {
+    private static final long serialVersionUID = 0x21f276d8;
+
+    private final FileStatus realStatus;
+    private final String strippedRoot;
+
+    private NflyStatus(ChRootedFileSystem realFs, FileStatus realStatus)
+        throws IOException {
+      this.realStatus = realStatus;
+      this.strippedRoot = realFs.stripOutRoot(realStatus.getPath());
+    }
+
+    String stripRoot() throws IOException {
+      return strippedRoot;
+    }
+
+    @Override
+    public long getLen() {
+      return realStatus.getLen();
+    }
+
+    @Override
+    public boolean isFile() {
+      return realStatus.isFile();
+    }
+
+    @Override
+    public boolean isDirectory() {
+      return realStatus.isDirectory();
+    }
+
+    @Override
+    public boolean isSymlink() {
+      return realStatus.isSymlink();
+    }
+
+    @Override
+    public long getBlockSize() {
+      return realStatus.getBlockSize();
+    }
+
+    @Override
+    public short getReplication() {
+      return realStatus.getReplication();
+    }
+
+    @Override
+    public long getModificationTime() {
+      return realStatus.getModificationTime();
+    }
+
+    @Override
+    public long getAccessTime() {
+      return realStatus.getAccessTime();
+    }
+
+    @Override
+    public FsPermission getPermission() {
+      return realStatus.getPermission();
+    }
+
+    @Override
+    public String getOwner() {
+      return realStatus.getOwner();
+    }
+
+    @Override
+    public String getGroup() {
+      return realStatus.getGroup();
+    }
+
+    @Override
+    public Path getPath() {
+      return realStatus.getPath();
+    }
+
+    @Override
+    public void setPath(Path p) {
+      realStatus.setPath(p);
+    }
+
+    @Override
+    public Path getSymlink() throws IOException {
+      return realStatus.getSymlink();
+    }
+
+    @Override
+    public void setSymlink(Path p) {
+      realStatus.setSymlink(p);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return realStatus.equals(o);
+    }
+
+    @Override
+    public int hashCode() {
+      return realStatus.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return realStatus.toString();
+    }
+  }
+
+  @Override
+  public URI getUri() {
+    return nflyURI;
+  }
+
+  /**
+   * Category: READ.
+   *
+   * @param f the file name to open
+   * @param bufferSize the size of the buffer to be used.
+   * @return input stream according to nfly flags (closest, most recent)
+   * @throws IOException
+   * @throws FileNotFoundException iff all destinations generate this exception
+   */
+  @Override
+  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+    // TODO proxy stream for reads
+    final List<IOException> ioExceptions =
+        new ArrayList<IOException>(nodes.length);
+    int numNotFounds = 0;
+    final MRNflyNode[] mrNodes = workSet();
+
+    // naively iterate until one can be opened
+    //
+    for (final MRNflyNode nflyNode : mrNodes) {
+      try {
+        if (nflyFlags.contains(NflyKey.repairOnRead)
+            || nflyFlags.contains(NflyKey.readMostRecent)) {
+          // calling file status to avoid pulling bytes prematurely
+          nflyNode.updateFileStatus(f);
+        } else {
+          return nflyNode.getFs().open(f, bufferSize);
+        }
+      } catch (FileNotFoundException fnfe) {
+        nflyNode.status = notFoundStatus(f);
+        numNotFounds++;
+        processThrowable(nflyNode, "open", fnfe, ioExceptions, f);
+      } catch (Throwable t) {
+        processThrowable(nflyNode, "open", t, ioExceptions, f);
+      }
+    }
+
+    if (nflyFlags.contains(NflyKey.readMostRecent)) {
+      // sort from most recent to least recent
+      Arrays.sort(mrNodes);
+    }
+
+    final FSDataInputStream fsdisAfterRepair = repairAndOpen(mrNodes, f,
+        bufferSize);
+
+    if (fsdisAfterRepair != null) {
+      return fsdisAfterRepair;
+    }
+
+    mayThrowFileNotFound(ioExceptions, numNotFounds);
+    throw MultipleIOException.createIOException(ioExceptions);
+  }
+
+  private static FileStatus notFoundStatus(Path f) {
+    return new FileStatus(-1, false, 0, 0, 0, f);
+  }
+
+  /**
+   * Iterate all available nodes in the proximity order to attempt repair of all
+   * FileNotFound nodes.
+   *
+   * @param mrNodes work set copy of nodes
+   * @param f path to repair and open
+   * @param bufferSize buffer size for read RPC
+   * @return the closest/most recent replica stream AFTER repair
+   */
+  private FSDataInputStream repairAndOpen(MRNflyNode[] mrNodes, Path f,
+      int bufferSize) {
+    long maxMtime = 0L;
+    for (final MRNflyNode srcNode : mrNodes) {
+      if (srcNode.status == null  // not available
+          || srcNode.status.getLen() < 0L) { // not found
+        continue; // not available
+      }
+      if (srcNode.status.getModificationTime() > maxMtime) {
+        maxMtime = srcNode.status.getModificationTime();
+      }
+
+      // attempt to repair all notFound nodes with srcNode
+      //
+      for (final MRNflyNode dstNode : mrNodes) {
+        if (dstNode.status == null // not available
+            || srcNode.compareTo(dstNode) == 0) { // same mtime
+          continue;
+        }
+
+        try {
+          // status is absolute from the underlying mount, making it chrooted
+          //
+          final FileStatus srcStatus = srcNode.cloneStatus();
+          srcStatus.setPath(f);
+          final Path tmpPath = getNflyTmpPath(f);
+          FileUtil.copy(srcNode.getFs(), srcStatus, dstNode.getFs(), tmpPath,
+              false,                // don't delete
+              true,                 // overwrite
+              getConf());
+          dstNode.getFs().delete(f, false);
+          if (dstNode.getFs().rename(tmpPath, f)) {
+            try {
+              dstNode.getFs().setTimes(f, srcNode.status.getModificationTime(),
+                  srcNode.status.getAccessTime());
+            } finally {
+              // save getFileStatus rpc
+              srcStatus.setPath(dstNode.getFs().makeQualified(f));
+              dstNode.status = srcStatus;
+            }
+          }
+        } catch (IOException ioe) {
+          // can blame the source by statusSet.clear(ai), however, it would
+          // cost an extra RPC, so just rely on the loop below that will attempt
+          // an open anyhow
+          //
+          LOG.info(f + " " + srcNode + "->" + dstNode + ": Failed to repair",
+                ioe);
+        }
+      }
+    }
+
+    // Since Java 7, quicksort may be used instead of mergesort; quicksort
+    // may not be stable, so the equally most recent nodes may no longer
+    // appear in the NetworkTopology order.
+    //
+    if (maxMtime > 0) {
+      final List<MRNflyNode> mrList = new ArrayList<MRNflyNode>();
+      for (final MRNflyNode openNode : mrNodes) {
+        if (openNode.status != null && openNode.status.getLen() >= 0L) {
+          if (openNode.status.getModificationTime() == maxMtime) {
+            mrList.add(openNode);
+          }
+        }
+      }
+      // assert mrList.size > 0
+      final MRNflyNode[] readNodes = mrList.toArray(new MRNflyNode[0]);
+      topology.sortByDistance(myNode, readNodes, readNodes.length);
+      for (final MRNflyNode rNode : readNodes) {
+        try {
+          return rNode.getFs().open(f, bufferSize);
+        } catch (IOException e) {
+          LOG.info(f + ": Failed to open at " + rNode.getFs().getUri());
+        }
+      }
+    }
+    return null;
+  }
+
+  private void mayThrowFileNotFound(List<IOException> ioExceptions,
+      int numNotFounds) throws FileNotFoundException {
+    if (numNotFounds == nodes.length) {
+      throw (FileNotFoundException)ioExceptions.get(nodes.length - 1);
+    }
+  }
+
+  // WRITE
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission,
+      boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+    return new FSDataOutputStream(new NflyOutputStream(f, permission, overwrite,
+        bufferSize, replication, blockSize, progress), statistics);
+  }
+
+  // WRITE
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    return null;
+  }
+
+  // WRITE
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    final List<IOException> ioExceptions = new ArrayList<IOException>();
+    int numNotFounds = 0;
+    boolean succ = true;
+    for (final NflyNode nflyNode : nodes) {
+      try {
+        succ &= nflyNode.fs.rename(src, dst);
+      } catch (FileNotFoundException fnfe) {
+        numNotFounds++;
+        processThrowable(nflyNode, "rename", fnfe, ioExceptions, src, dst);
+      } catch (Throwable t) {
+        processThrowable(nflyNode, "rename", t, ioExceptions, src, dst);
+        succ = false;
+      }
+    }
+
+    mayThrowFileNotFound(ioExceptions, numNotFounds);
+
+    // if all destinations threw exceptions throw, otherwise return
+    //
+    if (ioExceptions.size() == nodes.length) {
+      throw MultipleIOException.createIOException(ioExceptions);
+    }
+
+    return succ;
+  }
+
+  // WRITE
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    final List<IOException> ioExceptions = new ArrayList<IOException>();
+    int numNotFounds = 0;
+    boolean succ = true;
+    for (final NflyNode nflyNode : nodes) {
+      try {
+        succ &= nflyNode.fs.delete(f);
+      } catch (FileNotFoundException fnfe) {
+        numNotFounds++;
+        processThrowable(nflyNode, "delete", fnfe, ioExceptions, f);
+      } catch (Throwable t) {
+        processThrowable(nflyNode, "delete", t, ioExceptions, f);
+        succ = false;
+      }
+    }
+    mayThrowFileNotFound(ioExceptions, numNotFounds);
+
+    // if all destinations threw exceptions throw, otherwise return
+    //
+    if (ioExceptions.size() == nodes.length) {
+      throw MultipleIOException.createIOException(ioExceptions);
+    }
+
+    return succ;
+  }
+
+
+  /**
+   * Returns the closest non-failing destination's result.
+   *
+   * @param f given path
+   * @return array of file statuses according to nfly modes
+   * @throws FileNotFoundException
+   * @throws IOException
+   */
+  @Override
+  public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+      IOException {
+    final List<IOException> ioExceptions =
+        new ArrayList<IOException>(nodes.length);
+
+    final MRNflyNode[] mrNodes = workSet();
+    if (nflyFlags.contains(NflyKey.readMostRecent)) {
+      int numNotFounds = 0;
+      for (final MRNflyNode nflyNode : mrNodes) {
+        try {
+          nflyNode.updateFileStatus(f);
+        } catch (FileNotFoundException fnfe) {
+          numNotFounds++;
+          processThrowable(nflyNode, "listStatus", fnfe, ioExceptions, f);
+        } catch (Throwable t) {
+          processThrowable(nflyNode, "listStatus", t, ioExceptions, f);
+        }
+      }
+      mayThrowFileNotFound(ioExceptions, numNotFounds);
+      Arrays.sort(mrNodes);
+    }
+
+    int numNotFounds = 0;
+    for (final MRNflyNode nflyNode : mrNodes) {
+      try {
+        final FileStatus[] realStats = nflyNode.getFs().listStatus(f);
+        final FileStatus[] nflyStats = new FileStatus[realStats.length];
+        for (int i = 0; i < realStats.length; i++) {
+          nflyStats[i] = new NflyStatus(nflyNode.getFs(), realStats[i]);
+        }
+        return nflyStats;
+      } catch (FileNotFoundException fnfe) {
+        numNotFounds++;
+        processThrowable(nflyNode, "listStatus", fnfe, ioExceptions, f);
+      } catch (Throwable t) {
+        processThrowable(nflyNode, "listStatus", t, ioExceptions, f);
+      }
+    }
+    mayThrowFileNotFound(ioExceptions, numNotFounds);
+    throw MultipleIOException.createIOException(ioExceptions);
+  }
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
+      throws FileNotFoundException, IOException {
+    // TODO important for splits
+    return super.listLocatedStatus(f);
+  }
+
+  @Override
+  public void setWorkingDirectory(Path newDir) {
+    for (final NflyNode nflyNode : nodes) {
+      nflyNode.fs.setWorkingDirectory(newDir);
+    }
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return nodes[0].fs.getWorkingDirectory(); // 0 is as good as any
+  }
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    boolean succ = true;
+    for (final NflyNode nflyNode : nodes) {
+      succ &= nflyNode.fs.mkdirs(f, permission);
+    }
+    return succ;
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    // TODO proxy stream for reads
+    final List<IOException> ioExceptions =
+        new ArrayList<IOException>(nodes.length);
+    int numNotFounds = 0;
+    final MRNflyNode[] mrNodes = workSet();
+
+    long maxMtime = Long.MIN_VALUE;
+    int maxMtimeIdx = Integer.MIN_VALUE;
+
+    // naively iterate until one can be returned
+    //
+    for (int i = 0; i < mrNodes.length; i++) {
+      MRNflyNode nflyNode = mrNodes[i];
+      try {
+        nflyNode.updateFileStatus(f);
+        if (nflyFlags.contains(NflyKey.readMostRecent)) {
+          final long nflyTime = nflyNode.status.getModificationTime();
+          if (nflyTime > maxMtime) {
+            maxMtime = nflyTime;
+            maxMtimeIdx = i;
+          }
+        } else {
+          return nflyNode.nflyStatus();
+        }
+      } catch (FileNotFoundException fnfe) {
+        numNotFounds++;
+        processThrowable(nflyNode, "getFileStatus", fnfe, ioExceptions, f);
+      } catch (Throwable t) {
+        processThrowable(nflyNode, "getFileStatus", t, ioExceptions, f);
+      }
+    }
+
+    if (maxMtimeIdx >= 0) {
+      return mrNodes[maxMtimeIdx].nflyStatus();
+    }
+
+    mayThrowFileNotFound(ioExceptions, numNotFounds);
+    throw MultipleIOException.createIOException(ioExceptions);
+  }
+
+  private static void processThrowable(NflyNode nflyNode, String op,
+      Throwable t, List<IOException> ioExceptions,
+      Path... f) {
+    final String errMsg = Arrays.toString(f)
+        + ": failed to " + op + " " + nflyNode.fs.getUri();
+    final IOException ioex;
+    if (t instanceof FileNotFoundException) {
+      ioex = new FileNotFoundException(errMsg);
+      ioex.initCause(t);
+    } else {
+      ioex = new IOException(errMsg, t);
+    }
+
+    if (ioExceptions != null) {
+      ioExceptions.add(ioex);
+    }
+  }
+
+  /**
+   * Initializes an nfly mountpoint in viewfs.
+   *
+   * @param uris destinations to replicate writes to
+   * @param conf file system configuration
+   * @param settings comma-separated list of k=v pairs.
+   * @return an Nfly filesystem
+   * @throws IOException
+   */
+  static FileSystem createFileSystem(URI[] uris, Configuration conf,
+      String settings) throws IOException {
+    // assert settings != null
+    int minRepl = DEFAULT_MIN_REPLICATION;
+    EnumSet<NflyKey> nflyFlags = EnumSet.noneOf(NflyKey.class);
+    final String[] kvPairs = StringUtils.split(settings);
+    for (String kv : kvPairs) {
+      final String[] kvPair = StringUtils.split(kv, '=');
+      if (kvPair.length != 2) {
+        throw new IllegalArgumentException("Invalid nfly setting: " + kv);
+      }
+      NflyKey nflyKey = NflyKey.valueOf(kvPair[0]);
+      switch (nflyKey) {
+      case minReplication:
+        minRepl = Integer.parseInt(kvPair[1]);
+        break;
+      case repairOnRead:
+      case readMostRecent:
+        if (Boolean.valueOf(kvPair[1])) {
+          nflyFlags.add(nflyKey);
+        }
+        break;
+      default:
+        throw new IllegalArgumentException(nflyKey + ": Infeasible");
+      }
+    }
+    return new NflyFSystem(uris, conf, minRepl, nflyFlags);
+  }
+}
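
NflyOutputStream above tracks per-destination health in a BitSet: every
stream starts live, a failed op clears that destination's bit, and close()
commits (renames the _nfly_tmp_ files into place) only while the number of
live streams is still at least minReplication. A self-contained sketch of
just that accounting, with made-up destination counts and a made-up failure:

    import java.util.BitSet;

    public class OpSetSketch {
      public static void main(String[] args) {
        final int destinations = 3;
        final int minReplication = 2;
        BitSet opSet = new BitSet(destinations);
        opSet.set(0, destinations);   // all streams healthy initially
        opSet.clear(1);               // destination 1 failed a write
        // close() semantics: commit iff live streams >= minReplication
        System.out.println(opSet.cardinality() >= minReplication
            ? "commit" : "cleanup tmp files and throw");
      }
    }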

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
index 158b099..ca1380a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.QuotaUsage;
 import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
@@ -186,25 +185,21 @@ public class ViewFileSystem extends FileSystem {
       fsState = new InodeTree<FileSystem>(conf, authority) {
 
         @Override
-        protected
-        FileSystem getTargetFileSystem(final URI uri)
+        protected FileSystem getTargetFileSystem(final URI uri)
           throws URISyntaxException, IOException {
             return new ChRootedFileSystem(uri, config);
         }
 
         @Override
-        protected
-        FileSystem getTargetFileSystem(final INodeDir<FileSystem> dir)
+        protected FileSystem getTargetFileSystem(final INodeDir<FileSystem> dir)
           throws URISyntaxException {
           return new InternalDirOfViewFs(dir, creationTime, ugi, myUri, config);
         }
 
         @Override
-        protected
-        FileSystem getTargetFileSystem(URI[] mergeFsURIList)
-            throws URISyntaxException, UnsupportedFileSystemException {
-          throw new UnsupportedFileSystemException("mergefs not implemented");
-          // return MergeFs.createMergeFs(mergeFsURIList, config);
+        protected FileSystem getTargetFileSystem(final String settings,
+            final URI[] uris) throws URISyntaxException, IOException {
+          return NflyFSystem.createFileSystem(uris, config, settings);
         }
       };
       workingDir = this.getHomeDirectory();
@@ -455,8 +450,13 @@ public class ViewFileSystem extends FileSystem {
 
   private Path getChrootedPath(InodeTree.ResolveResult<FileSystem> res,
       FileStatus status, Path f) throws IOException {
-    final String suffix = ((ChRootedFileSystem)res.targetFileSystem)
-        .stripOutRoot(status.getPath());
+    final String suffix;
+    if (res.targetFileSystem instanceof ChRootedFileSystem) {
+      suffix = ((ChRootedFileSystem)res.targetFileSystem)
+          .stripOutRoot(status.getPath());
+    } else { // nfly
+      suffix = ((NflyFSystem.NflyStatus)status).stripRoot();
+    }
     return this.makeQualified(
         suffix.length() == 0 ? f : new Path(res.resolvedPath, suffix));
   }
@@ -501,10 +501,15 @@ public class ViewFileSystem extends FileSystem {
     verifyRenameStrategy(srcUri, dstUri,
         resSrc.targetFileSystem == resDst.targetFileSystem, renameStrategy);
 
-    ChRootedFileSystem srcFS = (ChRootedFileSystem) resSrc.targetFileSystem;
-    ChRootedFileSystem dstFS = (ChRootedFileSystem) resDst.targetFileSystem;
-    return srcFS.getMyFs().rename(srcFS.fullPath(resSrc.remainingPath),
-        dstFS.fullPath(resDst.remainingPath));
+    if (resSrc.targetFileSystem instanceof ChRootedFileSystem &&
+        resDst.targetFileSystem instanceof ChRootedFileSystem) {
+      ChRootedFileSystem srcFS = (ChRootedFileSystem) resSrc.targetFileSystem;
+      ChRootedFileSystem dstFS = (ChRootedFileSystem) resDst.targetFileSystem;
+      return srcFS.getMyFs().rename(srcFS.fullPath(resSrc.remainingPath),
+          dstFS.fullPath(resDst.remainingPath));
+    } else {
+      return resSrc.targetFileSystem.rename(resSrc.remainingPath,
+          resDst.remainingPath);
+    }
   }
 
   static void verifyRenameStrategy(URI srcUri, URI dstUri,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
index 364485f..6a89f27 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
@@ -212,8 +212,7 @@ public class ViewFs extends AbstractFileSystem {
     fsState = new InodeTree<AbstractFileSystem>(conf, authority) {
 
       @Override
-      protected
-      AbstractFileSystem getTargetFileSystem(final URI uri)
+      protected AbstractFileSystem getTargetFileSystem(final URI uri)
         throws URISyntaxException, UnsupportedFileSystemException {
           String pathString = uri.getPath();
           if (pathString.isEmpty()) {
@@ -225,15 +224,14 @@ public class ViewFs extends AbstractFileSystem {
       }
 
       @Override
-      protected
-      AbstractFileSystem getTargetFileSystem(
+      protected AbstractFileSystem getTargetFileSystem(
           final INodeDir<AbstractFileSystem> dir) throws URISyntaxException {
         return new InternalDirOfViewFs(dir, creationTime, ugi, getUri());
       }
 
       @Override
-      protected
-      AbstractFileSystem getTargetFileSystem(URI[] mergeFsURIList)
+      protected AbstractFileSystem getTargetFileSystem(final String settings,
+          final URI[] mergeFsURIList)
           throws URISyntaxException, UnsupportedFileSystemException {
         throw new UnsupportedFileSystemException("mergefs not implemented yet");
         // return MergeFs.createMergeFs(mergeFsURIList, config);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java
index 4943792..808d8b0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java
@@ -18,13 +18,25 @@
 package org.apache.hadoop.fs.viewfs;
 
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
 
 import org.junit.After;
 import org.junit.Before;
-
+import org.junit.Test;
 
 
 /**
@@ -37,6 +49,8 @@ import org.junit.Before;
  */
 
 public class TestViewFileSystemLocalFileSystem extends ViewFileSystemBaseTest {
+  private static final Log LOG =
+      LogFactory.getLog(TestViewFileSystemLocalFileSystem.class);
 
   @Override
   @Before
@@ -47,6 +61,65 @@ public class TestViewFileSystemLocalFileSystem extends ViewFileSystemBaseTest {
     
   }
 
+  @Test
+  public void testNflyWriteSimple() throws IOException {
+    LOG.info("Starting testNflyWriteSimple");
+    final URI[] testUris = new URI[] {
+        URI.create(targetTestRoot + "/nfwd1"),
+        URI.create(targetTestRoot + "/nfwd2")
+    };
+    final String testFileName = "test.txt";
+    final Configuration testConf = new Configuration(conf);
+    final String testString = "Hello Nfly!";
+    final Path nflyRoot = new Path("/nflyroot");
+    ConfigUtil.addLinkNfly(testConf, nflyRoot.toString(), testUris);
+    final FileSystem nfly = FileSystem.get(URI.create("viewfs:///"), testConf);
+
+    final FSDataOutputStream fsDos = nfly.create(
+        new Path(nflyRoot, testFileName));
+    try {
+      fsDos.writeUTF(testString);
+    } finally {
+      fsDos.close();
+    }
+
+    // exercise listStatus through the nfly mount point
+    FileStatus[] statuses = nfly.listStatus(nflyRoot);
+
+    FileSystem lfs = FileSystem.getLocal(testConf);
+    for (final URI testUri : testUris) {
+      final Path testFile = new Path(new Path(testUri), testFileName);
+      assertTrue(testFile + " should exist!",  lfs.exists(testFile));
+      final FSDataInputStream fsdis = lfs.open(testFile);
+      try {
+        assertEquals("Wrong file content", testString, fsdis.readUTF());
+      } finally {
+        fsdis.close();
+      }
+    }
+  }
+
+
+  @Test
+  public void testNflyInvalidMinReplication() throws Exception {
+    LOG.info("Starting testNflyInvalidMinReplication");
+    final URI[] testUris = new URI[] {
+        URI.create(targetTestRoot + "/nfwd1"),
+        URI.create(targetTestRoot + "/nfwd2")
+    };
+
+    final Configuration conf = new Configuration();
+    ConfigUtil.addLinkNfly(conf, "mt", "/nflyroot", "minReplication=4",
+        testUris);
+    try {
+      FileSystem.get(URI.create("viewfs://mt/"), conf);
+      fail("Expected bad minReplication exception.");
+    } catch (IOException ioe) {
+      assertTrue("No minReplication message",
+          ioe.getMessage().contains("Minimum replication"));
+    }
+  }
+
+
   @Override
   @After
   public void tearDown() throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
index 895ae0c..136837f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
@@ -24,7 +24,6 @@ import java.net.URISyntaxException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.junit.Test;
 
 public class TestViewFsConfig {
@@ -43,23 +42,21 @@ public class TestViewFsConfig {
     new InodeTree<Foo>(conf, null) {
 
       @Override
-      protected Foo getTargetFileSystem(final URI uri)
-          throws URISyntaxException, UnsupportedFileSystemException {
+      protected Foo getTargetFileSystem(final URI uri) {
         return null;
       }
 
       @Override
-      protected Foo getTargetFileSystem(
-          org.apache.hadoop.fs.viewfs.InodeTree.INodeDir<Foo> dir)
-          throws URISyntaxException {
+      protected Foo getTargetFileSystem(final INodeDir<Foo> dir) {
         return null;
       }
 
       @Override
-      protected Foo getTargetFileSystem(URI[] mergeFsURIList)
-          throws URISyntaxException, UnsupportedFileSystemException {
+      protected Foo getTargetFileSystem(final String settings,
+          final URI[] mergeFsURIList) {
         return null;
       }
+
     };
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
index b8f5379..b8bed1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
@@ -17,11 +17,9 @@
  */
 package org.apache.hadoop.fs.viewfs;
 
-
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
-
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.EnumSet;
 
@@ -31,6 +29,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,17 +45,26 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
+
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import static org.junit.Assert.*;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestViewFileSystemHdfs.class);
+
 
   private static MiniDFSCluster cluster;
   private static Path defaultWorkingDirectory;
@@ -190,12 +199,12 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
 
     //Verify file deletion within EZ
     DFSTestUtil.verifyDelete(shell, fsTarget, encFile, true);
-    Assert.assertTrue("ViewFileSystem trash roots should include EZ file trash",
+    assertTrue("ViewFileSystem trash roots should include EZ file trash",
         (fsView.getTrashRoots(true).size() == 1));
 
     //Verify deletion of EZ
     DFSTestUtil.verifyDelete(shell, fsTarget, zone, true);
-    Assert.assertTrue("ViewFileSystem trash roots should include EZ zone trash",
+    assertTrue("ViewFileSystem trash roots should include EZ zone trash",
         (fsView.getTrashRoots(true).size() == 2));
   }
 
@@ -239,14 +248,14 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
         viewFs.getFileChecksum(mountDataFilePath);
     FileChecksum fileChecksumViaTargetFs =
         fsTarget.getFileChecksum(fsTargetFilePath);
-    Assert.assertTrue("File checksum not matching!",
+    assertTrue("File checksum not matching!",
         fileChecksumViaViewFs.equals(fileChecksumViaTargetFs));
 
     fileChecksumViaViewFs =
         viewFs.getFileChecksum(mountDataFilePath, fileLength / 2);
     fileChecksumViaTargetFs =
         fsTarget.getFileChecksum(fsTargetFilePath, fileLength / 2);
-    Assert.assertTrue("File checksum not matching!",
+    assertTrue("File checksum not matching!",
         fileChecksumViaViewFs.equals(fileChecksumViaTargetFs));
   }
 
@@ -269,4 +278,130 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
               e);
     }
   }
+
+  @Test
+  public void testNflyClosestRepair() throws Exception {
+    testNflyRepair(NflyFSystem.NflyKey.repairOnRead);
+  }
+
+  @Test
+  public void testNflyMostRecentRepair() throws Exception {
+    testNflyRepair(NflyFSystem.NflyKey.readMostRecent);
+  }
+
+  private void testNflyRepair(NflyFSystem.NflyKey repairKey)
+      throws Exception {
+    LOG.info("Starting testNflyRepair: " + repairKey);
+    final URI uri1 = targetTestRoot.toUri();
+    final URI uri2 = targetTestRoot2.toUri();
+    final URI[] testUris = new URI[] {
+        new URI(uri1.getScheme(), uri1.getAuthority(), "/", null, null),
+        new URI(uri2.getScheme(), uri2.getAuthority(), "/", null, null)
+    };
+
+    final Configuration testConf = new Configuration(conf);
+    testConf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+
+    final String testString = "Hello Nfly!";
+    final Path nflyRoot = new Path("/nflyroot");
+
+    ConfigUtil.addLinkNfly(testConf,
+        Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE,
+        nflyRoot.toString(),
+        "minReplication=2," + repairKey + "=true", testUris);
+
+    final FileSystem nfly = FileSystem.get(URI.create("viewfs:///"), testConf);
+    // wd = /nflyroot/user/<user>
+    nfly.setWorkingDirectory(new Path(nflyRoot
+        + nfly.getWorkingDirectory().toUri().getPath()));
+
+    // 1. test mkdirs
+    final Path testDir = new Path("testdir1/sub1/sub3");
+    final Path testDir_tmp = new Path("testdir1/sub1/sub3_temp");
+    assertTrue(testDir + ": Failed to create!", nfly.mkdirs(testDir));
+
+    // Test renames
+    assertTrue(nfly.rename(testDir, testDir_tmp));
+    assertTrue(nfly.rename(testDir_tmp, testDir));
+
+    for (final URI testUri : testUris) {
+      final FileSystem fs = FileSystem.get(testUri, testConf);
+      assertTrue(testDir + " should exist!", fs.exists(testDir));
+    }
+
+    // 2. test write
+    final Path testFile = new Path("test.txt");
+    final FSDataOutputStream fsDos = nfly.create(testFile);
+    try {
+      fsDos.writeUTF(testString);
+    } finally {
+      fsDos.close();
+    }
+
+    for (final URI testUri : testUris) {
+      final FileSystem fs = FileSystem.get(testUri, testConf);
+      final FSDataInputStream fsdis = fs.open(testFile);
+      try {
+        assertEquals("Wrong file content", testString, fsdis.readUTF());
+      } finally {
+        fsdis.close();
+      }
+    }
+
+    // 3. test reads when one unavailable
+    //
+    // bring one NN down and read through nfly should still work
+    //
+    for (int i = 0; i < cluster.getNumNameNodes(); i++) {
+      cluster.shutdownNameNode(i);
+      FSDataInputStream fsDis = null;
+      try {
+        fsDis = nfly.open(testFile);
+        assertEquals("Wrong file content", testString, fsDis.readUTF());
+      } finally {
+        IOUtils.cleanupWithLogger(LOG, fsDis);
+        cluster.restartNameNode(i);
+      }
+    }
+
+    // both nodes are up again, test repair
+    final FileSystem fs1 = FileSystem.get(testUris[0], conf);
+    assertTrue(fs1.delete(testFile, false));
+    assertFalse(fs1.exists(testFile));
+    FSDataInputStream fsDis = null;
+    try {
+      fsDis = nfly.open(testFile);
+      assertEquals("Wrong file content", testString, fsDis.readUTF());
+      assertTrue(fs1.exists(testFile));
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, fsDis);
+    }
+
+    // test most recent repair
+    if (repairKey == NflyFSystem.NflyKey.readMostRecent) {
+      final FileSystem fs2 = FileSystem.get(testUris[0], conf);
+      final long expectedMtime = fs2.getFileStatus(testFile)
+          .getModificationTime();
+
+      for (final URI testUri : testUris) {
+        final FileSystem fs = FileSystem.get(testUri, conf);
+        fs.setTimes(testFile, 1L, 1L);
+        assertEquals(testUri + ": Set mtime failed!", 1L,
+            fs.getFileStatus(testFile).getModificationTime());
+        assertEquals("nfly file status wrong", expectedMtime,
+            nfly.getFileStatus(testFile).getModificationTime());
+        FSDataInputStream fsDis2 = null;
+        try {
+          fsDis2 = nfly.open(testFile);
+          assertEquals("Wrong file content", testString, fsDis2.readUTF());
+          // repair is done, now trying via normal fs
+          //
+          assertEquals("Repair most recent failed!", expectedMtime,
+              fs.getFileStatus(testFile).getModificationTime());
+        } finally {
+          IOUtils.cleanupWithLogger(LOG, fsDis2);
+        }
+      }
+    }
+  }
 }

