You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/07/12 00:36:38 UTC

svn commit: r1502387 - in /hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/tools/ src/test/java/org/apache/hadoop/fs/ src/test/java/org/apache/hadoop/...

Author: cmccabe
Date: Thu Jul 11 22:36:37 2013
New Revision: 1502387

URL: http://svn.apache.org/r1502387
Log:
HADOOP-9418.  Add symlink support to DistributedFileSystem (Andrew Wang via Colin Patrick McCabe)

Added:
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfs.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfsFileContext.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfsFileSystem.java
Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
    hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1502387&r1=1502386&r2=1502387&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Jul 11 22:36:37 2013
@@ -8,6 +8,9 @@ Release 2.1.1-beta - UNRELEASED
 
   IMPROVEMENTS
 
+    HADOOP-9418.  Add symlink resolution support to DistributedFileSystem.
+    (Andrew Wang via Colin Patrick McCabe)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1502387&r1=1502386&r2=1502387&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Thu Jul 11 22:36:37 2013
@@ -1038,7 +1038,8 @@ public class DFSClient implements java.i
       return namenode.recoverLease(src, clientName);
     } catch (RemoteException re) {
       throw re.unwrapRemoteException(FileNotFoundException.class,
-                                     AccessControlException.class);
+                                     AccessControlException.class,
+                                     UnresolvedPathException.class);
     }
   }
 
@@ -2177,7 +2178,11 @@ public class DFSClient implements java.i
    */
   public void allowSnapshot(String snapshotRoot) throws IOException {
     checkOpen();
-    namenode.allowSnapshot(snapshotRoot);
+    try {
+      namenode.allowSnapshot(snapshotRoot);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException();
+    }
   }
   
   /**
@@ -2187,7 +2192,11 @@ public class DFSClient implements java.i
    */
   public void disallowSnapshot(String snapshotRoot) throws IOException {
     checkOpen();
-    namenode.disallowSnapshot(snapshotRoot);
+    try {
+      namenode.disallowSnapshot(snapshotRoot);
+    } catch (RemoteException re) {
+      throw re.unwrapRemoteException();
+    }
   }
   
   /**

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1502387&r1=1502386&r2=1502387&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Thu Jul 11 22:36:37 2013
@@ -34,17 +34,24 @@ import org.apache.hadoop.fs.BlockStorage
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSLinkResolver;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemLinkResolver;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
@@ -54,12 +61,12 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -146,22 +153,14 @@ public class DistributedFileSystem exten
     return dfs.getDefaultReplication();
   }
 
-  private Path makeAbsolute(Path f) {
-    if (f.isAbsolute()) {
-      return f;
-    } else {
-      return new Path(workingDir, f);
-    }
-  }
-
   @Override
   public void setWorkingDirectory(Path dir) {
-    String result = makeAbsolute(dir).toUri().getPath();
+    String result = fixRelativePart(dir).toUri().getPath();
     if (!DFSUtil.isValidName(result)) {
       throw new IllegalArgumentException("Invalid DFS directory name " + 
                                          result);
     }
-    workingDir = makeAbsolute(dir);
+    workingDir = fixRelativePart(dir);
   }
 
   
@@ -170,9 +169,18 @@ public class DistributedFileSystem exten
     return makeQualified(new Path("/user/" + dfs.ugi.getShortUserName()));
   }
 
+  /**
+   * Checks that the passed URI belongs to this filesystem and returns its
+   * path component. Relative paths are not resolved here, so callers are
+   * expected to make the path absolute first (e.g. with fixRelativePart).
+   * @param file URI to check; expected to already be absolute
+   * @return path component of {@code file}
+   * @throws IllegalArgumentException if URI does not belong to this DFS
+   *         or its path component is not a valid DFS filename
+   */
   private String getPathName(Path file) {
     checkPath(file);
-    String result = makeAbsolute(file).toUri().getPath();
+    String result = file.toUri().getPath();
     if (!DFSUtil.isValidName(result)) {
       throw new IllegalArgumentException("Pathname " + result + " from " +
                                          file+" is not a valid DFS filename.");
@@ -191,10 +199,21 @@ public class DistributedFileSystem exten
   
   @Override
   public BlockLocation[] getFileBlockLocations(Path p, 
-      long start, long len) throws IOException {
+      final long start, final long len) throws IOException {
     statistics.incrementReadOps(1);
-    return dfs.getBlockLocations(getPathName(p), start, len);
-
+    final Path absF = fixRelativePart(p);
+    return new FileSystemLinkResolver<BlockLocation[]>() {
+      @Override
+      public BlockLocation[] doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.getBlockLocations(getPathName(p), start, len);
+      }
+      @Override
+      public BlockLocation[] next(final FileSystem fs, final Path p)
+          throws IOException {
+        return fs.getFileBlockLocations(p, start, len);
+      }
+    }.resolve(this, absF);
   }
 
   /**
@@ -239,28 +258,68 @@ public class DistributedFileSystem exten
    * @return true if the file is already closed
    * @throws IOException if an error occurs
    */
-  public boolean recoverLease(Path f) throws IOException {
-    return dfs.recoverLease(getPathName(f));
+  public boolean recoverLease(final Path f) throws IOException {
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<Boolean>() {
+      @Override
+      public Boolean doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.recoverLease(getPathName(p));
+      }
+      @Override
+      public Boolean next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+          return myDfs.recoverLease(p);
+        }
+        throw new UnsupportedOperationException("Cannot recoverLease through" +
+            " a symlink to a non-DistributedFileSystem: " + f + " -> " + p);
+      }
+    }.resolve(this, absF);
   }
 
-  @SuppressWarnings("deprecation")
   @Override
-  public HdfsDataInputStream open(Path f, int bufferSize) throws IOException {
+  public FSDataInputStream open(Path f, final int bufferSize)
+      throws IOException {
     statistics.incrementReadOps(1);
-    return new DFSClient.DFSDataInputStream(
-          dfs.open(getPathName(f), bufferSize, verifyChecksum));
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<FSDataInputStream>() {
+      @Override
+      public FSDataInputStream doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return new HdfsDataInputStream(
+            dfs.open(getPathName(p), bufferSize, verifyChecksum));
+      }
+      @Override
+      public FSDataInputStream next(final FileSystem fs, final Path p)
+          throws IOException {
+        return fs.open(p, bufferSize);
+      }
+    }.resolve(this, absF);
   }
 
-  /** This optional operation is not yet supported. */
   @Override
-  public HdfsDataOutputStream append(Path f, int bufferSize,
-      Progressable progress) throws IOException {
+  public FSDataOutputStream append(Path f, final int bufferSize,
+      final Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
-    return dfs.append(getPathName(f), bufferSize, progress, statistics);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<FSDataOutputStream>() {
+      @Override
+      public FSDataOutputStream doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.append(getPathName(p), bufferSize, progress, statistics);
+      }
+      @Override
+      public FSDataOutputStream next(final FileSystem fs, final Path p)
+          throws IOException {
+        return fs.append(p, bufferSize, progress);
+      }
+    }.resolve(this, absF);
   }
 
   @Override
-  public HdfsDataOutputStream create(Path f, FsPermission permission,
+  public FSDataOutputStream create(Path f, FsPermission permission,
       boolean overwrite, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
     return this.create(f, permission,
@@ -279,61 +338,125 @@ public class DistributedFileSystem exten
    * replication, to move the blocks from favored nodes. A value of null means
    * no favored nodes for this create
    */
-  public HdfsDataOutputStream create(Path f, FsPermission permission,
-      boolean overwrite, int bufferSize, short replication, long blockSize,
-      Progressable progress, InetSocketAddress[] favoredNodes) throws IOException {
+  public HdfsDataOutputStream create(final Path f,
+      final FsPermission permission, final boolean overwrite,
+      final int bufferSize, final short replication, final long blockSize,
+      final Progressable progress, final InetSocketAddress[] favoredNodes)
+          throws IOException {
     statistics.incrementWriteOps(1);
-    final DFSOutputStream out = dfs.create(getPathName(f), permission,
-        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
-            : EnumSet.of(CreateFlag.CREATE),
-        true, replication, blockSize, progress, bufferSize, null, favoredNodes);
-    return new HdfsDataOutputStream(out, statistics);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<HdfsDataOutputStream>() {
+      @Override
+      public HdfsDataOutputStream doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        final DFSOutputStream out = dfs.create(getPathName(p), permission,
+            overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+                : EnumSet.of(CreateFlag.CREATE),
+            true, replication, blockSize, progress, bufferSize, null,
+            favoredNodes);
+        return new HdfsDataOutputStream(out, statistics);
+      }
+      @Override
+      public HdfsDataOutputStream next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+          return myDfs.create(p, permission, overwrite, bufferSize, replication,
+              blockSize, progress, favoredNodes);
+        }
+        throw new UnsupportedOperationException("Cannot create with" +
+            " favoredNodes through a symlink to a non-DistributedFileSystem: "
+            + f + " -> " + p);
+      }
+    }.resolve(this, absF);
   }
   
   @Override
-  public HdfsDataOutputStream create(Path f, FsPermission permission,
-    EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
-    Progressable progress, ChecksumOpt checksumOpt) throws IOException {
+  public FSDataOutputStream create(final Path f, final FsPermission permission,
+    final EnumSet<CreateFlag> cflags, final int bufferSize,
+    final short replication, final long blockSize, final Progressable progress,
+    final ChecksumOpt checksumOpt) throws IOException {
     statistics.incrementWriteOps(1);
-    final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags,
-        replication, blockSize, progress, bufferSize, checksumOpt);
-    return new HdfsDataOutputStream(out, statistics);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<FSDataOutputStream>() {
+      @Override
+      public FSDataOutputStream doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return new HdfsDataOutputStream(dfs.create(getPathName(p), permission,
+            cflags, replication, blockSize, progress, bufferSize, checksumOpt),
+            statistics);
+      }
+      @Override
+      public FSDataOutputStream next(final FileSystem fs, final Path p)
+          throws IOException {
+        return fs.create(p, permission, cflags, bufferSize,
+            replication, blockSize, progress, checksumOpt);
+      }
+    }.resolve(this, absF);
   }
-  
-  @SuppressWarnings("deprecation")
+
   @Override
   protected HdfsDataOutputStream primitiveCreate(Path f,
     FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
     short replication, long blockSize, Progressable progress,
     ChecksumOpt checksumOpt) throws IOException {
     statistics.incrementWriteOps(1);
-    return new HdfsDataOutputStream(dfs.primitiveCreate(getPathName(f),
+    return new HdfsDataOutputStream(dfs.primitiveCreate(
+        getPathName(fixRelativePart(f)),
         absolutePermission, flag, true, replication, blockSize,
         progress, bufferSize, checksumOpt),statistics);
-   } 
+   }
 
   /**
    * Same as create(), except fails if parent directory doesn't already exist.
    */
   @Override
-  public HdfsDataOutputStream createNonRecursive(Path f, FsPermission permission,
-      EnumSet<CreateFlag> flag, int bufferSize, short replication,
-      long blockSize, Progressable progress) throws IOException {
+  @SuppressWarnings("deprecation")
+  public FSDataOutputStream createNonRecursive(final Path f,
+      final FsPermission permission, final EnumSet<CreateFlag> flag,
+      final int bufferSize, final short replication, final long blockSize,
+      final Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
     if (flag.contains(CreateFlag.OVERWRITE)) {
       flag.add(CreateFlag.CREATE);
     }
-    return new HdfsDataOutputStream(dfs.create(getPathName(f), permission, flag,
-        false, replication, blockSize, progress, 
-        bufferSize, null), statistics);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<FSDataOutputStream>() {
+      @Override
+      public FSDataOutputStream doCall(final Path p) throws IOException,
+          UnresolvedLinkException {
+        return new HdfsDataOutputStream(dfs.create(getPathName(p), permission,
+            flag, false, replication, blockSize, progress, bufferSize, null),
+            statistics);
+      }
+
+      @Override
+      public FSDataOutputStream next(final FileSystem fs, final Path p)
+          throws IOException {
+        return fs.createNonRecursive(p, permission, flag, bufferSize,
+            replication, blockSize, progress);
+      }
+    }.resolve(this, absF);
   }
 
   @Override
   public boolean setReplication(Path src, 
-                                short replication
+                                final short replication
                                ) throws IOException {
     statistics.incrementWriteOps(1);
-    return dfs.setReplication(getPathName(src), replication);
+    Path absF = fixRelativePart(src);
+    return new FileSystemLinkResolver<Boolean>() {
+      @Override
+      public Boolean doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.setReplication(getPathName(p), replication);
+      }
+      @Override
+      public Boolean next(final FileSystem fs, final Path p)
+          throws IOException {
+        return fs.setReplication(p, replication);
+      }
+    }.resolve(this, absF);
   }
   
   /**
@@ -346,12 +469,44 @@ public class DistributedFileSystem exten
    */
   @Override
   public void concat(Path trg, Path [] psrcs) throws IOException {
-    String [] srcs = new String [psrcs.length];
-    for(int i=0; i<psrcs.length; i++) {
-      srcs[i] = getPathName(psrcs[i]);
-    }
     statistics.incrementWriteOps(1);
-    dfs.concat(getPathName(trg), srcs);
+    // Make target absolute
+    Path absF = fixRelativePart(trg);
+    // Make all srcs absolute
+    Path[] srcs = new Path[psrcs.length];
+    for (int i=0; i<psrcs.length; i++) {
+      srcs[i] = fixRelativePart(psrcs[i]);
+    }
+    // Try the concat on the absolute paths without resolving any links
+    String[] srcsStr = new String[psrcs.length];
+    try {
+      for (int i=0; i<psrcs.length; i++) {
+        srcsStr[i] = getPathName(srcs[i]);
+      }
+      dfs.concat(getPathName(absF), srcsStr);
+    } catch (UnresolvedLinkException e) {
+      // Exception could be from trg or any src.
+      // Fully resolve trg and srcs. Fail if any of them are a symlink.
+      FileStatus stat = getFileLinkStatus(absF);
+      if (stat.isSymlink()) {
+        throw new IOException("Cannot concat with a symlink target: "
+            + trg + " -> " + stat.getPath());
+      }
+      absF = fixRelativePart(stat.getPath());
+      for (int i=0; i<psrcs.length; i++) {
+        stat = getFileLinkStatus(srcs[i]);
+        if (stat.isSymlink()) {
+          throw new IOException("Cannot concat with a symlink src: "
+              + psrcs[i] + " -> " + stat.getPath());
+        }
+        srcs[i] = fixRelativePart(stat.getPath());
+      }
+      // Try concat again. Can still race with another symlink.
+      for (int i=0; i<psrcs.length; i++) {
+        srcsStr[i] = getPathName(srcs[i]);
+      }
+      dfs.concat(getPathName(absF), srcsStr);
+    }
   }
 
   
@@ -359,7 +514,35 @@ public class DistributedFileSystem exten
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
     statistics.incrementWriteOps(1);
-    return dfs.rename(getPathName(src), getPathName(dst));
+    // Both Paths have to belong to this DFS
+    final Path absSrc = fixRelativePart(src);
+    final Path absDst = fixRelativePart(dst);
+    FileSystem srcFS = getFSofPath(absSrc, getConf());
+    FileSystem dstFS = getFSofPath(absDst, getConf());
+    if (!srcFS.getUri().equals(getUri()) ||
+        !dstFS.getUri().equals(getUri())) {
+      throw new IOException("Renames across FileSystems not supported");
+    }
+    // Try the rename without resolving first
+    try {
+      return dfs.rename(getPathName(absSrc), getPathName(absDst));
+    } catch (UnresolvedLinkException e) {
+      // Fully resolve the source
+      final Path source = getFileLinkStatus(absSrc).getPath();
+      // Keep trying to resolve the destination
+      return new FileSystemLinkResolver<Boolean>() {
+        @Override
+        public Boolean doCall(final Path p)
+            throws IOException, UnresolvedLinkException {
+          return dfs.rename(getPathName(source), getPathName(p));
+        }
+        @Override
+        public Boolean next(final FileSystem fs, final Path p)
+            throws IOException {
+          return fs.rename(source, p);
+        }
+      }.resolve(this, absDst);
+    }
   }
 
   /** 
@@ -367,62 +550,102 @@ public class DistributedFileSystem exten
    */
   @SuppressWarnings("deprecation")
   @Override
-  public void rename(Path src, Path dst, Options.Rename... options) throws IOException {
+  public void rename(Path src, Path dst, final Options.Rename... options)
+      throws IOException {
     statistics.incrementWriteOps(1);
-    dfs.rename(getPathName(src), getPathName(dst), options);
+    // Both Paths have to belong to this DFS
+    final Path absSrc = fixRelativePart(src);
+    final Path absDst = fixRelativePart(dst);
+    FileSystem srcFS = getFSofPath(absSrc, getConf());
+    FileSystem dstFS = getFSofPath(absDst, getConf());
+    if (!srcFS.getUri().equals(getUri()) ||
+        !dstFS.getUri().equals(getUri())) {
+      throw new IOException("Renames across FileSystems not supported");
+    }
+    // Try the rename without resolving first
+    try {
+      dfs.rename(getPathName(absSrc), getPathName(absDst), options);
+    } catch (UnresolvedLinkException e) {
+      // Fully resolve the source
+      final Path source = getFileLinkStatus(absSrc).getPath();
+      // Keep trying to resolve the destination
+      new FileSystemLinkResolver<Void>() {
+        @Override
+        public Void doCall(final Path p)
+            throws IOException, UnresolvedLinkException {
+          dfs.rename(getPathName(source), getPathName(p), options);
+          return null;
+        }
+        @Override
+        public Void next(final FileSystem fs, final Path p)
+            throws IOException {
+          // Since we know it's this DFS for both, can just call doCall again
+          return doCall(p);
+        }
+      }.resolve(this, absDst);
+    }
   }
   
   @Override
-  public boolean delete(Path f, boolean recursive) throws IOException {
+  public boolean delete(Path f, final boolean recursive) throws IOException {
     statistics.incrementWriteOps(1);
-    return dfs.delete(getPathName(f), recursive);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<Boolean>() {
+      @Override
+      public Boolean doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.delete(getPathName(p), recursive);
+      }
+      @Override
+      public Boolean next(final FileSystem fs, final Path p)
+          throws IOException {
+        return fs.delete(p, recursive);
+      }
+    }.resolve(this, absF);
   }
   
   @Override
   public ContentSummary getContentSummary(Path f) throws IOException {
     statistics.incrementReadOps(1);
-    return dfs.getContentSummary(getPathName(f));
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<ContentSummary>() {
+      @Override
+      public ContentSummary doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.getContentSummary(getPathName(p));
+      }
+      @Override
+      public ContentSummary next(final FileSystem fs, final Path p)
+          throws IOException {
+        return fs.getContentSummary(p);
+      }
+    }.resolve(this, absF);
   }
 
   /** Set a directory's quotas
    * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long) 
    */
-  public void setQuota(Path src, long namespaceQuota, long diskspaceQuota) 
-                       throws IOException {
-    dfs.setQuota(getPathName(src), namespaceQuota, diskspaceQuota);
-  }
-  
-  private FileStatus makeQualified(HdfsFileStatus f, Path parent) {
-    return new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
-        f.getBlockSize(), f.getModificationTime(),
-        f.getAccessTime(),
-        f.getPermission(), f.getOwner(), f.getGroup(),
-        (f.getFullPath(parent)).makeQualified(
-            getUri(), getWorkingDirectory())); // fully-qualify path
-  }
-
-  private LocatedFileStatus makeQualifiedLocated(
-      HdfsLocatedFileStatus f, Path parent) {
-    return new LocatedFileStatus(f.getLen(), f.isDir(), f.getReplication(),
-        f.getBlockSize(), f.getModificationTime(),
-        f.getAccessTime(),
-        f.getPermission(), f.getOwner(), f.getGroup(),
-        null,
-        (f.getFullPath(parent)).makeQualified(
-            getUri(), getWorkingDirectory()), // fully-qualify path
-        DFSUtil.locatedBlocks2Locations(f.getBlockLocations()));
+  public void setQuota(Path src, final long namespaceQuota,
+      final long diskspaceQuota) throws IOException {
+    Path absF = fixRelativePart(src);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        dfs.setQuota(getPathName(p), namespaceQuota, diskspaceQuota);
+        return null;
+      }
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+          throws IOException {
+        // setQuota is not defined in FileSystem, so we only can resolve
+        // within this DFS
+        return doCall(p);
+      }
+    }.resolve(this, absF);
   }
 
-  /**
-   * List all the entries of a directory
-   *
-   * Note that this operation is not atomic for a large directory.
-   * The entries of a directory may be fetched from NameNode multiple times.
-   * It only guarantees that  each name occurs once if a directory
-   * undergoes changes between the calls.
-   */
-  @Override
-  public FileStatus[] listStatus(Path p) throws IOException {
+  private FileStatus[] listStatusInternal(Path p) throws IOException {
     String src = getPathName(p);
 
     // fetch the first batch of entries in the directory
@@ -437,7 +660,7 @@ public class DistributedFileSystem exten
     if (!thisListing.hasMore()) { // got all entries of the directory
       FileStatus[] stats = new FileStatus[partialListing.length];
       for (int i = 0; i < partialListing.length; i++) {
-        stats[i] = makeQualified(partialListing[i], p);
+        stats[i] = partialListing[i].makeQualified(getUri(), p);
       }
       statistics.incrementReadOps(1);
       return stats;
@@ -451,7 +674,7 @@ public class DistributedFileSystem exten
       new ArrayList<FileStatus>(totalNumEntries);
     // add the first batch of entries to the array list
     for (HdfsFileStatus fileStatus : partialListing) {
-      listing.add(makeQualified(fileStatus, p));
+      listing.add(fileStatus.makeQualified(getUri(), p));
     }
     statistics.incrementLargeReadOps(1);
  
@@ -465,7 +688,7 @@ public class DistributedFileSystem exten
  
       partialListing = thisListing.getPartialListing();
       for (HdfsFileStatus fileStatus : partialListing) {
-        listing.add(makeQualified(fileStatus, p));
+        listing.add(fileStatus.makeQualified(getUri(), p));
       }
       statistics.incrementLargeReadOps(1);
     } while (thisListing.hasMore());
@@ -473,6 +696,31 @@ public class DistributedFileSystem exten
     return listing.toArray(new FileStatus[listing.size()]);
   }
 
+  /**
+   * List all the entries of a directory
+   *
+   * Note that this operation is not atomic for a large directory.
+   * The entries of a directory may be fetched from NameNode multiple times.
+   * It only guarantees that  each name occurs once if a directory
+   * undergoes changes between the calls.
+   */
+  @Override
+  public FileStatus[] listStatus(Path p) throws IOException {
+    Path absF = fixRelativePart(p);
+    return new FileSystemLinkResolver<FileStatus[]>() {
+      @Override
+      public FileStatus[] doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return listStatusInternal(p);
+      }
+      @Override
+      public FileStatus[] next(final FileSystem fs, final Path p)
+          throws IOException {
+        return fs.listStatus(p);
+      }
+    }.resolve(this, absF);
+  }
+
   @Override
   protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p,
       final PathFilter filter)
@@ -484,7 +732,9 @@ public class DistributedFileSystem exten
       private LocatedFileStatus curStat = null;
 
       { // initializer
-        src = getPathName(p);
+        // Fully resolve symlinks in path first to avoid additional resolution
+        // round-trips as we fetch more batches of listings
+        src = getPathName(resolvePath(p));
         // fetch the first batch of entries in the directory
         thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, true);
         statistics.incrementReadOps(1);
@@ -496,8 +746,9 @@ public class DistributedFileSystem exten
       @Override
       public boolean hasNext() throws IOException {
         while (curStat == null && hasNextNoFilter()) {
-          LocatedFileStatus next = makeQualifiedLocated(
-              (HdfsLocatedFileStatus)thisListing.getPartialListing()[i++], p);
+          LocatedFileStatus next = 
+              ((HdfsLocatedFileStatus)thisListing.getPartialListing()[i++])
+              .makeQualifiedLocated(getUri(), p);
           if (filter.accept(next.getPath())) {
             curStat = next;
           }
@@ -547,8 +798,7 @@ public class DistributedFileSystem exten
    *                    effective permission.
    */
   public boolean mkdir(Path f, FsPermission permission) throws IOException {
-    statistics.incrementWriteOps(1);
-    return dfs.mkdirs(getPathName(f), permission, false);
+    return mkdirsInternal(f, permission, false);
   }
 
   /**
@@ -564,8 +814,32 @@ public class DistributedFileSystem exten
    */
   @Override
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    return mkdirsInternal(f, permission, true);
+  }
+
+  private boolean mkdirsInternal(Path f, final FsPermission permission,
+      final boolean createParent) throws IOException {
     statistics.incrementWriteOps(1);
-    return dfs.mkdirs(getPathName(f), permission, true);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<Boolean>() {
+      @Override
+      public Boolean doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.mkdirs(getPathName(p), permission, createParent);
+      }
+
+      @Override
+      public Boolean next(final FileSystem fs, final Path p)
+          throws IOException {
+        // FileSystem has no non-recursive mkdir() method, so the best we
+        // can do for a non-recursive request is to error out.
+        if (!createParent) {
+          throw new IOException("FileSystem does not support non-recursive"
+              + " mkdir");
+        }
+        return fs.mkdirs(p, permission);
+      }
+    }.resolve(this, absF);
   }
 
   @SuppressWarnings("deprecation")
@@ -835,42 +1109,207 @@ public class DistributedFileSystem exten
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
     statistics.incrementReadOps(1);
-    HdfsFileStatus fi = dfs.getFileInfo(getPathName(f));
-    if (fi != null) {
-      return makeQualified(fi, f);
-    } else {
-      throw new FileNotFoundException("File does not exist: " + f);
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<FileStatus>() {
+      @Override
+      public FileStatus doCall(final Path p) throws IOException,
+          UnresolvedLinkException {
+        HdfsFileStatus fi = dfs.getFileInfo(getPathName(p));
+        if (fi != null) {
+          return fi.makeQualified(getUri(), p);
+        } else {
+          throw new FileNotFoundException("File does not exist: " + p);
+        }
+      }
+      @Override
+      public FileStatus next(final FileSystem fs, final Path p)
+          throws IOException {
+        return fs.getFileStatus(p);
+      }
+    }.resolve(this, absF);
+  }
+
+  @Override
+  public void createSymlink(final Path target, final Path link,
+      final boolean createParent) throws AccessControlException,
+      FileAlreadyExistsException, FileNotFoundException,
+      ParentNotDirectoryException, UnsupportedFileSystemException, 
+      IOException {
+    statistics.incrementWriteOps(1);
+    final Path absF = fixRelativePart(link);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p) throws IOException,
+          UnresolvedLinkException {
+        dfs.createSymlink(target.toString(), getPathName(p), createParent);
+        return null;
+      }
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+          throws IOException, UnresolvedLinkException {
+        fs.createSymlink(target, p, createParent);
+        return null;
+      }
+    }.resolve(this, absF);
+  }
+
+  @Override
+  public boolean supportsSymlinks() {
+    return true;
+  }
+
+  @Override
+  public FileStatus getFileLinkStatus(final Path f)
+      throws AccessControlException, FileNotFoundException,
+      UnsupportedFileSystemException, IOException {
+    statistics.incrementReadOps(1);
+    final Path absF = fixRelativePart(f);
+    FileStatus status = new FileSystemLinkResolver<FileStatus>() {
+      @Override
+      public FileStatus doCall(final Path p) throws IOException,
+          UnresolvedLinkException {
+        HdfsFileStatus fi = dfs.getFileLinkInfo(getPathName(p));
+        if (fi != null) {
+          return fi.makeQualified(getUri(), p);
+        } else {
+          throw new FileNotFoundException("File does not exist: " + p);
+        }
+      }
+      @Override
+      public FileStatus next(final FileSystem fs, final Path p)
+        throws IOException, UnresolvedLinkException {
+        return fs.getFileLinkStatus(p);
+      }
+    }.resolve(this, absF);
+    // Fully-qualify the symlink
+    if (status.isSymlink()) {
+      Path targetQual = FSLinkResolver.qualifySymlinkTarget(this.getUri(),
+          status.getPath(), status.getSymlink());
+      status.setSymlink(targetQual);
+    }
+    return status;
+  }
+
+  @Override
+  public Path getLinkTarget(final Path f) throws AccessControlException,
+      FileNotFoundException, UnsupportedFileSystemException, IOException {
+    statistics.incrementReadOps(1);
+    final Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<Path>() {
+      @Override
+      public Path doCall(final Path p) throws IOException,
+          UnresolvedLinkException {
+        HdfsFileStatus fi = dfs.getFileLinkInfo(getPathName(p));
+        if (fi != null) {
+          return fi.makeQualified(getUri(), p).getSymlink();
+        } else {
+          throw new FileNotFoundException("File does not exist: " + p);
+        }
+      }
+      @Override
+      public Path next(final FileSystem fs, final Path p)
+        throws IOException, UnresolvedLinkException {
+        return fs.getLinkTarget(p);
+      }
+    }.resolve(this, absF);
+  }
+
+  @Override
+  protected Path resolveLink(Path f) throws IOException {
+    statistics.incrementReadOps(1);
+    String target = dfs.getLinkTarget(getPathName(fixRelativePart(f)));
+    if (target == null) {
+      throw new FileNotFoundException("File does not exist: " + f.toString());
     }
+    return new Path(target);
   }
 
   @Override
-  public MD5MD5CRC32FileChecksum getFileChecksum(Path f) throws IOException {
+  public FileChecksum getFileChecksum(Path f) throws IOException {
     statistics.incrementReadOps(1);
-    return dfs.getFileChecksum(getPathName(f));
+    Path absF = fixRelativePart(f);
+    return new FileSystemLinkResolver<FileChecksum>() {
+      @Override
+      public FileChecksum doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.getFileChecksum(getPathName(p));
+      }
+
+      @Override
+      public FileChecksum next(final FileSystem fs, final Path p)
+          throws IOException {
+        return fs.getFileChecksum(p);
+      }
+    }.resolve(this, absF);
   }
 
   @Override
-  public void setPermission(Path p, FsPermission permission
+  public void setPermission(Path p, final FsPermission permission
       ) throws IOException {
     statistics.incrementWriteOps(1);
-    dfs.setPermission(getPathName(p), permission);
+    Path absF = fixRelativePart(p);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        dfs.setPermission(getPathName(p), permission);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+          throws IOException {
+        fs.setPermission(p, permission);
+        return null;
+      }
+    }.resolve(this, absF);
   }
 
   @Override
-  public void setOwner(Path p, String username, String groupname
+  public void setOwner(Path p, final String username, final String groupname
       ) throws IOException {
     if (username == null && groupname == null) {
       throw new IOException("username == null && groupname == null");
     }
     statistics.incrementWriteOps(1);
-    dfs.setOwner(getPathName(p), username, groupname);
+    Path absF = fixRelativePart(p);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        dfs.setOwner(getPathName(p), username, groupname);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+          throws IOException {
+        fs.setOwner(p, username, groupname);
+        return null;
+      }
+    }.resolve(this, absF);
   }
 
   @Override
-  public void setTimes(Path p, long mtime, long atime
+  public void setTimes(Path p, final long mtime, final long atime
       ) throws IOException {
     statistics.incrementWriteOps(1);
-    dfs.setTimes(getPathName(p), mtime, atime);
+    Path absF = fixRelativePart(p);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        dfs.setTimes(getPathName(p), mtime, atime);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+          throws IOException {
+        fs.setTimes(p, mtime, atime);
+        return null;
+      }
+    }.resolve(this, absF);
   }
   
 
@@ -946,7 +1385,7 @@ public class DistributedFileSystem exten
    * The bandwidth parameter is the max number of bytes per second of network
    * bandwidth to be used by a datanode during balancing.
    *
-   * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
+   * @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
    * @throws IOException
    */
   public void setBalancerBandwidth(long bandwidth) throws IOException {
@@ -987,25 +1426,111 @@ public class DistributedFileSystem exten
   }
 
   /** @see HdfsAdmin#allowSnapshot(Path) */
-  public void allowSnapshot(Path path) throws IOException {
-    dfs.allowSnapshot(getPathName(path));
+  public void allowSnapshot(final Path path) throws IOException {
+    Path absF = fixRelativePart(path);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        dfs.allowSnapshot(getPathName(p));
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+          myDfs.allowSnapshot(p);
+        } else {
+          throw new UnsupportedOperationException("Cannot perform snapshot"
+              + " operations on a symlink to a non-DistributedFileSystem: "
+              + path + " -> " + p);
+        }
+        return null;
+      }
+    }.resolve(this, absF);
   }
   
   /** @see HdfsAdmin#disallowSnapshot(Path) */
-  public void disallowSnapshot(Path path) throws IOException {
-    dfs.disallowSnapshot(getPathName(path));
+  public void disallowSnapshot(final Path path) throws IOException {
+    Path absF = fixRelativePart(path);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        dfs.disallowSnapshot(getPathName(p));
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+          myDfs.disallowSnapshot(p);
+        } else {
+          throw new UnsupportedOperationException("Cannot perform snapshot"
+              + " operations on a symlink to a non-DistributedFileSystem: "
+              + path + " -> " + p);
+        }
+        return null;
+      }
+    }.resolve(this, absF);
   }
   
   @Override
-  public Path createSnapshot(Path path, String snapshotName) 
+  public Path createSnapshot(final Path path, final String snapshotName) 
       throws IOException {
-    return new Path(dfs.createSnapshot(getPathName(path), snapshotName));
+    Path absF = fixRelativePart(path);
+    return new FileSystemLinkResolver<Path>() {
+      @Override
+      public Path doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return new Path(dfs.createSnapshot(getPathName(p), snapshotName));
+      }
+
+      @Override
+      public Path next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (!(fs instanceof DistributedFileSystem)) {
+          throw new UnsupportedOperationException("Cannot perform snapshot"
+              + " operations on a symlink to a non-DistributedFileSystem: "
+              + path + " -> " + p);
+        }
+        // Forward snapshotName; createSnapshot(p) alone would drop it.
+        DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+        return myDfs.createSnapshot(p, snapshotName);
+      }
+    }.resolve(this, absF);
   }
   
   @Override
-  public void renameSnapshot(Path path, String snapshotOldName,
-      String snapshotNewName) throws IOException {
-    dfs.renameSnapshot(getPathName(path), snapshotOldName, snapshotNewName);
+  public void renameSnapshot(final Path path, final String snapshotOldName,
+      final String snapshotNewName) throws IOException {
+    Path absF = fixRelativePart(path);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        dfs.renameSnapshot(getPathName(p), snapshotOldName, snapshotNewName);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+          myDfs.renameSnapshot(p, snapshotOldName, snapshotNewName);
+        } else {
+          throw new UnsupportedOperationException("Cannot perform snapshot"
+              + " operations on a symlink to a non-DistributedFileSystem: "
+              + path + " -> " + p);
+        }
+        return null;
+      }
+    }.resolve(this, absF);
   }
   
   /**
@@ -1018,9 +1543,31 @@ public class DistributedFileSystem exten
   }
   
   @Override
-  public void deleteSnapshot(Path snapshotDir, String snapshotName)
+  public void deleteSnapshot(final Path snapshotDir, final String snapshotName)
       throws IOException {
-    dfs.deleteSnapshot(getPathName(snapshotDir), snapshotName);
+    Path absF = fixRelativePart(snapshotDir);
+    new FileSystemLinkResolver<Void>() {
+      @Override
+      public Void doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        dfs.deleteSnapshot(getPathName(p), snapshotName);
+        return null;
+      }
+
+      @Override
+      public Void next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+          myDfs.deleteSnapshot(p, snapshotName);
+        } else {
+          throw new UnsupportedOperationException("Cannot perform snapshot"
+              + " operations on a symlink to a non-DistributedFileSystem: "
+              + snapshotDir + " -> " + p);
+        }
+        return null;
+      }
+    }.resolve(this, absF);
   }
 
   /**
@@ -1029,9 +1576,31 @@ public class DistributedFileSystem exten
    * 
    * @see DFSClient#getSnapshotDiffReport(Path, String, String)
    */
-  public SnapshotDiffReport getSnapshotDiffReport(Path snapshotDir,
-      String fromSnapshot, String toSnapshot) throws IOException {
-    return dfs.getSnapshotDiffReport(getPathName(snapshotDir), fromSnapshot, toSnapshot);
+  public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir,
+      final String fromSnapshot, final String toSnapshot) throws IOException {
+    Path absF = fixRelativePart(snapshotDir);
+    return new FileSystemLinkResolver<SnapshotDiffReport>() {
+      @Override
+      public SnapshotDiffReport doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.getSnapshotDiffReport(getPathName(p), fromSnapshot,
+            toSnapshot);
+      }
+
+      @Override
+      public SnapshotDiffReport next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (!(fs instanceof DistributedFileSystem)) {
+          throw new UnsupportedOperationException("Cannot perform snapshot"
+              + " operations on a symlink to a non-DistributedFileSystem: "
+              + snapshotDir + " -> " + p);
+        }
+        // Return the report computed by the link target's file system
+        // instead of discarding it and returning null.
+        DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+        return myDfs.getSnapshotDiffReport(p, fromSnapshot, toSnapshot);
+      }
+    }.resolve(this, absF);
   }
  
   /**
@@ -1042,8 +1611,28 @@ public class DistributedFileSystem exten
    * @throws FileNotFoundException if the file does not exist.
    * @throws IOException If an I/O error occurred     
    */
-  public boolean isFileClosed(Path src) throws IOException {
-    return dfs.isFileClosed(getPathName(src));
+  public boolean isFileClosed(final Path src) throws IOException {
+    Path absF = fixRelativePart(src);
+    return new FileSystemLinkResolver<Boolean>() {
+      @Override
+      public Boolean doCall(final Path p)
+          throws IOException, UnresolvedLinkException {
+        return dfs.isFileClosed(getPathName(p));
+      }
+
+      @Override
+      public Boolean next(final FileSystem fs, final Path p)
+          throws IOException {
+        if (fs instanceof DistributedFileSystem) {
+          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+          return myDfs.isFileClosed(p);
+        } else {
+          throw new UnsupportedOperationException("Cannot call isFileClosed"
+              + " on a symlink to a non-DistributedFileSystem: "
+              + src + " -> " + p);
+        }
+      }
+    }.resolve(this, absF);
   }
   
 }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1502387&r1=1502386&r2=1502387&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Thu Jul 11 22:36:37 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
@@ -414,7 +415,11 @@ public class DFSAdmin extends FsShell {
    */
   public void allowSnapshot(String[] argv) throws IOException {   
     DistributedFileSystem dfs = getDFS();
-    dfs.allowSnapshot(new Path(argv[1]));
+    try {
+      dfs.allowSnapshot(new Path(argv[1]));
+    } catch (SnapshotException e) {
+      throw new RemoteException(e.getClass().getName(), e.getMessage());
+    }
     System.out.println("Allowing snaphot on " + argv[1] + " succeeded");
   }
   
@@ -426,7 +431,11 @@ public class DFSAdmin extends FsShell {
    */
   public void disallowSnapshot(String[] argv) throws IOException {  
     DistributedFileSystem dfs = getDFS();
-    dfs.disallowSnapshot(new Path(argv[1]));
+    try {
+      dfs.disallowSnapshot(new Path(argv[1]));
+    } catch (SnapshotException e) {
+      throw new RemoteException(e.getClass().getName(), e.getMessage());
+    }
     System.out.println("Disallowing snaphot on " + argv[1] + " succeeded");
   }
   

Added: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfs.java?rev=1502387&view=auto
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfs.java (added)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfs.java Thu Jul 11 22:36:37 2013
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
+import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test symbolic links in Hdfs.
+ */
+abstract public class TestSymlinkHdfs extends SymlinkBaseTest {
+
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+  }
+
+  protected static MiniDFSCluster cluster;
+  protected static WebHdfsFileSystem webhdfs;
+  protected static DistributedFileSystem dfs;
+
+  @Override
+  protected String getScheme() {
+    return "hdfs";
+  }
+
+  @Override
+  protected String testBaseDir1() throws IOException {
+    return "/test1";
+  }
+  
+  @Override
+  protected String testBaseDir2() throws IOException {
+    return "/test2";
+  }
+
+  @Override
+  protected URI testURI() {
+    return cluster.getURI(0);
+  }
+
+  @Override
+  protected IOException unwrapException(IOException e) {
+    if (e instanceof RemoteException) {
+      return ((RemoteException)e).unwrapRemoteException();
+    }
+    return e;
+  }
+
+  @BeforeClass
+  public static void beforeClassSetup() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
+    conf.set(FsPermission.UMASK_LABEL, "000");
+    cluster = new MiniDFSCluster.Builder(conf).build();
+    webhdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf);
+    dfs = cluster.getFileSystem();
+  }
+
+  @AfterClass
+  public static void afterClassTeardown() throws Exception {
+    cluster.shutdown();
+  }
+
+  @Test(timeout=10000)
+  /** Access a file using a link that spans Hdfs to LocalFs */
+  public void testLinkAcrossFileSystems() throws IOException {
+    Path localDir = new Path("file://" + wrapper.getAbsoluteTestRootDir()
+        + "/test");
+    Path localFile = new Path("file://" + wrapper.getAbsoluteTestRootDir()
+        + "/test/file");
+    Path link      = new Path(testBaseDir1(), "linkToFile");
+    FSTestWrapper localWrapper = wrapper.getLocalFSWrapper();
+    localWrapper.delete(localDir, true);
+    localWrapper.mkdir(localDir, FileContext.DEFAULT_PERM, true);
+    localWrapper.setWorkingDirectory(localDir);
+    assertEquals(localDir, localWrapper.getWorkingDirectory());
+    createAndWriteFile(localWrapper, localFile);
+    wrapper.createSymlink(localFile, link, false);
+    readFile(link);
+    assertEquals(fileSize, wrapper.getFileStatus(link).getLen());
+  }
+
+  @Test(timeout=10000)
+  /** Test renaming a file across two file systems using a link */
+  public void testRenameAcrossFileSystemsViaLink() throws IOException {
+    Path localDir = new Path("file://" + wrapper.getAbsoluteTestRootDir()
+        + "/test");
+    Path hdfsFile    = new Path(testBaseDir1(), "file");
+    Path link        = new Path(testBaseDir1(), "link");
+    Path hdfsFileNew = new Path(testBaseDir1(), "fileNew");
+    Path hdfsFileNewViaLink = new Path(link, "fileNew");
+    FSTestWrapper localWrapper = wrapper.getLocalFSWrapper();
+    localWrapper.delete(localDir, true);
+    localWrapper.mkdir(localDir, FileContext.DEFAULT_PERM, true);
+    localWrapper.setWorkingDirectory(localDir);
+    createAndWriteFile(hdfsFile);
+    wrapper.createSymlink(localDir, link, false);
+    // Rename hdfs://test1/file to hdfs://test1/link/fileNew
+    // which renames to file://TEST_ROOT/test/fileNew which
+    // spans AbstractFileSystems and therefore fails.
+    try {
+      wrapper.rename(hdfsFile, hdfsFileNewViaLink);
+      fail("Renamed across file systems");
+    } catch (InvalidPathException ipe) {
+      // Expected from FileContext
+    } catch (IllegalArgumentException e) {
+      // Expected from Filesystem
+      GenericTestUtils.assertExceptionContains("Wrong FS: ", e);
+    }
+    // Now rename hdfs://test1/link/fileNew to hdfs://test1/fileNew
+    // which renames file://TEST_ROOT/test/fileNew to hdfs://test1/fileNew
+    // which spans AbstractFileSystems and therefore fails.
+    createAndWriteFile(hdfsFileNewViaLink);
+    try {
+      wrapper.rename(hdfsFileNewViaLink, hdfsFileNew);
+      fail("Renamed across file systems");
+    } catch (InvalidPathException ipe) {
+      // Expected from FileContext
+    } catch (IllegalArgumentException e) {
+      // Expected from Filesystem
+      GenericTestUtils.assertExceptionContains("Wrong FS: ", e);
+    }
+  }
+
+  @Test(timeout=10000)
+  /** Test create symlink to / */
+  public void testCreateLinkToSlash() throws IOException {
+    Path dir  = new Path(testBaseDir1());
+    Path file = new Path(testBaseDir1(), "file");
+    Path link = new Path(testBaseDir1(), "linkToSlash");
+    Path fileViaLink = new Path(testBaseDir1()+"/linkToSlash"+
+                                testBaseDir1()+"/file");
+    createAndWriteFile(file);
+    wrapper.setWorkingDirectory(dir);
+    wrapper.createSymlink(new Path("/"), link, false);
+    readFile(fileViaLink);
+    assertEquals(fileSize, wrapper.getFileStatus(fileViaLink).getLen());
+    // Ditto when using another file context since the file system
+    // for the slash is resolved according to the link's parent.
+    if (wrapper instanceof FileContextTestWrapper) {
+      FSTestWrapper localWrapper = wrapper.getLocalFSWrapper();
+      Path linkQual = new Path(cluster.getURI(0).toString(), fileViaLink);
+      assertEquals(fileSize, localWrapper.getFileStatus(linkQual).getLen());
+    }
+  }
+  
+  
+  @Test(timeout=10000)
+  /** setPermission affects the target not the link */
+  public void testSetPermissionAffectsTarget() throws IOException {
+    Path file       = new Path(testBaseDir1(), "file");
+    Path dir        = new Path(testBaseDir2());
+    Path linkToFile = new Path(testBaseDir1(), "linkToFile");
+    Path linkToDir  = new Path(testBaseDir1(), "linkToDir");
+    createAndWriteFile(file);
+    wrapper.createSymlink(file, linkToFile, false);
+    wrapper.createSymlink(dir, linkToDir, false);
+    
+    // Changing the permissions using the link does not modify
+    // the permissions of the link..
+    FsPermission perms = wrapper.getFileLinkStatus(linkToFile).getPermission();
+    wrapper.setPermission(linkToFile, new FsPermission((short)0664));
+    wrapper.setOwner(linkToFile, "user", "group");
+    assertEquals(perms, wrapper.getFileLinkStatus(linkToFile).getPermission());
+    // but the file's permissions were adjusted appropriately
+    FileStatus stat = wrapper.getFileStatus(file);
+    assertEquals(0664, stat.getPermission().toShort());
+    assertEquals("user", stat.getOwner());
+    assertEquals("group", stat.getGroup());
+    // Getting the file's permissions via the link is the same
+    // as getting the permissions directly.
+    assertEquals(stat.getPermission(), 
+                 wrapper.getFileStatus(linkToFile).getPermission());
+
+    // Ditto for a link to a directory
+    perms = wrapper.getFileLinkStatus(linkToDir).getPermission();
+    wrapper.setPermission(linkToDir, new FsPermission((short)0664));
+    wrapper.setOwner(linkToDir, "user", "group");
+    assertEquals(perms, wrapper.getFileLinkStatus(linkToDir).getPermission());
+    stat = wrapper.getFileStatus(dir);
+    assertEquals(0664, stat.getPermission().toShort());
+    assertEquals("user", stat.getOwner());
+    assertEquals("group", stat.getGroup());
+    assertEquals(stat.getPermission(), 
+                 wrapper.getFileStatus(linkToDir).getPermission());
+  }  
+
+  @Test(timeout=10000)
+  /** Create a symlink using a path with scheme but no authority */
+  public void testCreateWithPartQualPathFails() throws IOException {
+    Path fileWoAuth = new Path("hdfs:///test/file");
+    Path linkWoAuth = new Path("hdfs:///test/link");
+    try {
+      createAndWriteFile(fileWoAuth);
+      fail("HDFS requires URIs with schemes have an authority");
+    } catch (RuntimeException e) {
+      // Expected
+    }
+    try {
+      wrapper.createSymlink(new Path("foo"), linkWoAuth, false);
+      fail("HDFS requires URIs with schemes have an authority");
+    } catch (RuntimeException e) {
+      // Expected
+    }
+  }
+
+  @Test(timeout=10000)
+  /** setReplication affects the target not the link */
+  public void testSetReplication() throws IOException {
+    Path file = new Path(testBaseDir1(), "file");
+    Path link = new Path(testBaseDir1(), "linkToFile");
+    createAndWriteFile(file);
+    wrapper.createSymlink(file, link, false);
+    wrapper.setReplication(link, (short)2);
+    assertEquals(0, wrapper.getFileLinkStatus(link).getReplication());
+    assertEquals(2, wrapper.getFileStatus(link).getReplication());
+    assertEquals(2, wrapper.getFileStatus(file).getReplication());
+  }
+  
+  @Test(timeout=10000)
+  /** Test create symlink with a max len name */
+  public void testCreateLinkMaxPathLink() throws IOException {
+    Path dir  = new Path(testBaseDir1());
+    Path file = new Path(testBaseDir1(), "file");
+    final int maxPathLen = HdfsConstants.MAX_PATH_LENGTH;
+    final int dirLen     = dir.toString().length() + 1;
+    int   len            = maxPathLen - dirLen;
+    
+    // Build a MAX_PATH_LENGTH path
+    StringBuilder sb = new StringBuilder("");
+    for (int i = 0; i < (len / 10); i++) {
+      sb.append("0123456789");
+    }
+    for (int i = 0; i < (len % 10); i++) {
+      sb.append("x");
+    }
+    Path link = new Path(sb.toString());
+    assertEquals(maxPathLen, dirLen + link.toString().length()); 
+    
+    // Check that it works
+    createAndWriteFile(file);
+    wrapper.setWorkingDirectory(dir);
+    wrapper.createSymlink(file, link, false);
+    readFile(link);
+    
+    // Now modify the path so it's too large
+    link = new Path(sb.toString()+"x");
+    try {
+      wrapper.createSymlink(file, link, false);
+      fail("Path name should be too long");
+    } catch (IOException x) {
+      // Expected
+    }
+  }
+
+  @Test(timeout=10000)
+  /** Test symlink owner */
+  public void testLinkOwner() throws IOException {
+    Path file = new Path(testBaseDir1(), "file");
+    Path link = new Path(testBaseDir1(), "symlinkToFile");
+    createAndWriteFile(file);
+    wrapper.createSymlink(file, link, false);
+    FileStatus statFile = wrapper.getFileStatus(file);
+    FileStatus statLink = wrapper.getFileStatus(link);
+    assertEquals(statLink.getOwner(), statFile.getOwner());
+  }
+
+  @Test(timeout=10000)
+  /** Test WebHdfsFileSystem.createSymlink(..). */
+  public void testWebHDFS() throws IOException {
+    Path file = new Path(testBaseDir1(), "file");
+    Path link = new Path(testBaseDir1(), "linkToFile");
+    createAndWriteFile(file);
+    webhdfs.createSymlink(file, link, false);
+    wrapper.setReplication(link, (short)2);
+    assertEquals(0, wrapper.getFileLinkStatus(link).getReplication());
+    assertEquals(2, wrapper.getFileStatus(link).getReplication());
+    assertEquals(2, wrapper.getFileStatus(file).getReplication());
+  }
+
+  @Test(timeout=10000)
+  /** Test createSymlink(..) with quota. */
+  public void testQuota() throws IOException {
+    final Path dir = new Path(testBaseDir1());
+    dfs.setQuota(dir, 3, HdfsConstants.QUOTA_DONT_SET);
+
+    final Path file = new Path(dir, "file");
+    createAndWriteFile(file);
+
+    //creating the first link should succeed
+    final Path link1 = new Path(dir, "link1");
+    wrapper.createSymlink(file, link1, false);
+
+    try {
+      //creating the second link should fail with QuotaExceededException.
+      final Path link2 = new Path(dir, "link2");
+      wrapper.createSymlink(file, link2, false);
+      fail("Created symlink despite quota violation");
+    } catch(QuotaExceededException qee) {
+      //expected
+    }
+  }
+}

Added: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfsFileContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfsFileContext.java?rev=1502387&view=auto
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfsFileContext.java (added)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfsFileContext.java Thu Jul 11 22:36:37 2013
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSymlinkHdfsFileContext extends TestSymlinkHdfs {
+
+  private static FileContext fc;
+
+  @BeforeClass
+  public static void testSetup() throws Exception {
+    fc = FileContext.getFileContext(cluster.getURI(0));
+    wrapper = new FileContextTestWrapper(fc, "/tmp/TestSymlinkHdfsFileContext");
+  }
+
+  @Test(timeout=1000)
+  /** Test that AbstractFileSystem#open on a symlink throws UnresolvedLinkException. */
+  public void testAccessLinkFromAbstractFileSystem() throws IOException {
+    Path file = new Path(testBaseDir1(), "file");
+    Path link = new Path(testBaseDir1(), "linkToFile");
+    createAndWriteFile(file);
+    wrapper.createSymlink(file, link, false);
+    try {
+      AbstractFileSystem afs = fc.getDefaultFileSystem();
+      afs.open(link);
+      fail("Opened a link using AFS");
+    } catch (UnresolvedLinkException x) {
+      // Expected: open(link) must not succeed through the AFS
+    }
+  }
+}

Added: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfsFileSystem.java?rev=1502387&view=auto
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfsFileSystem.java (added)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestSymlinkHdfsFileSystem.java Thu Jul 11 22:36:37 2013
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestSymlinkHdfsFileSystem extends TestSymlinkHdfs {
+
+  @BeforeClass
+  public static void testSetup() throws Exception {
+    wrapper = new FileSystemTestWrapper(dfs, "/tmp/TestSymlinkHdfsFileSystem");
+  }
+
+  @Override
+  @Ignore("FileSystem adds missing authority in absolute URIs")
+  @Test(timeout=1000)
+  public void testCreateWithPartQualPathFails() throws IOException {}
+
+  @Ignore("FileSystem#create creates parent directories," +
+      " so dangling links to directories are created")
+  @Override
+  @Test(timeout=1000)
+  public void testCreateFileViaDanglingLinkParent() throws IOException {}
+
+  // Additional tests for DFS-only methods, each invoked through a symlink
+
+  @Test(timeout=10000)
+  public void testRecoverLease() throws IOException {
+    Path dir  = new Path(testBaseDir1());
+    Path file = new Path(testBaseDir1(), "file");
+    Path link = new Path(testBaseDir1(), "link");
+    wrapper.setWorkingDirectory(dir);
+    createAndWriteFile(file);
+    wrapper.createSymlink(file, link, false);
+    // Attempt recoverLease through a symlink
+    boolean closed = dfs.recoverLease(link);
+    assertTrue("Expected recoverLease to return true", closed);
+  }
+
+  @Test(timeout=10000)
+  public void testIsFileClosed() throws IOException {
+    Path dir  = new Path(testBaseDir1());
+    Path file = new Path(testBaseDir1(), "file");
+    Path link = new Path(testBaseDir1(), "link");
+    wrapper.setWorkingDirectory(dir);
+    createAndWriteFile(file);
+    wrapper.createSymlink(file, link, false);
+    // Attempt isFileClosed through a symlink
+    boolean closed = dfs.isFileClosed(link);
+    assertTrue("Expected isFileClosed to return true", closed);
+  }
+
+  @Test(timeout=10000)
+  public void testConcat() throws Exception {
+    Path dir  = new Path(testBaseDir1());
+    Path link = new Path(testBaseDir1(), "link");
+    Path dir2 = new Path(testBaseDir2());
+    wrapper.createSymlink(dir2, link, false);
+    wrapper.setWorkingDirectory(dir);
+    // Concat with a target and srcs whose paths all go through the link
+    Path target = new Path(link, "target");
+    createAndWriteFile(target);
+    Path[] srcs = new Path[3];
+    for (int i=0; i<srcs.length; i++) {
+      srcs[i] = new Path(link, "src-" + i);
+      createAndWriteFile(srcs[i]);
+    }
+    dfs.concat(target, srcs);
+  }
+
+  @Test(timeout=10000)
+  public void testSnapshot() throws Exception {
+    Path dir  = new Path(testBaseDir1());
+    Path link = new Path(testBaseDir1(), "link");
+    Path dir2 = new Path(testBaseDir2());
+    wrapper.createSymlink(dir2, link, false);
+    wrapper.setWorkingDirectory(dir);
+    dfs.allowSnapshot(link);
+    dfs.disallowSnapshot(link);
+    dfs.allowSnapshot(link);
+    dfs.createSnapshot(link, "mcmillan");
+    dfs.renameSnapshot(link, "mcmillan", "seaborg");
+    dfs.deleteSnapshot(link, "seaborg");
+  }
+}

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java?rev=1502387&r1=1502386&r2=1502387&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestNestedSnapshots.java Thu Jul 11 22:36:37 2013
@@ -135,7 +135,7 @@ public class TestNestedSnapshots {
     try {
       hdfs.disallowSnapshot(rootPath);
       fail("Expect snapshot exception when disallowing snapshot on root again");
-    } catch (RemoteException e) {
+    } catch (SnapshotException e) {
       GenericTestUtils.assertExceptionContains(
           "Root is not a snapshottable directory", e);
     }
@@ -149,16 +149,16 @@ public class TestNestedSnapshots {
     try {
       hdfs.allowSnapshot(rootPath);
       Assert.fail();
-    } catch(RemoteException se) {
+    } catch (SnapshotException se) {
       assertNestedSnapshotException(
-          (SnapshotException) se.unwrapRemoteException(), "subdirectory");
+          se, "subdirectory");
     }
     try {
       hdfs.allowSnapshot(foo);
       Assert.fail();
-    } catch(RemoteException se) {
+    } catch (SnapshotException se) {
       assertNestedSnapshotException(
-          (SnapshotException) se.unwrapRemoteException(), "subdirectory");
+          se, "subdirectory");
     }
 
     final Path sub1Bar = new Path(bar, "sub1");
@@ -167,16 +167,16 @@ public class TestNestedSnapshots {
     try {
       hdfs.allowSnapshot(sub1Bar);
       Assert.fail();
-    } catch(RemoteException se) {
+    } catch (SnapshotException se) {
       assertNestedSnapshotException(
-          (SnapshotException) se.unwrapRemoteException(), "ancestor");
+          se, "ancestor");
     }
     try {
       hdfs.allowSnapshot(sub2Bar);
       Assert.fail();
-    } catch(RemoteException se) {
+    } catch (SnapshotException se) {
       assertNestedSnapshotException(
-          (SnapshotException) se.unwrapRemoteException(), "ancestor");
+          se, "ancestor");
     }
   }