You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/09/11 06:48:27 UTC
[02/50] [abbrv] hadoop git commit: HADOOP-12077. Provide a multi-URI
replication Inode for ViewFs. Contributed by Gera Shegalov
HADOOP-12077. Provide a multi-URI replication Inode for ViewFs. Contributed by Gera Shegalov
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1f3bc63e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1f3bc63e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1f3bc63e
Branch: refs/heads/YARN-5972
Commit: 1f3bc63e6772be81bc9a6a7d93ed81d2a9e066c0
Parents: 63720ef
Author: Chris Douglas <cd...@apache.org>
Authored: Tue Sep 5 23:30:18 2017 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Tue Sep 5 23:51:51 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/fs/viewfs/ConfigUtil.java | 27 +
.../org/apache/hadoop/fs/viewfs/Constants.java | 8 +-
.../org/apache/hadoop/fs/viewfs/InodeTree.java | 64 +-
.../apache/hadoop/fs/viewfs/NflyFSystem.java | 951 +++++++++++++++++++
.../apache/hadoop/fs/viewfs/ViewFileSystem.java | 37 +-
.../org/apache/hadoop/fs/viewfs/ViewFs.java | 10 +-
.../TestViewFileSystemLocalFileSystem.java | 77 +-
.../hadoop/fs/viewfs/TestViewFsConfig.java | 13 +-
.../fs/viewfs/TestViewFileSystemHdfs.java | 151 ++-
9 files changed, 1275 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
index 6900df2..a5fc62e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.viewfs;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
/**
* Utilities for config variables of the viewFs See {@link ViewFs}
@@ -69,6 +70,32 @@ public class ConfigUtil {
}
/**
+ * Add a config variable for an nfly link: one mount point that is backed by
+ * several target URIs. The config key is built from the mount table prefix,
+ * the nfly link keyword, the settings and the src path, joined by dots.
+ *
+ * @param conf - add the link to this conf
+ * @param mountTableName - name of the mount table the link is added to
+ * @param src - the src path name of the mount point
+ * @param settings - nfly settings that become part of the config key; when
+ *          null, "minReplication=2,repairOnRead=true" is used
+ * @param targets - target URIs, stored as the config value via
+ *          StringUtils.uriToString
+ */
+ public static void addLinkNfly(Configuration conf, String mountTableName,
+     String src, String settings, final URI ... targets) {
+
+   final String effectiveSettings;
+   if (settings != null) {
+     effectiveSettings = settings;
+   } else {
+     effectiveSettings = "minReplication=2,repairOnRead=true";
+   }
+
+   final String key = getConfigViewFsPrefix(mountTableName) + "."
+       + Constants.CONFIG_VIEWFS_LINK_NFLY + "." + effectiveSettings + "."
+       + src;
+   conf.set(key, StringUtils.uriToString(targets));
+ }
+
+ /**
+ * Add an nfly link to the default mount table, passing null settings so
+ * that the five-argument overload applies its default settings.
+ *
+ * @param conf - add the link to this conf
+ * @param src - the src path name of the mount point
+ * @param targets - target URIs of the link
+ */
+ public static void addLinkNfly(final Configuration conf, final String src,
+     final URI ... targets) {
+   final String defaultTable = Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
+   final String noSettings = null; // the callee substitutes its defaults
+   addLinkNfly(conf, defaultTable, src, noSettings, targets);
+ }
+
+ /**
* Add config variable for homedir for default mount table
* @param conf - add to this conf
* @param homedir - the home dir path starting with slash
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
index 9882a8e..1a07c10 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
@@ -57,7 +57,13 @@ public interface Constants {
* Config variable for specifying a merge link
*/
public static final String CONFIG_VIEWFS_LINK_MERGE = "linkMerge";
-
+
+ /**
+ * Config variable for specifying an nfly link. Nfly writes to multiple
+ * locations, and allows reads from the closest one.
+ */
+ String CONFIG_VIEWFS_LINK_NFLY = "linkNfly";
+
/**
* Config variable for specifying a merge of the root of the mount-table
* with the root of another file system.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
index c62d5cc..665c9c9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java
@@ -133,6 +133,12 @@ abstract class InodeTree<T> {
}
}
+ enum LinkType {
+ SINGLE, // target is parsed as a single URI
+ MERGE, // target is parsed as a list of URIs
+ NFLY // target is parsed as a list of URIs; key also carries nfly settings
+ }
+
/**
* An internal class to represent a mount link.
* A mount link can be single dir link or a merge dir link.
@@ -146,19 +152,17 @@ abstract class InodeTree<T> {
* is changed later it is then ignored (a dir with null entries)
*/
static class INodeLink<T> extends INode<T> {
- final boolean isMergeLink; // true if MergeLink
final URI[] targetDirLinkList;
final T targetFileSystem; // file system object created from the link.
/**
- * Construct a mergeLink.
+ * Construct a mergeLink or nfly.
*/
INodeLink(final String pathToNode, final UserGroupInformation aUgi,
final T targetMergeFs, final URI[] aTargetDirLinkList) {
super(pathToNode, aUgi);
targetFileSystem = targetMergeFs;
targetDirLinkList = aTargetDirLinkList;
- isMergeLink = true;
}
/**
@@ -170,7 +174,6 @@ abstract class InodeTree<T> {
targetFileSystem = targetFs;
targetDirLinkList = new URI[1];
targetDirLinkList[0] = aTargetDirLink;
- isMergeLink = false;
}
/**
@@ -188,7 +191,9 @@ abstract class InodeTree<T> {
}
private void createLink(final String src, final String target,
- final boolean isLinkMerge, final UserGroupInformation aUgi)
+ final LinkType linkType, final String settings,
+ final UserGroupInformation aUgi,
+ final Configuration config)
throws URISyntaxException, IOException,
FileAlreadyExistsException, UnsupportedFileSystemException {
// Validate that src is valid absolute path
@@ -235,18 +240,20 @@ abstract class InodeTree<T> {
final INodeLink<T> newLink;
final String fullPath = curInode.fullPath + (curInode == root ? "" : "/")
+ iPath;
- if (isLinkMerge) { // Target is list of URIs
- String[] targetsList = StringUtils.getStrings(target);
- URI[] targetsListURI = new URI[targetsList.length];
- int k = 0;
- for (String itarget : targetsList) {
- targetsListURI[k++] = new URI(itarget);
- }
- newLink = new INodeLink<T>(fullPath, aUgi,
- getTargetFileSystem(targetsListURI), targetsListURI);
- } else {
+ switch (linkType) {
+ case SINGLE:
newLink = new INodeLink<T>(fullPath, aUgi,
getTargetFileSystem(new URI(target)), new URI(target));
+ break;
+ case MERGE:
+ case NFLY:
+ final URI[] targetUris = StringUtils.stringToURI(
+ StringUtils.getStrings(target));
+ newLink = new INodeLink<T>(fullPath, aUgi,
+ getTargetFileSystem(settings, targetUris), targetUris);
+ break;
+ default:
+ throw new IllegalArgumentException(linkType + ": Infeasible linkType");
}
curInode.addLink(iPath, newLink);
mountPoints.add(new MountPoint<T>(src, newLink));
@@ -257,14 +264,14 @@ abstract class InodeTree<T> {
* 3 abstract methods.
* @throws IOException
*/
- protected abstract T getTargetFileSystem(final URI uri)
+ protected abstract T getTargetFileSystem(URI uri)
throws UnsupportedFileSystemException, URISyntaxException, IOException;
- protected abstract T getTargetFileSystem(final INodeDir<T> dir)
+ protected abstract T getTargetFileSystem(INodeDir<T> dir)
throws URISyntaxException;
- protected abstract T getTargetFileSystem(final URI[] mergeFsURIList)
- throws UnsupportedFileSystemException, URISyntaxException;
+ protected abstract T getTargetFileSystem(String settings, URI[] mergeFsURIs)
+ throws UnsupportedFileSystemException, URISyntaxException, IOException;
/**
* Create Inode Tree from the specified mount-table specified in Config
@@ -298,8 +305,9 @@ abstract class InodeTree<T> {
final String key = si.getKey();
if (key.startsWith(mtPrefix)) {
gotMountTableEntry = true;
- boolean isMergeLink = false;
+ LinkType linkType = LinkType.SINGLE;
String src = key.substring(mtPrefix.length());
+ String settings = null;
if (src.startsWith(linkPrefix)) {
src = src.substring(linkPrefix.length());
if (src.equals(SlashPath.toString())) {
@@ -309,8 +317,20 @@ abstract class InodeTree<T> {
+ "supported yet.");
}
} else if (src.startsWith(linkMergePrefix)) { // A merge link
- isMergeLink = true;
+ linkType = LinkType.MERGE;
src = src.substring(linkMergePrefix.length());
+ } else if (src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY)) {
+ // prefix.settings.src
+ src = src.substring(Constants.CONFIG_VIEWFS_LINK_NFLY.length() + 1);
+ // settings.src
+ settings = src.substring(0, src.indexOf('.'));
+ // settings
+
+ // settings.src
+ src = src.substring(settings.length() + 1);
+ // src
+
+ linkType = LinkType.NFLY;
} else if (src.startsWith(Constants.CONFIG_VIEWFS_HOMEDIR)) {
// ignore - we set home dir from config
continue;
@@ -319,7 +339,7 @@ abstract class InodeTree<T> {
"Mount table in config: " + src);
}
final String target = si.getValue(); // link or merge link
- createLink(src, target, isMergeLink, ugi);
+ createLink(src, target, linkType, settings, ugi, config);
}
}
if (!gotMountTableEntry) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java
new file mode 100644
index 0000000..53966b8
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java
@@ -0,0 +1,951 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Nfly is a multi filesystem mount point.
+ */
+@Private
+final class NflyFSystem extends FileSystem {
+ private static final Log LOG = LogFactory.getLog(NflyFSystem.class);
+ private static final String NFLY_TMP_PREFIX = "_nfly_tmp_";
+
+ enum NflyKey {
+ // minimum replication: a write fails unless at least this many destinations were written; if local filesystem is included +1 is recommended
+ minReplication,
+
+ // forces to check all the replicas and fetch the one with the most recent
+ // time stamp (modification time of the file status)
+ //
+ readMostRecent,
+
+ // collect the file status from every replica on read so that missing replicas can be created; NOTE(review): original comment asks "from far to near, including local?"
+ repairOnRead
+ }
+
+ private static final int DEFAULT_MIN_REPLICATION = 2;
+ private static URI nflyURI = URI.create("nfly:///");
+
+ private final NflyNode[] nodes;
+ private final int minReplication;
+ private final EnumSet<NflyKey> nflyFlags;
+ private final Node myNode;
+ private final NetworkTopology topology;
+
+ /**
+ * Topology node for one nfly destination. It carries the chrooted file
+ * system of that destination next to the host and rack it was resolved to.
+ * URI's authority is used as an approximation of the distance from the
+ * client. It's sufficient for DC but not accurate because worker nodes can be
+ * closer.
+ */
+ private static class NflyNode extends NodeBase {
+   private final ChRootedFileSystem fs;
+
+   /** Wraps an already created chrooted file system. */
+   NflyNode(String hostName, String rackName, ChRootedFileSystem fs) {
+     super(hostName, rackName);
+     this.fs = fs;
+   }
+
+   /** Creates the chrooted file system for uri and wraps it. */
+   NflyNode(String hostName, String rackName, URI uri,
+       Configuration conf) throws IOException {
+     this(hostName, rackName, new ChRootedFileSystem(uri, conf));
+   }
+
+   ChRootedFileSystem getFs() {
+     return this.fs;
+   }
+
+   // hashCode/equals are declared only to satisfy findbugs; both defer to
+   // the superclass.
+   @Override
+   public int hashCode() {
+     return super.hashCode();
+   }
+
+   @Override
+   public boolean equals(Object o) {
+     return super.equals(o);
+   }
+
+ }
+
+ /**
+ * Per-operation copy of an {@link NflyNode} that also remembers a file
+ * status for the path being processed. The natural ordering puts the most
+ * recently modified status first and nodes without a status last.
+ * NOTE(review): equals() is derived from compareTo() (same modification
+ * time) while hashCode() delegates to the superclass, so the two are not
+ * consistent with each other.
+ */
+ private static final class MRNflyNode
+     extends NflyNode implements Comparable<MRNflyNode> {
+
+   // status for the current path; null until it has been set
+   private FileStatus status;
+
+   private MRNflyNode(NflyNode n) {
+     super(n.getName(), n.getNetworkLocation(), n.fs);
+   }
+
+   /** Fetches the status of f from this node's file system and keeps it. */
+   private void updateFileStatus(Path f) throws IOException {
+     final FileStatus fetched = getFs().getFileStatus(f);
+     if (fetched != null) {
+       status = fetched;
+     } else {
+       status = notFoundStatus(f);
+     }
+   }
+
+   // TODO allow configurable error margin for FileSystems with different
+   // timestamp precisions
+   @Override
+   public int compareTo(MRNflyNode other) {
+     final FileStatus mine = status;
+     final FileStatus theirs = other.status;
+     if (mine == null && theirs == null) {
+       return 0;
+     }
+     if (mine == null) {
+       return 1; // move non-null towards head
+     }
+     if (theirs == null) {
+       return -1; // move this towards head
+     }
+     final long mtime = mine.getModificationTime();
+     final long their = theirs.getModificationTime();
+     return Long.compare(their, mtime); // move more recent towards head
+   }
+
+   @Override
+   public boolean equals(Object o) {
+     return o instanceof MRNflyNode && 0 == compareTo((MRNflyNode) o);
+   }
+
+   @Override
+   public int hashCode() {
+     // satisfy findbugs
+     return super.hashCode();
+   }
+
+   /** Wraps the remembered status into an NflyStatus of this node's fs. */
+   private FileStatus nflyStatus() throws IOException {
+     final ChRootedFileSystem nodeFs = getFs();
+     return new NflyStatus(nodeFs, status);
+   }
+
+   /**
+    * Copies the remembered status into a plain FileStatus; permission, owner
+    * and group are passed as null.
+    */
+   private FileStatus cloneStatus() throws IOException {
+     final FileStatus src = status;
+     return new FileStatus(src.getLen(),
+         src.isDirectory(),
+         src.getReplication(),
+         src.getBlockSize(),
+         src.getModificationTime(),
+         src.getAccessTime(),
+         null, null, null,
+         src.isSymlink() ? src.getSymlink() : null,
+         src.getPath());
+   }
+ }
+
+ /**
+ * Builds a fresh array of MRNflyNode copies, one per entry of nodes and in
+ * the same order, so that an operation can track per-node file status.
+ */
+ private MRNflyNode[] workSet() {
+   final MRNflyNode[] copies = new MRNflyNode[nodes.length];
+   int next = 0;
+   for (final NflyNode node : nodes) {
+     copies[next++] = new MRNflyNode(node);
+   }
+   return copies;
+ }
+
+
+ /**
+ * Substitutes DEFAULT_RACK for a missing rack value.
+ *
+ * @param rackString rack value, can be null
+ * @return rackString itself, or NetworkTopology.DEFAULT_RACK when it is null
+ */
+ private static String getRack(String rackString) {
+   if (rackString != null) {
+     return rackString;
+   }
+   return NetworkTopology.DEFAULT_RACK;
+ }
+
+ /**
+ * Creates a new Nfly instance. Every destination URI becomes an NflyNode
+ * whose rack is resolved through the configured DNSToSwitchMapping; the
+ * nodes are then sorted by network distance from the local host.
+ *
+ * @param uris the list of uris in the mount point
+ * @param conf configuration object
+ * @param minReplication minimum copies to commit a write op
+ * @param nflyFlags modes such readMostRecent
+ * @throws IOException if there are fewer uris than minReplication, or if
+ *         looking up the local host or creating a destination node fails
+ */
+ private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
+     EnumSet<NflyKey> nflyFlags) throws IOException {
+   if (uris.length < minReplication) {
+     // the message states the violated relation: more replicas are required
+     // than there are destinations
+     throw new IOException(minReplication + " > " + uris.length
+         + ": Minimum replication > #destinations");
+   }
+   setConf(conf);
+   final String localHostName = InetAddress.getLocalHost().getHostName();
+
+   // build a list for topology resolution
+   final List<String> hostStrings = new ArrayList<String>(uris.length + 1);
+   for (URI uri : uris) {
+     final String uriHost = uri.getHost();
+     // assume local file system or another closest filesystem if no authority
+     hostStrings.add(uriHost == null ? localHostName : uriHost);
+   }
+   // resolve the client node; it is the last entry of the list
+   hostStrings.add(localHostName);
+
+   final DNSToSwitchMapping tmpDns = ReflectionUtils.newInstance(conf.getClass(
+       CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+       ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
+
+   // this is an ArrayList
+   final List<String> rackStrings = tmpDns.resolve(hostStrings);
+   nodes = new NflyNode[uris.length];
+   // rack i belongs to host i; the iterator is shared with myNode below,
+   // which consumes the rack of the trailing local host entry
+   final Iterator<String> rackIter = rackStrings.iterator();
+   for (int i = 0; i < nodes.length; i++) {
+     nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(), uris[i],
+         conf);
+   }
+   // sort all the uri's by distance from myNode, the local file system will
+   // automatically be the first one.
+   //
+   myNode = new NodeBase(localHostName, getRack(rackIter.next()));
+   topology = NetworkTopology.getInstance(conf);
+   topology.sortByDistance(myNode, nodes, nodes.length);
+
+   this.minReplication = minReplication;
+   this.nflyFlags = nflyFlags;
+   statistics = getStatistics(nflyURI.getScheme(), getClass());
+ }
+
+ /**
+ * Transactional output stream. When creating path /dir/file
+ * 1) create invisible /real/dir_i/_nfly_tmp_file
+ * 2) when more than min replication was written, write is committed by
+ * renaming all successfully written files to /real/dir_i/file
+ */
+ private final class NflyOutputStream extends OutputStream {
+ // actual path
+ private final Path nflyPath;
+ // tmp path before commit
+ private final Path tmpPath;
+ // broadcast set
+ private final FSDataOutputStream[] outputStreams;
+ // status set: 1 working, 0 problem
+ private final BitSet opSet;
+ private final boolean useOverwrite;
+
+ private NflyOutputStream(Path f, FsPermission permission, boolean overwrite,
+ int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ nflyPath = f;
+ tmpPath = getNflyTmpPath(f);
+ outputStreams = new FSDataOutputStream[nodes.length];
+ for (int i = 0; i < outputStreams.length; i++) {
+ outputStreams[i] = nodes[i].fs.create(tmpPath, permission, true,
+ bufferSize, replication, blockSize, progress);
+ }
+ opSet = new BitSet(outputStreams.length);
+ opSet.set(0, outputStreams.length);
+ useOverwrite = false;
+ }
+
+ //
+ // TODO consider how to clean up and throw an exception early when the clear
+ // bits under min replication
+ //
+
+ private void mayThrow(List<IOException> ioExceptions) throws IOException {
+ final IOException ioe = MultipleIOException
+ .createIOException(ioExceptions);
+ if (opSet.cardinality() < minReplication) {
+ throw ioe;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exceptions occurred: " + ioe);
+ }
+ }
+ }
+
+
+ @Override
+ public void write(int d) throws IOException {
+ final List<IOException> ioExceptions = new ArrayList<IOException>();
+ for (int i = opSet.nextSetBit(0);
+ i >=0;
+ i = opSet.nextSetBit(i + 1)) {
+ try {
+ outputStreams[i].write(d);
+ } catch (Throwable t) {
+ osException(i, "write", t, ioExceptions);
+ }
+ }
+ mayThrow(ioExceptions);
+ }
+
+ private void osException(int i, String op, Throwable t,
+ List<IOException> ioExceptions) {
+ opSet.clear(i);
+ processThrowable(nodes[i], op, t, ioExceptions, tmpPath, nflyPath);
+ }
+
+ @Override
+ public void write(byte[] bytes, int offset, int len) throws IOException {
+ final List<IOException> ioExceptions = new ArrayList<IOException>();
+ for (int i = opSet.nextSetBit(0);
+ i >= 0;
+ i = opSet.nextSetBit(i + 1)) {
+ try {
+ outputStreams[i].write(bytes, offset, len);
+ } catch (Throwable t) {
+ osException(i, "write", t, ioExceptions);
+ }
+ }
+ mayThrow(ioExceptions);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ final List<IOException> ioExceptions = new ArrayList<IOException>();
+ for (int i = opSet.nextSetBit(0);
+ i >= 0;
+ i = opSet.nextSetBit(i + 1)) {
+ try {
+ outputStreams[i].flush();
+ } catch (Throwable t) {
+ osException(i, "flush", t, ioExceptions);
+ }
+ }
+ mayThrow(ioExceptions);
+ }
+
+ @Override
+ public void close() throws IOException {
+ final List<IOException> ioExceptions = new ArrayList<IOException>();
+ for (int i = opSet.nextSetBit(0);
+ i >= 0;
+ i = opSet.nextSetBit(i + 1)) {
+ try {
+ outputStreams[i].close();
+ } catch (Throwable t) {
+ osException(i, "close", t, ioExceptions);
+ }
+ }
+ if (opSet.cardinality() < minReplication) {
+ cleanupAllTmpFiles();
+ throw new IOException("Failed to sufficiently replicate: min="
+ + minReplication + " actual=" + opSet.cardinality());
+ } else {
+ commit();
+ }
+ }
+
+ private void cleanupAllTmpFiles() throws IOException {
+ for (int i = 0; i < outputStreams.length; i++) {
+ try {
+ nodes[i].fs.delete(tmpPath);
+ } catch (Throwable t) {
+ processThrowable(nodes[i], "delete", t, null, tmpPath);
+ }
+ }
+ }
+
+ private void commit() throws IOException {
+ final List<IOException> ioExceptions = new ArrayList<IOException>();
+ for (int i = opSet.nextSetBit(0);
+ i >= 0;
+ i = opSet.nextSetBit(i + 1)) {
+ final NflyNode nflyNode = nodes[i];
+ try {
+ if (useOverwrite) {
+ nflyNode.fs.delete(nflyPath);
+ }
+ nflyNode.fs.rename(tmpPath, nflyPath);
+
+ } catch (Throwable t) {
+ osException(i, "commit", t, ioExceptions);
+ }
+ }
+
+ if (opSet.cardinality() < minReplication) {
+ // cleanup should be done outside. If rename failed, it's unlikely that
+ // delete will work either. It's the same kind of metadata-only op
+ //
+ throw MultipleIOException.createIOException(ioExceptions);
+ }
+
+ // best effort to have a consistent timestamp
+ final long commitTime = System.currentTimeMillis();
+ for (int i = opSet.nextSetBit(0);
+ i >= 0;
+ i = opSet.nextSetBit(i + 1)) {
+ try {
+ nodes[i].fs.setTimes(nflyPath, commitTime, commitTime);
+ } catch (Throwable t) {
+ LOG.info("Failed to set timestamp: " + nodes[i] + " " + nflyPath);
+ }
+ }
+ }
+ }
+
+ private Path getNflyTmpPath(Path f) {
+ return new Path(f.getParent(), NFLY_TMP_PREFIX + f.getName());
+ }
+
+ /**
+ * File status that wraps the status obtained from a destination file system
+ * and forwards every getter and setter to it.
+ *
+ * TODO: Some file status implementations have expensive deserialization or
+ * metadata retrieval. This probably does not go beyond RawLocalFileSystem.
+ * Wrapping the real file status preserves this behavior; calling realStatus
+ * getters in the constructor would defeat this design.
+ * NOTE(review): the constructor does call realStatus.getPath() in order to
+ * compute the stripped root.
+ */
+ static final class NflyStatus extends FileStatus {
+ private static final long serialVersionUID = 0x21f276d8;
+
+ private final FileStatus realStatus; // the wrapped status, target of all delegation
+ private final String strippedRoot; // realStatus path with the chroot of realFs stripped
+
+ private NflyStatus(ChRootedFileSystem realFs, FileStatus realStatus)
+ throws IOException {
+ this.realStatus = realStatus;
+ this.strippedRoot = realFs.stripOutRoot(realStatus.getPath());
+ }
+
+ String stripRoot() throws IOException {
+ return strippedRoot; // computed once in the constructor
+ }
+
+ @Override
+ public long getLen() {
+ return realStatus.getLen();
+ }
+
+ @Override
+ public boolean isFile() {
+ return realStatus.isFile();
+ }
+
+ @Override
+ public boolean isDirectory() {
+ return realStatus.isDirectory();
+ }
+
+ @Override
+ public boolean isSymlink() {
+ return realStatus.isSymlink();
+ }
+
+ @Override
+ public long getBlockSize() {
+ return realStatus.getBlockSize();
+ }
+
+ @Override
+ public short getReplication() {
+ return realStatus.getReplication();
+ }
+
+ @Override
+ public long getModificationTime() {
+ return realStatus.getModificationTime();
+ }
+
+ @Override
+ public long getAccessTime() {
+ return realStatus.getAccessTime();
+ }
+
+ @Override
+ public FsPermission getPermission() {
+ return realStatus.getPermission();
+ }
+
+ @Override
+ public String getOwner() {
+ return realStatus.getOwner();
+ }
+
+ @Override
+ public String getGroup() {
+ return realStatus.getGroup();
+ }
+
+ @Override
+ public Path getPath() {
+ return realStatus.getPath();
+ }
+
+ @Override
+ public void setPath(Path p) {
+ realStatus.setPath(p); // mutates the wrapped status
+ }
+
+ @Override
+ public Path getSymlink() throws IOException {
+ return realStatus.getSymlink();
+ }
+
+ @Override
+ public void setSymlink(Path p) {
+ realStatus.setSymlink(p); // mutates the wrapped status
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return realStatus.equals(o); // compares the wrapped status, not this wrapper
+ }
+
+ @Override
+ public int hashCode() {
+ return realStatus.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return realStatus.toString();
+ }
+ }
+
+ @Override
+ public URI getUri() {
+ return nflyURI; // the static nfly:/// URI of this class
+ }
+
+ /**
+ * Category: READ.
+ *
+ * Without repairOnRead and readMostRecent the nodes are tried in array
+ * order and the first stream that opens is returned. With either flag set
+ * the file status is collected from every node first and the stream is then
+ * obtained through repairAndOpen.
+ *
+ * @param f the file name to open
+ * @param bufferSize the size of the buffer to be used.
+ * @return input stream according to nfly flags (closest, most recent)
+ * @throws IOException
+ * @throws FileNotFoundException iff all destinations generate this exception
+ */
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ // TODO proxy stream for reads
+ final List<IOException> ioExceptions =
+ new ArrayList<IOException>(nodes.length);
+ int numNotFounds = 0;
+ final MRNflyNode[] mrNodes = workSet(); // per-call copies that can hold a status
+
+ // naively iterate until one can be opened
+ //
+ for (final MRNflyNode nflyNode : mrNodes) {
+ try {
+ if (nflyFlags.contains(NflyKey.repairOnRead)
+ || nflyFlags.contains(NflyKey.readMostRecent)) {
+ // calling file status to avoid pulling bytes prematurely
+ nflyNode.updateFileStatus(f);
+ } else {
+ return nflyNode.getFs().open(f, bufferSize);
+ }
+ } catch (FileNotFoundException fnfe) {
+ nflyNode.status = notFoundStatus(f); // negative length marks "not found"
+ numNotFounds++;
+ processThrowable(nflyNode, "open", fnfe, ioExceptions, f);
+ } catch (Throwable t) {
+ processThrowable(nflyNode, "open", t, ioExceptions, f); // status stays null: node not available
+ }
+ }
+
+ if (nflyFlags.contains(NflyKey.readMostRecent)) {
+ // sort from most recent to least recent
+ Arrays.sort(mrNodes);
+ }
+
+ final FSDataInputStream fsdisAfterRepair = repairAndOpen(mrNodes, f,
+ bufferSize);
+
+ if (fsdisAfterRepair != null) {
+ return fsdisAfterRepair;
+ }
+
+ mayThrowFileNotFound(ioExceptions, numNotFounds); // throws only if every node reported not found
+ throw MultipleIOException.createIOException(ioExceptions); // NOTE(review): confirm the behavior when ioExceptions is empty
+ }
+
+ private static FileStatus notFoundStatus(Path f) {
+ return new FileStatus(-1, false, 0, 0, 0, f);
+ }
+
+ /**
+ * Iterate all available nodes in the proximity order to attempt repair of all
+ * FileNotFound nodes.
+ *
+ * Every node with a real status acts as a source; each destination whose
+ * status is known and whose modification time differs from the source gets
+ * the file copied to a tmp path and renamed into place. Afterwards the nodes
+ * carrying the greatest modification time are sorted by distance and opened
+ * in turn.
+ *
+ * @param mrNodes work set copy of nodes
+ * @param f path to repair and open
+ * @param bufferSize buffer size for read RPC
+ * @return the closest/most recent replica stream AFTER repair, or null when
+ *         no node had the file or none of the candidates could be opened
+ */
+ private FSDataInputStream repairAndOpen(MRNflyNode[] mrNodes, Path f,
+     int bufferSize) {
+   long maxMtime = 0L;
+   for (final MRNflyNode srcNode : mrNodes) {
+     if (srcNode.status == null // not available
+         || srcNode.status.getLen() < 0L) { // not found
+       continue; // not available
+     }
+     if (srcNode.status.getModificationTime() > maxMtime) {
+       maxMtime = srcNode.status.getModificationTime();
+     }
+
+     // attempt to repair all notFound nodes with srcNode
+     //
+     for (final MRNflyNode dstNode : mrNodes) {
+       if (dstNode.status == null // not available
+           || srcNode.compareTo(dstNode) == 0) { // same mtime
+         continue;
+       }
+
+       try {
+         // status is absolute from the underlying mount, making it chrooted
+         //
+         final FileStatus srcStatus = srcNode.cloneStatus();
+         srcStatus.setPath(f);
+         final Path tmpPath = getNflyTmpPath(f);
+         FileUtil.copy(srcNode.getFs(), srcStatus, dstNode.getFs(), tmpPath,
+             false,                // don't delete
+             true,                 // overwrite
+             getConf());
+         dstNode.getFs().delete(f, false);
+         if (dstNode.getFs().rename(tmpPath, f)) {
+           try {
+             dstNode.getFs().setTimes(f, srcNode.status.getModificationTime(),
+                 srcNode.status.getAccessTime());
+           } finally {
+             // save getFileStatus rpc
+             srcStatus.setPath(dstNode.getFs().makeQualified(f));
+             dstNode.status = srcStatus;
+           }
+         }
+       } catch (IOException ioe) {
+         // can blame the source by statusSet.clear(ai), however, it would
+         // cost an extra RPC, so just rely on the loop below that will attempt
+         // an open anyhow
+         //
+         LOG.info(f + " " + srcNode + "->" + dstNode + ": Failed to repair",
+             ioe);
+       }
+     }
+   }
+
+   // Collect the nodes that hold the most recent modification time and put
+   // them back into NetworkTopology order before trying to open them.
+   // NOTE(review): Arrays.sort on an object array is documented as a stable
+   // merge sort, so the instability concern of the original comment does
+   // not apply; the re-sort by distance is kept as is.
+   //
+   if (maxMtime > 0) {
+     final List<MRNflyNode> mrList = new ArrayList<MRNflyNode>();
+     for (final MRNflyNode openNode : mrNodes) {
+       if (openNode.status != null && openNode.status.getLen() >= 0L) {
+         if (openNode.status.getModificationTime() == maxMtime) {
+           mrList.add(openNode);
+         }
+       }
+     }
+     // assert mrList.size > 0
+     final MRNflyNode[] readNodes = mrList.toArray(new MRNflyNode[0]);
+     topology.sortByDistance(myNode, readNodes, readNodes.length);
+     for (final MRNflyNode rNode : readNodes) {
+       try {
+         return rNode.getFs().open(f, bufferSize);
+       } catch (IOException e) {
+         // keep the cause: without it the log does not say why the open
+         // failed on this replica
+         LOG.info(f + ": Failed to open at " + rNode.getFs().getUri(), e);
+       }
+     }
+   }
+   return null;
+ }
+
+ private void mayThrowFileNotFound(List<IOException> ioExceptions,
+ int numNotFounds) throws FileNotFoundException {
+ if (numNotFounds == nodes.length) {
+ throw (FileNotFoundException)ioExceptions.get(nodes.length - 1);
+ }
+ }
+
+ // WRITE
+ /**
+ * Creates f through an NflyOutputStream built from the given arguments and
+ * wraps that stream into an FSDataOutputStream with this file system's
+ * statistics.
+ */
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission,
+     boolean overwrite, int bufferSize, short replication, long blockSize,
+     Progressable progress) throws IOException {
+   final OutputStream nflyOut = new NflyOutputStream(f, permission, overwrite,
+       bufferSize, replication, blockSize, progress);
+   return new FSDataOutputStream(nflyOut, statistics);
+ }
+
+ /**
+ * Category: WRITE.
+ *
+ * Append is not implemented for nfly. The call fails explicitly instead of
+ * handing out a null stream that would only fail later at the caller.
+ *
+ * @param f the file to append to
+ * @param bufferSize the size of the buffer to be used
+ * @param progress for reporting progress if it is not null
+ * @return never returns normally
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize,
+     Progressable progress) throws IOException {
+   throw new UnsupportedOperationException("Append is not supported by "
+       + getClass().getSimpleName() + ": " + f);
+ }
+
+ // WRITE
+ /**
+ * Renames src to dst on every destination.
+ *
+ * @return true only if every destination returned true; a destination that
+ *         reported FileNotFoundException does not by itself turn the result
+ *         false
+ * @throws FileNotFoundException if every destination reported not found
+ * @throws IOException if every destination threw an exception
+ */
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+   final List<IOException> failures = new ArrayList<IOException>();
+   boolean allRenamed = true;
+   int missing = 0;
+   for (int i = 0; i < nodes.length; i++) {
+     final NflyNode target = nodes[i];
+     try {
+       if (!target.fs.rename(src, dst)) {
+         allRenamed = false;
+       }
+     } catch (FileNotFoundException fnfe) {
+       missing++;
+       processThrowable(target, "rename", fnfe, failures, src, dst);
+     } catch (Throwable t) {
+       processThrowable(target, "rename", t, failures, src, dst);
+       allRenamed = false;
+     }
+   }
+
+   mayThrowFileNotFound(failures, missing);
+
+   // if all destinations threw exceptions throw, otherwise return
+   //
+   final boolean everyNodeFailed = failures.size() == nodes.length;
+   if (everyNodeFailed) {
+     throw MultipleIOException.createIOException(failures);
+   }
+
+   return allRenamed;
+ }
+
+ // WRITE
+ /**
+ * Deletes f on every destination.
+ *
+ * @param f the path to delete
+ * @param recursive passed on to every destination; the single-argument
+ *        delete used previously ignored this flag
+ * @return true only if every destination returned true; a destination that
+ *         reported FileNotFoundException does not by itself turn the result
+ *         false
+ * @throws FileNotFoundException if every destination reported not found
+ * @throws IOException if every destination threw an exception
+ */
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+   final List<IOException> ioExceptions = new ArrayList<IOException>();
+   int numNotFounds = 0;
+   boolean succ = true;
+   for (final NflyNode nflyNode : nodes) {
+     try {
+       // honor the caller's recursive flag so that a non-recursive request
+       // cannot remove a non-empty directory
+       succ &= nflyNode.fs.delete(f, recursive);
+     } catch (FileNotFoundException fnfe) {
+       numNotFounds++;
+       processThrowable(nflyNode, "delete", fnfe, ioExceptions, f);
+     } catch (Throwable t) {
+       processThrowable(nflyNode, "delete", t, ioExceptions, f);
+       succ = false;
+     }
+   }
+   mayThrowFileNotFound(ioExceptions, numNotFounds);
+
+   // if all destinations threw exceptions throw, otherwise return
+   //
+   if (ioExceptions.size() == nodes.length) {
+     throw MultipleIOException.createIOException(ioExceptions);
+   }
+
+   return succ;
+ }
+
+
+ /**
+  * Lists the path on the first node of the work set that answers without
+  * throwing and wraps every returned entry in an NflyStatus.
+  *
+  * When readMostRecent is set, each node's status for the path is refreshed
+  * and the work set is sorted before the listing is attempted.
+  *
+  * @param f given path
+  * @return array of file statuses according to nfly modes
+  * @throws FileNotFoundException if every node reported file-not-found
+  * @throws IOException aggregate of the recorded failures if no node
+  *         could produce a listing
+  */
+ @Override
+ public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+     IOException {
+   final List<IOException> failures =
+       new ArrayList<IOException>(nodes.length);
+   final MRNflyNode[] candidates = workSet();
+
+   if (nflyFlags.contains(NflyKey.readMostRecent)) {
+     int missingOnRefresh = 0;
+     for (int i = 0; i < candidates.length; i++) {
+       final MRNflyNode node = candidates[i];
+       try {
+         node.updateFileStatus(f);
+       } catch (FileNotFoundException fnfe) {
+         missingOnRefresh++;
+         processThrowable(node, "listStatus", fnfe, failures, f);
+       } catch (Throwable t) {
+         processThrowable(node, "listStatus", t, failures, f);
+       }
+     }
+     mayThrowFileNotFound(failures, missingOnRefresh);
+     // presumably orders the nodes by the status refreshed above - confirm
+     // against MRNflyNode's compareTo
+     Arrays.sort(candidates);
+   }
+
+   int missingOnList = 0;
+   for (int i = 0; i < candidates.length; i++) {
+     final MRNflyNode node = candidates[i];
+     try {
+       final FileStatus[] listed = node.getFs().listStatus(f);
+       final FileStatus[] wrapped = new FileStatus[listed.length];
+       int next = 0;
+       for (final FileStatus real : listed) {
+         wrapped[next++] = new NflyStatus(node.getFs(), real);
+       }
+       // first node that lists successfully wins
+       return wrapped;
+     } catch (FileNotFoundException fnfe) {
+       missingOnList++;
+       processThrowable(node, "listStatus", fnfe, failures, f);
+     } catch (Throwable t) {
+       processThrowable(node, "listStatus", t, failures, f);
+     }
+   }
+   mayThrowFileNotFound(failures, missingOnList);
+   throw MultipleIOException.createIOException(failures);
+ }
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
+     throws FileNotFoundException, IOException {
+   // TODO important for splits
+   // Until then, defer entirely to the superclass implementation.
+   final RemoteIterator<LocatedFileStatus> located =
+       super.listLocatedStatus(f);
+   return located;
+ }
+
+ /** Applies the new working directory to the file system of every node. */
+ @Override
+ public void setWorkingDirectory(Path newDir) {
+   for (int i = 0; i < nodes.length; i++) {
+     nodes[i].fs.setWorkingDirectory(newDir);
+   }
+ }
+
+ /** Reports the working directory of the first node's file system. */
+ @Override
+ public Path getWorkingDirectory() {
+   // 0 is as good as any
+   final Path workingDir = nodes[0].fs.getWorkingDirectory();
+   return workingDir;
+ }
+
+ /**
+  * Creates the directory on every node.
+  *
+  * @return true only if mkdirs returned true on all nodes
+  * @throws IOException the first failure raised by a node; later nodes are
+  *         not attempted
+  */
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+   boolean allCreated = true;
+   for (int i = 0; i < nodes.length; i++) {
+     // always issue the call, even after an earlier node returned false
+     final boolean created = nodes[i].fs.mkdirs(f, permission);
+     allCreated = allCreated && created;
+   }
+   return allCreated;
+ }
+
+ /**
+  * Returns the status of the path from the work set.
+  *
+  * Without readMostRecent the first node whose status can be refreshed
+  * answers. With readMostRecent all nodes are polled and the one with the
+  * greatest modification time answers.
+  *
+  * @throws FileNotFoundException if every node reported file-not-found
+  * @throws IOException aggregate of the recorded failures if no node
+  *         produced a status
+  */
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+   // TODO proxy stream for reads
+   final List<IOException> failures =
+       new ArrayList<IOException>(nodes.length);
+   final MRNflyNode[] candidates = workSet();
+   int missing = 0;
+
+   long newestMtime = Long.MIN_VALUE;
+   int newestIdx = -1; // negative means no candidate recorded yet
+
+   // naively iterate until one can be returned
+   //
+   for (int idx = 0; idx < candidates.length; idx++) {
+     final MRNflyNode node = candidates[idx];
+     try {
+       node.updateFileStatus(f);
+       if (!nflyFlags.contains(NflyKey.readMostRecent)) {
+         return node.nflyStatus();
+       }
+       final long mtime = node.status.getModificationTime();
+       if (mtime > newestMtime) {
+         newestMtime = mtime;
+         newestIdx = idx;
+       }
+     } catch (FileNotFoundException fnfe) {
+       missing++;
+       processThrowable(node, "getFileStatus", fnfe, failures, f);
+     } catch (Throwable t) {
+       processThrowable(node, "getFileStatus", t, failures, f);
+     }
+   }
+
+   if (newestIdx >= 0) {
+     return candidates[newestIdx].nflyStatus();
+   }
+
+   mayThrowFileNotFound(failures, missing);
+   throw MultipleIOException.createIOException(failures);
+ }
+
+ private static void processThrowable(NflyNode nflyNode, String op,
+ Throwable t, List<IOException> ioExceptions,
+ Path... f) {
+ final String errMsg = Arrays.toString(f)
+ + ": failed to " + op + " " + nflyNode.fs.getUri();
+ final IOException ioex;
+ if (t instanceof FileNotFoundException) {
+ ioex = new FileNotFoundException(errMsg);
+ ioex.initCause(t);
+ } else {
+ ioex = new IOException(errMsg, t);
+ }
+
+ if (ioExceptions != null) {
+ ioExceptions.add(ioex);
+ }
+ }
+
+ /**
+  * Builds an nfly file system for a viewfs mount point from its target
+  * URIs and its settings string.
+  *
+  * Recognized keys are minReplication (an integer) and the boolean flags
+  * repairOnRead and readMostRecent; a flag is enabled only when its value
+  * parses as true.
+  *
+  * @param uris destinations to replicate writes to
+  * @param conf file system configuration
+  * @param settings comma-separated list of k=v pairs.
+  * @return an Nfly filesystem
+  * @throws IOException
+  * @throws IllegalArgumentException if a pair is not of the form k=v or
+  *         its key is not handled
+  */
+ static FileSystem createFileSystem(URI[] uris, Configuration conf,
+     String settings) throws IOException {
+   // assert settings != null
+   int minRepl = DEFAULT_MIN_REPLICATION;
+   final EnumSet<NflyKey> flags = EnumSet.noneOf(NflyKey.class);
+   for (final String kv : StringUtils.split(settings)) {
+     final String[] kvPair = StringUtils.split(kv, '=');
+     if (kvPair.length != 2) {
+       throw new IllegalArgumentException(kv);
+     }
+     final NflyKey key = NflyKey.valueOf(kvPair[0]);
+     final String value = kvPair[1];
+     if (key == NflyKey.minReplication) {
+       minRepl = Integer.parseInt(value);
+     } else if (key == NflyKey.repairOnRead
+         || key == NflyKey.readMostRecent) {
+       if (Boolean.parseBoolean(value)) {
+         flags.add(key);
+       }
+     } else {
+       throw new IllegalArgumentException(key + ": Infeasible");
+     }
+   }
+   return new NflyFSystem(uris, conf, minRepl, flags);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
index 158b099..ca1380a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
@@ -186,25 +185,21 @@ public class ViewFileSystem extends FileSystem {
fsState = new InodeTree<FileSystem>(conf, authority) {
@Override
- protected
- FileSystem getTargetFileSystem(final URI uri)
+ protected FileSystem getTargetFileSystem(final URI uri)
throws URISyntaxException, IOException {
return new ChRootedFileSystem(uri, config);
}
@Override
- protected
- FileSystem getTargetFileSystem(final INodeDir<FileSystem> dir)
+ protected FileSystem getTargetFileSystem(final INodeDir<FileSystem> dir)
throws URISyntaxException {
return new InternalDirOfViewFs(dir, creationTime, ugi, myUri, config);
}
@Override
- protected
- FileSystem getTargetFileSystem(URI[] mergeFsURIList)
- throws URISyntaxException, UnsupportedFileSystemException {
- throw new UnsupportedFileSystemException("mergefs not implemented");
- // return MergeFs.createMergeFs(mergeFsURIList, config);
+ protected FileSystem getTargetFileSystem(final String settings,
+ final URI[] uris) throws URISyntaxException, IOException {
+ return NflyFSystem.createFileSystem(uris, config, settings);
}
};
workingDir = this.getHomeDirectory();
@@ -455,8 +450,13 @@ public class ViewFileSystem extends FileSystem {
private Path getChrootedPath(InodeTree.ResolveResult<FileSystem> res,
FileStatus status, Path f) throws IOException {
- final String suffix = ((ChRootedFileSystem)res.targetFileSystem)
- .stripOutRoot(status.getPath());
+ final String suffix;
+ if (res.targetFileSystem instanceof ChRootedFileSystem) {
+ suffix = ((ChRootedFileSystem)res.targetFileSystem)
+ .stripOutRoot(status.getPath());
+ } else { // nfly
+ suffix = ((NflyFSystem.NflyStatus)status).stripRoot();
+ }
return this.makeQualified(
suffix.length() == 0 ? f : new Path(res.resolvedPath, suffix));
}
@@ -501,10 +501,15 @@ public class ViewFileSystem extends FileSystem {
verifyRenameStrategy(srcUri, dstUri,
resSrc.targetFileSystem == resDst.targetFileSystem, renameStrategy);
- ChRootedFileSystem srcFS = (ChRootedFileSystem) resSrc.targetFileSystem;
- ChRootedFileSystem dstFS = (ChRootedFileSystem) resDst.targetFileSystem;
- return srcFS.getMyFs().rename(srcFS.fullPath(resSrc.remainingPath),
- dstFS.fullPath(resDst.remainingPath));
+ if (resSrc.targetFileSystem instanceof ChRootedFileSystem &&
+ resDst.targetFileSystem instanceof ChRootedFileSystem) {
+ ChRootedFileSystem srcFS = (ChRootedFileSystem) resSrc.targetFileSystem;
+ ChRootedFileSystem dstFS = (ChRootedFileSystem) resDst.targetFileSystem;
+ return srcFS.getMyFs().rename(srcFS.fullPath(resSrc.remainingPath),
+ dstFS.fullPath(resDst.remainingPath));
+ } else {
+ return resSrc.targetFileSystem.rename(resSrc.remainingPath, resDst.remainingPath);
+ }
}
static void verifyRenameStrategy(URI srcUri, URI dstUri,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
index 364485f..6a89f27 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
@@ -212,8 +212,7 @@ public class ViewFs extends AbstractFileSystem {
fsState = new InodeTree<AbstractFileSystem>(conf, authority) {
@Override
- protected
- AbstractFileSystem getTargetFileSystem(final URI uri)
+ protected AbstractFileSystem getTargetFileSystem(final URI uri)
throws URISyntaxException, UnsupportedFileSystemException {
String pathString = uri.getPath();
if (pathString.isEmpty()) {
@@ -225,15 +224,14 @@ public class ViewFs extends AbstractFileSystem {
}
@Override
- protected
- AbstractFileSystem getTargetFileSystem(
+ protected AbstractFileSystem getTargetFileSystem(
final INodeDir<AbstractFileSystem> dir) throws URISyntaxException {
return new InternalDirOfViewFs(dir, creationTime, ugi, getUri());
}
@Override
- protected
- AbstractFileSystem getTargetFileSystem(URI[] mergeFsURIList)
+ protected AbstractFileSystem getTargetFileSystem(final String settings,
+ final URI[] mergeFsURIList)
throws URISyntaxException, UnsupportedFileSystemException {
throw new UnsupportedFileSystemException("mergefs not implemented yet");
// return MergeFs.createMergeFs(mergeFsURIList, config);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java
index 4943792..808d8b0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLocalFileSystem.java
@@ -18,13 +18,25 @@
package org.apache.hadoop.fs.viewfs;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
-
+import org.junit.Test;
/**
@@ -37,6 +49,8 @@ import org.junit.Before;
*/
public class TestViewFileSystemLocalFileSystem extends ViewFileSystemBaseTest {
+ private static final Log LOG =
+ LogFactory.getLog(TestViewFileSystemLocalFileSystem.class);
@Override
@Before
@@ -47,6 +61,65 @@ public class TestViewFileSystemLocalFileSystem extends ViewFileSystemBaseTest {
}
+ /**
+  * Writes one file through an nfly mount point backed by two local
+  * directories and checks that each directory holds the file with the
+  * written content.
+  */
+ @Test
+ public void testNflyWriteSimple() throws IOException {
+   LOG.info("Starting testNflyWriteSimple");
+   final URI[] testUris = new URI[] {
+       URI.create(targetTestRoot + "/nfwd1"),
+       URI.create(targetTestRoot + "/nfwd2")
+   };
+   final String testFileName = "test.txt";
+   final Configuration testConf = new Configuration(conf);
+   final String testString = "Hello Nfly!";
+   final Path nflyRoot = new Path("/nflyroot");
+   ConfigUtil.addLinkNfly(testConf, nflyRoot.toString(), testUris);
+   final FileSystem nfly = FileSystem.get(URI.create("viewfs:///"), testConf);
+
+   // use the same name constant that the verification loop below reads back
+   final FSDataOutputStream fsDos = nfly.create(
+       new Path(nflyRoot, testFileName));
+   try {
+     fsDos.writeUTF(testString);
+   } finally {
+     fsDos.close();
+   }
+
+   // exercised only to make sure listing the mount point does not throw
+   nfly.listStatus(nflyRoot);
+
+   FileSystem lfs = FileSystem.getLocal(testConf);
+   for (final URI testUri : testUris) {
+     final Path testFile = new Path(new Path(testUri), testFileName);
+     assertTrue(testFile + " should exist!", lfs.exists(testFile));
+     final FSDataInputStream fsdis = lfs.open(testFile);
+     try {
+       assertEquals("Wrong file content", testString, fsdis.readUTF());
+     } finally {
+       fsdis.close();
+     }
+   }
+ }
+
+
+ /**
+  * Configures an nfly link with minReplication=4 over two targets and
+  * expects file system creation to fail with a "Minimum replication"
+  * message.
+  */
+ @Test
+ public void testNflyInvalidMinReplication() throws Exception {
+   LOG.info("Starting testNflyInvalidMinReplication");
+   final URI firstTarget = URI.create(targetTestRoot + "/nfwd1");
+   final URI secondTarget = URI.create(targetTestRoot + "/nfwd2");
+   final URI[] testUris = {firstTarget, secondTarget};
+
+   final Configuration mountConf = new Configuration();
+   ConfigUtil.addLinkNfly(mountConf, "mt", "/nflyroot", "minReplication=4",
+       testUris);
+   final URI viewUri = URI.create("viewfs://mt/");
+   try {
+     FileSystem.get(viewUri, mountConf);
+     fail("Expected bad minReplication exception.");
+   } catch (IOException ioe) {
+     final String message = ioe.getMessage();
+     assertTrue("No minReplication message",
+         message.contains("Minimum replication"));
+   }
+ }
+
+
@Override
@After
public void tearDown() throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
index 895ae0c..136837f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsConfig.java
@@ -24,7 +24,6 @@ import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.junit.Test;
public class TestViewFsConfig {
@@ -43,23 +42,21 @@ public class TestViewFsConfig {
new InodeTree<Foo>(conf, null) {
@Override
- protected Foo getTargetFileSystem(final URI uri)
- throws URISyntaxException, UnsupportedFileSystemException {
+ protected Foo getTargetFileSystem(final URI uri) {
return null;
}
@Override
- protected Foo getTargetFileSystem(
- org.apache.hadoop.fs.viewfs.InodeTree.INodeDir<Foo> dir)
- throws URISyntaxException {
+ protected Foo getTargetFileSystem(final INodeDir<Foo> dir) {
return null;
}
@Override
- protected Foo getTargetFileSystem(URI[] mergeFsURIList)
- throws URISyntaxException, UnsupportedFileSystemException {
+ protected Foo getTargetFileSystem(final String settings,
+ final URI[] mergeFsURIList) {
return null;
}
+
};
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f3bc63e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
index b8f5379..b8bed1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemHdfs.java
@@ -17,11 +17,9 @@
*/
package org.apache.hadoop.fs.viewfs;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
-
import java.io.File;
import java.io.IOException;
+import java.net.URI;
import java.net.URISyntaxException;
import java.util.EnumSet;
@@ -31,6 +29,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -45,17 +45,26 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.client.CreateEncryptionZoneFlag;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
+
import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.junit.Assert.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestViewFileSystemHdfs.class);
+
private static MiniDFSCluster cluster;
private static Path defaultWorkingDirectory;
@@ -190,12 +199,12 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
//Verify file deletion within EZ
DFSTestUtil.verifyDelete(shell, fsTarget, encFile, true);
- Assert.assertTrue("ViewFileSystem trash roots should include EZ file trash",
+ assertTrue("ViewFileSystem trash roots should include EZ file trash",
(fsView.getTrashRoots(true).size() == 1));
//Verify deletion of EZ
DFSTestUtil.verifyDelete(shell, fsTarget, zone, true);
- Assert.assertTrue("ViewFileSystem trash roots should include EZ zone trash",
+ assertTrue("ViewFileSystem trash roots should include EZ zone trash",
(fsView.getTrashRoots(true).size() == 2));
}
@@ -239,14 +248,14 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
viewFs.getFileChecksum(mountDataFilePath);
FileChecksum fileChecksumViaTargetFs =
fsTarget.getFileChecksum(fsTargetFilePath);
- Assert.assertTrue("File checksum not matching!",
+ assertTrue("File checksum not matching!",
fileChecksumViaViewFs.equals(fileChecksumViaTargetFs));
fileChecksumViaViewFs =
viewFs.getFileChecksum(mountDataFilePath, fileLength / 2);
fileChecksumViaTargetFs =
fsTarget.getFileChecksum(fsTargetFilePath, fileLength / 2);
- Assert.assertTrue("File checksum not matching!",
+ assertTrue("File checksum not matching!",
fileChecksumViaViewFs.equals(fileChecksumViaTargetFs));
}
@@ -269,4 +278,130 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
e);
}
}
+
+ /** Runs the shared repair scenario with the repairOnRead flag. */
+ @Test
+ public void testNflyClosestRepair() throws Exception {
+   final NflyFSystem.NflyKey repairKey = NflyFSystem.NflyKey.repairOnRead;
+   testNflyRepair(repairKey);
+ }
+
+ /** Runs the shared repair scenario with the readMostRecent flag. */
+ @Test
+ public void testNflyMostRecentRepair() throws Exception {
+   final NflyFSystem.NflyKey repairKey = NflyFSystem.NflyKey.readMostRecent;
+   testNflyRepair(repairKey);
+ }
+
+ /**
+  * Shared scenario for the nfly repair tests: mounts the roots of the two
+  * target file systems under one nfly link with minReplication=2 and the
+  * given flag enabled, then checks mkdirs/rename, write, reads while one
+  * name node is down, and repair on read.
+  *
+  * @param repairKey nfly flag to enable for this run
+  */
+ private void testNflyRepair(NflyFSystem.NflyKey repairKey)
+     throws Exception {
+   LOG.info("Starting testNflyRepair with {}", repairKey);
+   final URI uri1 = targetTestRoot.toUri();
+   final URI uri2 = targetTestRoot2.toUri();
+   final URI[] testUris = new URI[] {
+       new URI(uri1.getScheme(), uri1.getAuthority(), "/", null, null),
+       new URI(uri2.getScheme(), uri2.getAuthority(), "/", null, null)
+   };
+
+   final Configuration testConf = new Configuration(conf);
+   testConf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+
+   final String testString = "Hello Nfly!";
+   final Path nflyRoot = new Path("/nflyroot");
+
+   ConfigUtil.addLinkNfly(testConf,
+       Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE,
+       nflyRoot.toString(),
+       "minReplication=2," + repairKey + "=true", testUris);
+
+   final FileSystem nfly = FileSystem.get(URI.create("viewfs:///"), testConf);
+   // wd = /nflyroot/user/<user>
+   nfly.setWorkingDirectory(new Path(nflyRoot
+       + nfly.getWorkingDirectory().toUri().getPath()));
+
+   // 1. test mkdirs
+   final Path testDir = new Path("testdir1/sub1/sub3");
+   final Path testDir_tmp = new Path("testdir1/sub1/sub3_temp");
+   assertTrue(testDir + ": Failed to create!", nfly.mkdirs(testDir));
+
+   // Test renames
+   assertTrue(nfly.rename(testDir, testDir_tmp));
+   assertTrue(nfly.rename(testDir_tmp, testDir));
+
+   for (final URI testUri : testUris) {
+     final FileSystem fs = FileSystem.get(testUri, testConf);
+     assertTrue(testDir + " should exist!", fs.exists(testDir));
+   }
+
+   // 2. test write
+   final Path testFile = new Path("test.txt");
+   final FSDataOutputStream fsDos = nfly.create(testFile);
+   try {
+     fsDos.writeUTF(testString);
+   } finally {
+     fsDos.close();
+   }
+
+   for (final URI testUri : testUris) {
+     final FileSystem fs = FileSystem.get(testUri, testConf);
+     final FSDataInputStream fsdis = fs.open(testFile);
+     try {
+       assertEquals("Wrong file content", testString, fsdis.readUTF());
+     } finally {
+       fsdis.close();
+     }
+   }
+
+   // 3. test reads when one unavailable
+   //
+   // bring one NN down and read through nfly should still work
+   //
+   for (int i = 0; i < cluster.getNumNameNodes(); i++) {
+     cluster.shutdownNameNode(i);
+     FSDataInputStream fsDis = null;
+     try {
+       fsDis = nfly.open(testFile);
+       assertEquals("Wrong file content", testString, fsDis.readUTF());
+     } finally {
+       IOUtils.cleanupWithLogger(LOG, fsDis);
+       cluster.restartNameNode(i);
+     }
+   }
+
+   // both nodes are up again, test repair
+   final FileSystem fs1 = FileSystem.get(testUris[0], conf);
+   assertTrue(fs1.delete(testFile, false));
+   assertFalse(fs1.exists(testFile));
+   FSDataInputStream fsDis = null;
+   try {
+     fsDis = nfly.open(testFile);
+     assertEquals("Wrong file content", testString, fsDis.readUTF());
+     // reading through nfly is expected to have restored the deleted copy
+     assertTrue(fs1.exists(testFile));
+   } finally {
+     IOUtils.cleanupWithLogger(LOG, fsDis);
+   }
+
+   // test most recent repair
+   if (repairKey == NflyFSystem.NflyKey.readMostRecent) {
+     final FileSystem fs2 = FileSystem.get(testUris[0], conf);
+     final long expectedMtime = fs2.getFileStatus(testFile)
+         .getModificationTime();
+
+     for (final URI testUri : testUris) {
+       final FileSystem fs = FileSystem.get(testUri, conf);
+       fs.setTimes(testFile, 1L, 1L);
+       assertEquals(testUri + ": Set mtime failed!", 1L,
+           fs.getFileStatus(testFile).getModificationTime());
+       assertEquals("nfly file status wrong", expectedMtime,
+           nfly.getFileStatus(testFile).getModificationTime());
+       FSDataInputStream fsDis2 = null;
+       try {
+         fsDis2 = nfly.open(testFile);
+         assertEquals("Wrong file content", testString, fsDis2.readUTF());
+         // repair is done, now trying via normal fs
+         //
+         assertEquals("Repair most recent failed!", expectedMtime,
+             fs.getFileStatus(testFile).getModificationTime());
+       } finally {
+         IOUtils.cleanupWithLogger(LOG, fsDis2);
+       }
+     }
+   }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org