You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/29 22:30:29 UTC

[27/50] [abbrv] hadoop git commit: HDFS-8740. Move DistributedFileSystem to hadoop-hdfs-client. Contributed by Mingliang Liu.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c030c6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
deleted file mode 100644
index 1d20f82..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ /dev/null
@@ -1,2262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.BlockStoragePolicySpi;
-import org.apache.hadoop.fs.CacheFlag;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSLinkResolver;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileChecksum;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystemLinkResolver;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.FsStatus;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.XAttrSetFlag;
-import org.apache.hadoop.fs.Options.ChecksumOpt;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.HdfsAdmin;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
-import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
-import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.EncryptionZone;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
-import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-
-/****************************************************************
- * Implementation of the abstract FileSystem for the DFS system.
- * This object is the way end-user code interacts with a Hadoop
- * DistributedFileSystem.
- *
- *****************************************************************/
-@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
-@InterfaceStability.Unstable
-public class DistributedFileSystem extends FileSystem {
-  private Path workingDir;
-  private URI uri;
-  private String homeDirPrefix =
-      HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT;
-
-  DFSClient dfs;
-  private boolean verifyChecksum = true;
-  
-  // Runs once when this class is loaded, before any instance exists.
-  // NOTE(review): presumably makes sure HdfsConfiguration's own setup has
-  // happened before a Configuration is used here - confirm in
-  // HdfsConfiguration.
-  static{
-    HdfsConfiguration.init();
-  }
-
-  /**
-   * Creates an instance without a client, URI or working directory; those
-   * are assigned by {@link #initialize(URI, Configuration)}.
-   */
-  public DistributedFileSystem() {
-  }
-
-  /**
-   * Return the protocol scheme for the FileSystem.
-   *
-   * @return <code>hdfs</code>, i.e. {@link HdfsConstants#HDFS_URI_SCHEME}
-   */
-  @Override
-  public String getScheme() {
-    return HdfsConstants.HDFS_URI_SCHEME;
-  }
-
-  /**
-   * @return the URI stored by {@link #initialize(URI, Configuration)},
-   *         built from the scheme and authority only
-   */
-  @Override
-  public URI getUri() { return uri; }
-
-  /**
-   * Binds this instance to the file system named by {@code uri}: reads the
-   * home-directory prefix from {@code conf}, creates the {@link DFSClient},
-   * records the scheme/authority URI and sets the working directory to the
-   * user's home directory.
-   *
-   * @param uri URI of the file system; it must carry a host
-   * @param conf configuration to use
-   * @throws IOException if {@code uri} has no host
-   */
-  @Override
-  public void initialize(URI uri, Configuration conf) throws IOException {
-    super.initialize(uri, conf);
-    setConf(conf);
-
-    if (uri.getHost() == null) {
-      throw new IOException("Incomplete HDFS URI, no host: "+ uri);
-    }
-    this.homeDirPrefix = conf.get(
-        HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
-        HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);
-
-    this.dfs = new DFSClient(uri, conf, statistics);
-    // Keep only scheme and authority; any path on the given URI is dropped.
-    final String schemeAndAuthority =
-        uri.getScheme() + "://" + uri.getAuthority();
-    this.uri = URI.create(schemeAndAuthority);
-    // Needs dfs and this.uri to be set already.
-    this.workingDir = getHomeDirectory();
-  }
-
-  /**
-   * @return the current working directory; it starts out as the home
-   *         directory (see {@code initialize}) and is replaced by
-   *         {@code setWorkingDirectory}
-   */
-  @Override
-  public Path getWorkingDirectory() {
-    return workingDir;
-  }
-
-  /** @return the default block size held by the DFS client configuration */
-  @Override
-  public long getDefaultBlockSize() {
-    return dfs.getConf().getDefaultBlockSize();
-  }
-
-  /** @return the default replication held by the DFS client configuration */
-  @Override
-  public short getDefaultReplication() {
-    return dfs.getConf().getDefaultReplication();
-  }
-
-  /**
-   * Sets the directory against which relative paths are resolved.
-   *
-   * @param dir the new working directory; it is passed through
-   *            {@code fixRelativePart} before being validated and stored
-   * @throws IllegalArgumentException if the path component of the result is
-   *         rejected by {@link DFSUtil#isValidName(String)}
-   */
-  @Override
-  public void setWorkingDirectory(Path dir) {
-    // Resolve once, so the path that is validated is the path that is stored.
-    final Path absDir = fixRelativePart(dir);
-    final String result = absDir.toUri().getPath();
-    if (!DFSUtil.isValidName(result)) {
-      throw new IllegalArgumentException("Invalid DFS directory name " + 
-                                         result);
-    }
-    workingDir = absDir;
-  }
-
-  /**
-   * @return the qualified path made of the configured home-directory prefix,
-   *         a slash and the short name of the client's user
-   */
-  @Override
-  public Path getHomeDirectory() {
-    final String userName = dfs.ugi.getShortUserName();
-    final Path home = new Path(homeDirPrefix + "/" + userName);
-    return makeQualified(home);
-  }
-
-  /**
-   * Verifies that the given path belongs to this filesystem and extracts
-   * its path component. The path is expected to be absolute.
-   *
-   * @param file path (with absolute path component) to convert
-   * @return path component of {@code file}
-   * @throws IllegalArgumentException if the path does not belong to this DFS
-   *         or its path component is not a valid DFS name
-   */
-  private String getPathName(Path file) {
-    checkPath(file);
-    final String pathName = file.toUri().getPath();
-    if (DFSUtil.isValidName(pathName)) {
-      return pathName;
-    }
-    throw new IllegalArgumentException("Pathname " + pathName + " from " +
-                                       file+" is not a valid DFS filename.");
-  }
-  
-  /**
-   * Convenience overload that takes the file from a {@link FileStatus}.
-   *
-   * @param file status whose path is queried; may be null
-   * @param start offset of the first byte of the range
-   * @param len length of the range
-   * @return null when {@code file} is null, otherwise the result of the
-   *         path-based overload
-   * @throws IOException if the lookup fails
-   */
-  @Override
-  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
-      long len) throws IOException {
-    return file == null
-        ? null
-        : getFileBlockLocations(file.getPath(), start, len);
-  }
-  
-  /**
-   * Looks up the block locations for a byte range of a file, resolving
-   * symlinks through a {@link FileSystemLinkResolver}. Counts as one read
-   * operation in the statistics.
-   *
-   * @param p file to query; made absolute via {@code fixRelativePart}
-   * @param start offset of the first byte of the range
-   * @param len length of the range
-   * @return the block locations reported for the range
-   * @throws IOException if the lookup fails
-   */
-  @Override
-  public BlockLocation[] getFileBlockLocations(Path p, 
-      final long start, final long len) throws IOException {
-    statistics.incrementReadOps(1);
-    final Path absPath = fixRelativePart(p);
-    final FileSystemLinkResolver<BlockLocation[]> resolver =
-        new FileSystemLinkResolver<BlockLocation[]>() {
-          @Override
-          public BlockLocation[] doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            return dfs.getBlockLocations(getPathName(resolved), start, len);
-          }
-
-          @Override
-          public BlockLocation[] next(final FileSystem linkedFs,
-              final Path resolved) throws IOException {
-            return linkedFs.getFileBlockLocations(resolved, start, len);
-          }
-        };
-    return resolver.resolve(this, absPath);
-  }
-
-  /**
-   * Records the checksum-verification flag that {@code open} passes to the
-   * DFS client for streams opened afterwards.
-   *
-   * @param verifyChecksum the new value of the flag
-   */
-  @Override
-  public void setVerifyChecksum(boolean verifyChecksum) {
-    this.verifyChecksum = verifyChecksum;
-  }
-
-  /**
-   * Starts lease recovery for a file.
-   *
-   * @param f the file whose lease is to be recovered
-   * @return true if the file is already closed
-   * @throws IOException if an error occurs
-   * @throws UnsupportedOperationException if a symlink leads to a file
-   *         system that is not a DistributedFileSystem
-   */
-  public boolean recoverLease(final Path f) throws IOException {
-    final Path absPath = fixRelativePart(f);
-    final FileSystemLinkResolver<Boolean> resolver =
-        new FileSystemLinkResolver<Boolean>() {
-          @Override
-          public Boolean doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            return dfs.recoverLease(getPathName(resolved));
-          }
-
-          @Override
-          public Boolean next(final FileSystem linkedFs, final Path resolved)
-              throws IOException {
-            // recoverLease exists only on DistributedFileSystem.
-            if (!(linkedFs instanceof DistributedFileSystem)) {
-              throw new UnsupportedOperationException(
-                  "Cannot recoverLease through" +
-                  " a symlink to a non-DistributedFileSystem: " + f + " -> "
-                  + resolved);
-            }
-            return ((DistributedFileSystem) linkedFs).recoverLease(resolved);
-          }
-        };
-    return resolver.resolve(this, absPath);
-  }
-
-  /**
-   * Opens a file for reading. Counts as one read operation in the
-   * statistics; the current verify-checksum flag is passed to the client.
-   *
-   * @param f file to open; made absolute via {@code fixRelativePart}
-   * @param bufferSize buffer size handed to the client / linked file system
-   * @return the wrapped input stream
-   * @throws IOException if the file cannot be opened
-   */
-  @Override
-  public FSDataInputStream open(Path f, final int bufferSize)
-      throws IOException {
-    statistics.incrementReadOps(1);
-    final Path absPath = fixRelativePart(f);
-    final FileSystemLinkResolver<FSDataInputStream> resolver =
-        new FileSystemLinkResolver<FSDataInputStream>() {
-          @Override
-          public FSDataInputStream doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            final DFSInputStream in =
-                dfs.open(getPathName(resolved), bufferSize, verifyChecksum);
-            return dfs.createWrappedInputStream(in);
-          }
-
-          @Override
-          public FSDataInputStream next(final FileSystem linkedFs,
-              final Path resolved) throws IOException {
-            return linkedFs.open(resolved, bufferSize);
-          }
-        };
-    return resolver.resolve(this, absPath);
-  }
-
-  /**
-   * Appends to an existing file using only {@link CreateFlag#APPEND}.
-   *
-   * @param f the existing file to be appended
-   * @param bufferSize the size of the buffer to be used
-   * @param progress for reporting progress if it is not null
-   * @return the output stream positioned for appending
-   * @throws IOException if the append cannot be started
-   */
-  @Override
-  public FSDataOutputStream append(Path f, final int bufferSize,
-      final Progressable progress) throws IOException {
-    final EnumSet<CreateFlag> appendOnly = EnumSet.of(CreateFlag.APPEND);
-    return append(f, appendOnly, bufferSize, progress);
-  }
-
-  /**
-   * Append to an existing file (optional operation).
-   * 
-   * @param f the existing file to be appended.
-   * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory
-   *          to be present.
-   * @param bufferSize the size of the buffer to be used.
-   * @param progress for reporting progress if it is not null.
-   * @return Returns instance of {@link FSDataOutputStream}
-   * @throws IOException if the append cannot be started
-   */
-  public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
-      final int bufferSize, final Progressable progress) throws IOException {
-    statistics.incrementWriteOps(1);
-    Path absF = fixRelativePart(f);
-    return new FileSystemLinkResolver<FSDataOutputStream>() {
-      @Override
-      public FSDataOutputStream doCall(final Path p)
-          throws IOException {
-        return dfs.append(getPathName(p), bufferSize, flag, progress,
-            statistics);
-      }
-      @Override
-      public FSDataOutputStream next(final FileSystem fs, final Path p)
-          throws IOException {
-        // The generic FileSystem API has no flag-taking append, so the flags
-        // cannot be forwarded; the progress callback can, and must not be
-        // dropped silently.
-        return fs.append(p, bufferSize, progress);
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * Append to an existing file (optional operation).
-   * 
-   * @param f the existing file to be appended.
-   * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory
-   *          to be present.
-   * @param bufferSize the size of the buffer to be used.
-   * @param progress for reporting progress if it is not null.
-   * @param favoredNodes Favored nodes for new blocks
-   * @return Returns instance of {@link FSDataOutputStream}
-   * @throws IOException if the append cannot be started
-   */
-  public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
-      final int bufferSize, final Progressable progress,
-      final InetSocketAddress[] favoredNodes) throws IOException {
-    statistics.incrementWriteOps(1);
-    Path absF = fixRelativePart(f);
-    return new FileSystemLinkResolver<FSDataOutputStream>() {
-      @Override
-      public FSDataOutputStream doCall(final Path p)
-          throws IOException {
-        return dfs.append(getPathName(p), bufferSize, flag, progress,
-            statistics, favoredNodes);
-      }
-      @Override
-      public FSDataOutputStream next(final FileSystem fs, final Path p)
-          throws IOException {
-        // Flags and favored nodes have no counterpart in the generic
-        // FileSystem API; the progress callback does, so forward it instead
-        // of dropping it silently.
-        return fs.append(p, bufferSize, progress);
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * Creates a file, translating the {@code overwrite} boolean into create
-   * flags and delegating to the flag-based overload with a null checksum
-   * option.
-   *
-   * @param f file to create
-   * @param permission permission for the new file
-   * @param overwrite if true, {@link CreateFlag#OVERWRITE} is added to
-   *        {@link CreateFlag#CREATE}
-   * @param bufferSize the size of the buffer to be used
-   * @param replication replication for the new file
-   * @param blockSize block size for the new file
-   * @param progress for reporting progress
-   * @return the output stream of the new file
-   * @throws IOException if the file cannot be created
-   */
-  @Override
-  public FSDataOutputStream create(Path f, FsPermission permission,
-      boolean overwrite, int bufferSize, short replication, long blockSize,
-      Progressable progress) throws IOException {
-    final EnumSet<CreateFlag> cflags = overwrite
-        ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
-        : EnumSet.of(CreateFlag.CREATE);
-    return this.create(f, permission, cflags, bufferSize, replication,
-        blockSize, progress, null);
-  }
-
-  /**
-   * Same as  
-   * {@link #create(Path, FsPermission, boolean, int, short, long, 
-   * Progressable)} with the addition of favoredNodes, a hint to the namenode
-   * about where the file blocks should be placed.
-   * The favored nodes hint is not persisted in HDFS. Hence it may be honored
-   * at the creation time only. And with favored nodes, blocks will be pinned
-   * on the datanodes to prevent the balancer from moving them. HDFS could
-   * still move the blocks away from the favored nodes during replication.
-   * A value of null means no favored nodes for this create.
-   *
-   * @throws IOException if the file cannot be created
-   * @throws UnsupportedOperationException if a symlink leads to a file
-   *         system that is not a DistributedFileSystem
-   */
-  public HdfsDataOutputStream create(final Path f,
-      final FsPermission permission, final boolean overwrite,
-      final int bufferSize, final short replication, final long blockSize,
-      final Progressable progress, final InetSocketAddress[] favoredNodes)
-          throws IOException {
-    statistics.incrementWriteOps(1);
-    Path absF = fixRelativePart(f);
-    return new FileSystemLinkResolver<HdfsDataOutputStream>() {
-      @Override
-      public HdfsDataOutputStream doCall(final Path p)
-          throws IOException, UnresolvedLinkException {
-        // Use the path handed in by the resolver (absolute), not the
-        // caller's original f, which may be relative and would then fail
-        // validation in getPathName.
-        final DFSOutputStream out = dfs.create(getPathName(p), permission,
-            overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
-                : EnumSet.of(CreateFlag.CREATE),
-            true, replication, blockSize, progress, bufferSize, null,
-            favoredNodes);
-        return dfs.createWrappedOutputStream(out, statistics);
-      }
-      @Override
-      public HdfsDataOutputStream next(final FileSystem fs, final Path p)
-          throws IOException {
-        if (fs instanceof DistributedFileSystem) {
-          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
-          return myDfs.create(p, permission, overwrite, bufferSize, replication,
-              blockSize, progress, favoredNodes);
-        }
-        throw new UnsupportedOperationException("Cannot create with" +
-            " favoredNodes through a symlink to a non-DistributedFileSystem: "
-            + f + " -> " + p);
-      }
-    }.resolve(this, absF);
-  }
-  
-  /**
-   * Creates a file with explicit create flags and checksum option. Counts
-   * as one write operation in the statistics.
-   *
-   * @param f file to create; made absolute via {@code fixRelativePart}
-   * @param permission permission for the new file
-   * @param cflags create flags passed to the client
-   * @param bufferSize the size of the buffer to be used
-   * @param replication replication for the new file
-   * @param blockSize block size for the new file
-   * @param progress for reporting progress
-   * @param checksumOpt checksum option passed to the client
-   * @return the wrapped output stream of the new file
-   * @throws IOException if the file cannot be created
-   */
-  @Override
-  public FSDataOutputStream create(final Path f, final FsPermission permission,
-    final EnumSet<CreateFlag> cflags, final int bufferSize,
-    final short replication, final long blockSize, final Progressable progress,
-    final ChecksumOpt checksumOpt) throws IOException {
-    statistics.incrementWriteOps(1);
-    final Path absPath = fixRelativePart(f);
-    final FileSystemLinkResolver<FSDataOutputStream> resolver =
-        new FileSystemLinkResolver<FSDataOutputStream>() {
-          @Override
-          public FSDataOutputStream doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            final DFSOutputStream out = dfs.create(getPathName(resolved),
-                permission, cflags, replication, blockSize, progress,
-                bufferSize, checksumOpt);
-            return dfs.createWrappedOutputStream(out, statistics);
-          }
-
-          @Override
-          public FSDataOutputStream next(final FileSystem linkedFs,
-              final Path resolved) throws IOException {
-            return linkedFs.create(resolved, permission, cflags, bufferSize,
-                replication, blockSize, progress, checksumOpt);
-          }
-        };
-    return resolver.resolve(this, absPath);
-  }
-
-  /**
-   * Creates a file directly through the client's {@code primitiveCreate},
-   * without going through a link resolver. Counts as one write operation in
-   * the statistics.
-   *
-   * @param f file to create; made absolute via {@code fixRelativePart}
-   * @param absolutePermission permission passed to the client as given
-   * @param flag create flags
-   * @param bufferSize the size of the buffer to be used
-   * @param replication replication for the new file
-   * @param blockSize block size for the new file
-   * @param progress for reporting progress
-   * @param checksumOpt checksum option passed to the client
-   * @return the wrapped output stream of the new file
-   * @throws IOException if the file cannot be created
-   */
-  @Override
-  protected HdfsDataOutputStream primitiveCreate(Path f,
-    FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
-    short replication, long blockSize, Progressable progress,
-    ChecksumOpt checksumOpt) throws IOException {
-    statistics.incrementWriteOps(1);
-    final String src = getPathName(fixRelativePart(f));
-    final DFSOutputStream out = dfs.primitiveCreate(src, absolutePermission,
-        flag, true, replication, blockSize, progress, bufferSize,
-        checksumOpt);
-    return dfs.createWrappedOutputStream(out, statistics);
-  }
-
-  /**
-   * Same as create(), except fails if parent directory doesn't already exist.
-   * If {@code flag} contains {@link CreateFlag#OVERWRITE},
-   * {@link CreateFlag#CREATE} is implied; the caller's set is left untouched.
-   *
-   * @param f file to create; made absolute via {@code fixRelativePart}
-   * @param permission permission for the new file
-   * @param flag create flags
-   * @param bufferSize the size of the buffer to be used
-   * @param replication replication for the new file
-   * @param blockSize block size for the new file
-   * @param progress for reporting progress
-   * @return the wrapped output stream of the new file
-   * @throws IOException if the file cannot be created
-   */
-  @Override
-  @SuppressWarnings("deprecation")
-  public FSDataOutputStream createNonRecursive(final Path f,
-      final FsPermission permission, final EnumSet<CreateFlag> flag,
-      final int bufferSize, final short replication, final long blockSize,
-      final Progressable progress) throws IOException {
-    statistics.incrementWriteOps(1);
-    // Work on a private copy so that adding CREATE does not modify the
-    // EnumSet owned by the caller.
-    final EnumSet<CreateFlag> effectiveFlag = EnumSet.copyOf(flag);
-    if (effectiveFlag.contains(CreateFlag.OVERWRITE)) {
-      effectiveFlag.add(CreateFlag.CREATE);
-    }
-    Path absF = fixRelativePart(f);
-    return new FileSystemLinkResolver<FSDataOutputStream>() {
-      @Override
-      public FSDataOutputStream doCall(final Path p) throws IOException,
-          UnresolvedLinkException {
-        // createParent == false: the parent directory must already exist.
-        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
-          effectiveFlag, false, replication, blockSize, progress, bufferSize,
-          null);
-        return dfs.createWrappedOutputStream(dfsos, statistics);
-      }
-
-      @Override
-      public FSDataOutputStream next(final FileSystem fs, final Path p)
-          throws IOException {
-        return fs.createNonRecursive(p, permission, effectiveFlag, bufferSize,
-            replication, blockSize, progress);
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * Changes the replication of a file. Counts as one write operation in the
-   * statistics.
-   *
-   * @param src file to change; made absolute via {@code fixRelativePart}
-   * @param replication the new replication
-   * @return the result reported by the client or the linked file system
-   * @throws IOException if the request fails
-   */
-  @Override
-  public boolean setReplication(Path src, 
-                                final short replication
-                               ) throws IOException {
-    statistics.incrementWriteOps(1);
-    final Path absPath = fixRelativePart(src);
-    final FileSystemLinkResolver<Boolean> resolver =
-        new FileSystemLinkResolver<Boolean>() {
-          @Override
-          public Boolean doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            return dfs.setReplication(getPathName(resolved), replication);
-          }
-
-          @Override
-          public Boolean next(final FileSystem linkedFs, final Path resolved)
-              throws IOException {
-            return linkedFs.setReplication(resolved, replication);
-          }
-        };
-    return resolver.resolve(this, absPath);
-  }
-
-  /**
-   * Applies the named storage policy to a path. Counts as one write
-   * operation in the statistics.
-   *
-   * @param src The source path referring to either a directory or a file.
-   * @param policyName The name of the storage policy.
-   * @throws IOException if the request fails
-   */
-  @Override
-  public void setStoragePolicy(final Path src, final String policyName)
-      throws IOException {
-    statistics.incrementWriteOps(1);
-    final Path absPath = fixRelativePart(src);
-    final FileSystemLinkResolver<Void> resolver =
-        new FileSystemLinkResolver<Void>() {
-          @Override
-          public Void doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            dfs.setStoragePolicy(getPathName(resolved), policyName);
-            return null;
-          }
-
-          @Override
-          public Void next(final FileSystem linkedFs, final Path resolved)
-              throws IOException {
-            linkedFs.setStoragePolicy(resolved, policyName);
-            return null;
-          }
-        };
-    resolver.resolve(this, absPath);
-  }
-
-  /**
-   * Queries the storage policy of a path. Counts as one read operation in
-   * the statistics.
-   *
-   * @param path path to query; made absolute via {@code fixRelativePart}
-   * @return the storage policy reported for the path
-   * @throws IOException if the request fails
-   */
-  @Override
-  public BlockStoragePolicySpi getStoragePolicy(Path path) throws IOException {
-    statistics.incrementReadOps(1);
-    final Path absPath = fixRelativePart(path);
-    final FileSystemLinkResolver<BlockStoragePolicySpi> resolver =
-        new FileSystemLinkResolver<BlockStoragePolicySpi>() {
-          @Override
-          public BlockStoragePolicySpi doCall(final Path resolved)
-              throws IOException {
-            return getClient().getStoragePolicy(getPathName(resolved));
-          }
-
-          @Override
-          public BlockStoragePolicySpi next(final FileSystem linkedFs,
-              final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            return linkedFs.getStoragePolicy(resolved);
-          }
-        };
-    return resolver.resolve(this, absPath);
-  }
-
-  /**
-   * @return the storage policies reported by the client, as a fixed-size
-   *         list view over the returned array
-   * @throws IOException if the request fails
-   */
-  @Override
-  public Collection<BlockStoragePolicy> getAllStoragePolicies()
-      throws IOException {
-    final BlockStoragePolicy[] policies = dfs.getStoragePolicies();
-    return Arrays.asList(policies);
-  }
-
-  /**
-   * Fetches the storage policies from the client as an array. Counts as one
-   * read operation in the statistics.
-   *
-   * @return the storage policies reported by the client
-   * @throws IOException if the request fails
-   * @deprecated Prefer {@link FileSystem#getAllStoragePolicies()}
-   */
-  @Deprecated
-  public BlockStoragePolicy[] getStoragePolicies() throws IOException {
-    statistics.incrementReadOps(1);
-    final BlockStoragePolicy[] policies = dfs.getStoragePolicies();
-    return policies;
-  }
-
-  /**
-   * Move blocks from srcs to trg and delete srcs afterwards.
-   * The file block sizes must be the same.
-   * <p>
-   * The concat is first attempted without resolving any links. If that hits
-   * an unresolved link, target and sources are fully resolved, the call
-   * fails if any of them is itself a symlink, and the concat is retried once.
-   * 
-   * @param trg existing file to append to
-   * @param psrcs list of files (same block size, same replication)
-   * @throws IOException if the concat fails or trg / a src is a symlink
-   */
-  @Override
-  public void concat(Path trg, Path [] psrcs) throws IOException {
-    statistics.incrementWriteOps(1);
-    // Make target absolute
-    Path absF = fixRelativePart(trg);
-    // Make all srcs absolute
-    Path[] srcs = new Path[psrcs.length];
-    for (int i=0; i<psrcs.length; i++) {
-      srcs[i] = fixRelativePart(psrcs[i]);
-    }
-    // Try the concat without resolving any links
-    String[] srcsStr = new String[psrcs.length];
-    try {
-      for (int i=0; i<psrcs.length; i++) {
-        srcsStr[i] = getPathName(srcs[i]);
-      }
-      // Use the absolute target, exactly as for the sources; the raw trg may
-      // be relative and would then be rejected by getPathName.
-      dfs.concat(getPathName(absF), srcsStr);
-    } catch (UnresolvedLinkException e) {
-      // Exception could be from trg or any src.
-      // Fully resolve trg and srcs. Fail if any of them are a symlink.
-      FileStatus stat = getFileLinkStatus(absF);
-      if (stat.isSymlink()) {
-        throw new IOException("Cannot concat with a symlink target: "
-            + trg + " -> " + stat.getPath());
-      }
-      absF = fixRelativePart(stat.getPath());
-      for (int i=0; i<psrcs.length; i++) {
-        stat = getFileLinkStatus(srcs[i]);
-        if (stat.isSymlink()) {
-          throw new IOException("Cannot concat with a symlink src: "
-              + psrcs[i] + " -> " + stat.getPath());
-        }
-        srcs[i] = fixRelativePart(stat.getPath());
-      }
-      // Try concat again. Can still race with another symlink.
-      for (int i=0; i<psrcs.length; i++) {
-        srcsStr[i] = getPathName(srcs[i]);
-      }
-      dfs.concat(getPathName(absF), srcsStr);
-    }
-  }
-
-  
-  /**
-   * Renames {@code src} to {@code dst}. Counts as one write operation in
-   * the statistics. The rename is first attempted without resolving links;
-   * on an unresolved link the source is fully resolved and the destination
-   * is resolved through a {@link FileSystemLinkResolver}.
-   *
-   * @param src path to rename
-   * @param dst new path
-   * @return the result reported by the client
-   * @throws IOException if the rename fails
-   */
-  @SuppressWarnings("deprecation")
-  @Override
-  public boolean rename(Path src, Path dst) throws IOException {
-    statistics.incrementWriteOps(1);
-
-    final Path absSource = fixRelativePart(src);
-    final Path absTarget = fixRelativePart(dst);
-
-    try {
-      // Optimistic attempt, no link resolution
-      return dfs.rename(getPathName(absSource), getPathName(absTarget));
-    } catch (UnresolvedLinkException ule) {
-      // Resolve the source completely ...
-      final Path resolvedSource = getFileLinkStatus(absSource).getPath();
-      // ... and let the resolver keep working on the destination
-      final FileSystemLinkResolver<Boolean> resolver =
-          new FileSystemLinkResolver<Boolean>() {
-            @Override
-            public Boolean doCall(final Path target)
-                throws IOException, UnresolvedLinkException {
-              return dfs.rename(getPathName(resolvedSource),
-                  getPathName(target));
-            }
-
-            @Override
-            public Boolean next(final FileSystem linkedFs, final Path target)
-                throws IOException {
-              // Should just throw an error in FileSystem#checkPath
-              return doCall(target);
-            }
-          };
-      return resolver.resolve(this, absTarget);
-    }
-  }
-
-  /**
-   * Renames {@code src} to {@code dst} with the given options.
-   * This rename operation is guaranteed to be atomic.
-   * Counts as one write operation in the statistics.
-   *
-   * @param src path to rename
-   * @param dst new path
-   * @param options rename options passed to the client
-   * @throws IOException if the rename fails
-   */
-  @SuppressWarnings("deprecation")
-  @Override
-  public void rename(Path src, Path dst, final Options.Rename... options)
-      throws IOException {
-    statistics.incrementWriteOps(1);
-    final Path absSource = fixRelativePart(src);
-    final Path absTarget = fixRelativePart(dst);
-    try {
-      // Optimistic attempt, no link resolution
-      dfs.rename(getPathName(absSource), getPathName(absTarget), options);
-    } catch (UnresolvedLinkException ule) {
-      // Resolve the source completely ...
-      final Path resolvedSource = getFileLinkStatus(absSource).getPath();
-      // ... and let the resolver keep working on the destination
-      final FileSystemLinkResolver<Void> resolver =
-          new FileSystemLinkResolver<Void>() {
-            @Override
-            public Void doCall(final Path target)
-                throws IOException, UnresolvedLinkException {
-              dfs.rename(getPathName(resolvedSource), getPathName(target),
-                  options);
-              return null;
-            }
-
-            @Override
-            public Void next(final FileSystem linkedFs, final Path target)
-                throws IOException {
-              // Should just throw an error in FileSystem#checkPath
-              return doCall(target);
-            }
-          };
-      resolver.resolve(this, absTarget);
-    }
-  }
-
-  /**
-   * Truncates a file to {@code newLength}. Counts as one write operation in
-   * the statistics.
-   *
-   * @param f file to truncate; made absolute via {@code fixRelativePart}
-   * @param newLength the length to truncate to
-   * @return the result reported by the client or the linked file system
-   * @throws IOException if the request fails
-   */
-  @Override
-  public boolean truncate(Path f, final long newLength) throws IOException {
-    statistics.incrementWriteOps(1);
-    final Path absPath = fixRelativePart(f);
-    final FileSystemLinkResolver<Boolean> resolver =
-        new FileSystemLinkResolver<Boolean>() {
-          @Override
-          public Boolean doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            return dfs.truncate(getPathName(resolved), newLength);
-          }
-
-          @Override
-          public Boolean next(final FileSystem linkedFs, final Path resolved)
-              throws IOException {
-            return linkedFs.truncate(resolved, newLength);
-          }
-        };
-    return resolver.resolve(this, absPath);
-  }
-
-  /**
-   * Deletes a path. Counts as one write operation in the statistics.
-   *
-   * @param f path to delete; made absolute via {@code fixRelativePart}
-   * @param recursive passed through to the client / linked file system
-   * @return the result reported by the client or the linked file system
-   * @throws IOException if the request fails
-   */
-  @Override
-  public boolean delete(Path f, final boolean recursive) throws IOException {
-    statistics.incrementWriteOps(1);
-    final Path absPath = fixRelativePart(f);
-    final FileSystemLinkResolver<Boolean> resolver =
-        new FileSystemLinkResolver<Boolean>() {
-          @Override
-          public Boolean doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            return dfs.delete(getPathName(resolved), recursive);
-          }
-
-          @Override
-          public Boolean next(final FileSystem linkedFs, final Path resolved)
-              throws IOException {
-            return linkedFs.delete(resolved, recursive);
-          }
-        };
-    return resolver.resolve(this, absPath);
-  }
-  
-  /**
-   * Fetches the content summary of a path. Counts as one read operation in
-   * the statistics.
-   *
-   * @param f path to summarize; made absolute via {@code fixRelativePart}
-   * @return the content summary reported for the path
-   * @throws IOException if the request fails
-   */
-  @Override
-  public ContentSummary getContentSummary(Path f) throws IOException {
-    statistics.incrementReadOps(1);
-    final Path absPath = fixRelativePart(f);
-    final FileSystemLinkResolver<ContentSummary> resolver =
-        new FileSystemLinkResolver<ContentSummary>() {
-          @Override
-          public ContentSummary doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            return dfs.getContentSummary(getPathName(resolved));
-          }
-
-          @Override
-          public ContentSummary next(final FileSystem linkedFs,
-              final Path resolved) throws IOException {
-            return linkedFs.getContentSummary(resolved);
-          }
-        };
-    return resolver.resolve(this, absPath);
-  }
-
-  /**
-   * Set a directory's quotas.
-   *
-   * @param src directory whose quotas are set
-   * @param namespaceQuota namespace quota passed to the client
-   * @param storagespaceQuota storage space quota passed to the client
-   * @throws IOException if the request fails
-   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long, StorageType)
-   */
-  public void setQuota(Path src, final long namespaceQuota,
-      final long storagespaceQuota) throws IOException {
-    final Path absPath = fixRelativePart(src);
-    final FileSystemLinkResolver<Void> resolver =
-        new FileSystemLinkResolver<Void>() {
-          @Override
-          public Void doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            dfs.setQuota(getPathName(resolved), namespaceQuota,
-                storagespaceQuota);
-            return null;
-          }
-
-          @Override
-          public Void next(final FileSystem linkedFs, final Path resolved)
-              throws IOException {
-            // FileSystem has no setQuota, so resolution can only continue
-            // inside this DFS
-            return doCall(resolved);
-          }
-        };
-    resolver.resolve(this, absPath);
-  }
-
-  /**
-   * Sets the quota of one storage type on a directory.
-   *
-   * @param src target directory whose quota is to be modified.
-   * @param type storage type of the specific storage type quota to be modified.
-   * @param quota value of the specific storage type quota to be modified.
-   * Maybe {@link HdfsConstants#QUOTA_RESET} to clear quota by storage type.
-   * @throws IOException if the request fails
-   */
-  public void setQuotaByStorageType(
-    Path src, final StorageType type, final long quota)
-    throws IOException {
-    final Path absPath = fixRelativePart(src);
-    final FileSystemLinkResolver<Void> resolver =
-        new FileSystemLinkResolver<Void>() {
-          @Override
-          public Void doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            dfs.setQuotaByStorageType(getPathName(resolved), type, quota);
-            return null;
-          }
-
-          @Override
-          public Void next(final FileSystem linkedFs, final Path resolved)
-              throws IOException {
-            // FileSystem has no setQuotaByStorageType, so resolution can
-            // only continue inside this DFS
-            return doCall(resolved);
-          }
-        };
-    resolver.resolve(this, absPath);
-  }
-
-  /**
-   * Lists a directory by fetching its entries from the client in batches.
-   * A listing that fits into the first batch is counted as one read
-   * operation; otherwise every batch (including the first) is counted as one
-   * large read operation.
-   *
-   * @param p directory to list; must be acceptable to {@code getPathName}
-   * @return the entries, each qualified against this file system's URI and
-   *         {@code p}
-   * @throws FileNotFoundException if the client returns no listing, either
-   *         for the first batch or for a later one
-   * @throws IOException if the client call fails
-   */
-  private FileStatus[] listStatusInternal(Path p) throws IOException {
-    String src = getPathName(p);
-
-    // fetch the first batch of entries in the directory
-    DirectoryListing thisListing = dfs.listPaths(
-        src, HdfsFileStatus.EMPTY_NAME);
-
-    if (thisListing == null) { // the directory does not exist
-      throw new FileNotFoundException("File " + p + " does not exist.");
-    }
-    
-    HdfsFileStatus[] partialListing = thisListing.getPartialListing();
-    if (!thisListing.hasMore()) { // got all entries of the directory
-      // Fast path: one batch, so the result array can be sized exactly.
-      FileStatus[] stats = new FileStatus[partialListing.length];
-      for (int i = 0; i < partialListing.length; i++) {
-        stats[i] = partialListing[i].makeQualified(getUri(), p);
-      }
-      statistics.incrementReadOps(1);
-      return stats;
-    }
-
-    // The directory size is too big that it needs to fetch more
-    // estimate the total number of entries in the directory
-    int totalNumEntries =
-      partialListing.length + thisListing.getRemainingEntries();
-    // The estimate is only the initial capacity; the list grows if the
-    // directory gains entries between batches.
-    ArrayList<FileStatus> listing =
-      new ArrayList<FileStatus>(totalNumEntries);
-    // add the first batch of entries to the array list
-    for (HdfsFileStatus fileStatus : partialListing) {
-      listing.add(fileStatus.makeQualified(getUri(), p));
-    }
-    statistics.incrementLargeReadOps(1);
- 
-    // now fetch more entries
-    do {
-      // Continue after the last name returned by the previous batch.
-      thisListing = dfs.listPaths(src, thisListing.getLastName());
- 
-      if (thisListing == null) { // the directory is deleted
-        throw new FileNotFoundException("File " + p + " does not exist.");
-      }
- 
-      partialListing = thisListing.getPartialListing();
-      for (HdfsFileStatus fileStatus : partialListing) {
-        listing.add(fileStatus.makeQualified(getUri(), p));
-      }
-      statistics.incrementLargeReadOps(1);
-    } while (thisListing.hasMore());
- 
-    return listing.toArray(new FileStatus[listing.size()]);
-  }
-
-  /**
-   * Lists all the entries of a directory.
-   *
-   * For a large directory this operation is not atomic: the entries may be
-   * fetched from the NameNode in several calls.  If the directory changes
-   * between those calls, the only guarantee is that each name occurs once.
-   */
-  @Override
-  public FileStatus[] listStatus(Path p) throws IOException {
-    Path absF = fixRelativePart(p);
-    final FileSystemLinkResolver<FileStatus[]> resolver =
-        new FileSystemLinkResolver<FileStatus[]>() {
-          @Override
-          public FileStatus[] doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            return listStatusInternal(resolved);
-          }
-
-          @Override
-          public FileStatus[] next(final FileSystem fs, final Path resolved)
-              throws IOException {
-            return fs.listStatus(resolved);
-          }
-        };
-    return resolver.resolve(this, absF);
-  }
-
-  @Override
-  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p,
-      final PathFilter filter)
-  throws IOException {
-    Path absF = fixRelativePart(p);
-    return new FileSystemLinkResolver<RemoteIterator<LocatedFileStatus>>() {
-      @Override
-      public RemoteIterator<LocatedFileStatus> doCall(final Path p)
-          throws IOException, UnresolvedLinkException {
-        return new DirListingIterator<LocatedFileStatus>(p, filter, true);
-      }
-
-      @Override
-      public RemoteIterator<LocatedFileStatus> next(final FileSystem fs, final Path p)
-          throws IOException {
-        if (fs instanceof DistributedFileSystem) {
-          return ((DistributedFileSystem)fs).listLocatedStatus(p, filter);
-        }
-        // symlink resolution for this methos does not work cross file systems
-        // because it is a protected method.
-        throw new IOException("Link resolution does not work with multiple " +
-            "file systems for listLocatedStatus(): " + p);
-      }
-    }.resolve(this, absF);
-  }
-
-
-  /**
-   * Returns a remote iterator so that followup calls are made on demand
-   * while consuming the entries. This reduces memory consumption during
-   * listing of a large directory.
-   *
-   * @param p target path
-   * @return remote iterator
-   * @throws IOException if the first batch of entries cannot be fetched
-   */
-  @Override
-  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
-  throws IOException {
-    Path absF = fixRelativePart(p);
-    return new FileSystemLinkResolver<RemoteIterator<FileStatus>>() {
-      @Override
-      public RemoteIterator<FileStatus> doCall(final Path p)
-          throws IOException, UnresolvedLinkException {
-        return new DirListingIterator<FileStatus>(p, false);
-      }
-
-      @Override
-      public RemoteIterator<FileStatus> next(final FileSystem fs, final Path p)
-          throws IOException {
-        // The link may resolve into a file system that is not a
-        // DistributedFileSystem.  listStatusIterator is part of the public
-        // FileSystem API, so delegate to it instead of downcasting, which
-        // would fail with a ClassCastException for any other file system.
-        return fs.listStatusIterator(p);
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * This class defines an iterator that returns
-   * the file status of each file/subdirectory of a directory.
-   * Entries are fetched from the NameNode one batch at a time, as the
-   * iterator is consumed.
-   *
-   * If needLocation is set, every listing entry is cast to
-   * HdfsLocatedFileStatus and converted with makeQualifiedLocated();
-   * otherwise it is converted with makeQualified().
-   *
-   * Failures while fetching a batch surface as an IOException from
-   * hasNext() or next().
-   *
-   * @param <T> the type of the file status
-   */
-  private class  DirListingIterator<T extends FileStatus>
-  implements RemoteIterator<T> {
-    // the batch currently being consumed; null once a re-fetch returned null
-    private DirectoryListing thisListing;
-    // index of the next unread entry within thisListing
-    private int i;
-    // the directory being listed, as given by the caller
-    private Path p;
-    // path name of p as passed to the DFS client
-    private String src;
-    // entry found by hasNext() that next() has not handed out yet
-    private T curStat = null;
-    // optional filter on entry paths; null accepts everything
-    private PathFilter filter;
-    // whether listings are requested with location information
-    private boolean needLocation;
-
-    /**
-     * Creates the iterator and eagerly fetches the first batch.
-     *
-     * @param p directory to list
-     * @param filter filter applied to each entry's path, or null for none
-     * @param needLocation passed to the listing call and used to pick the
-     *                     conversion applied to each entry
-     * @throws FileNotFoundException if the first listing call returns null
-     * @throws IOException if the first listing call fails
-     */
-    private DirListingIterator(Path p, PathFilter filter,
-        boolean needLocation) throws IOException {
-      this.p = p;
-      this.src = getPathName(p);
-      this.filter = filter;
-      this.needLocation = needLocation;
-      // fetch the first batch of entries in the directory
-      thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
-          needLocation);
-      statistics.incrementReadOps(1);
-      if (thisListing == null) { // the directory does not exist
-        throw new FileNotFoundException("File " + p + " does not exist.");
-      }
-      i = 0;
-    }
-
-    /** Same as the three-argument constructor, with no path filter. */
-    private DirListingIterator(Path p, boolean needLocation)
-        throws IOException {
-      this(p, null, needLocation);
-    }
-
-    /**
-     * Advances to the next entry accepted by the filter and caches it in
-     * curStat.  Calling it again before next() does not advance further.
-     *
-     * @return true if an accepted entry is available
-     */
-    @Override
-    @SuppressWarnings("unchecked")
-    public boolean hasNext() throws IOException {
-      // skip over rejected entries until one is accepted or none are left
-      while (curStat == null && hasNextNoFilter()) {
-        T next;
-        HdfsFileStatus fileStat = thisListing.getPartialListing()[i++];
-        if (needLocation) {
-          next = (T)((HdfsLocatedFileStatus)fileStat)
-              .makeQualifiedLocated(getUri(), p);
-        } else {
-          next = (T)fileStat.makeQualified(getUri(), p);
-        }
-          // apply filter if not null
-        if (filter == null || filter.accept(next.getPath())) {
-          curStat = next;
-        }
-      }
-      return curStat != null;
-    }
-      
-    /**
-     * Check if there is a next item before applying the given filter.
-     * Fetches the following batch when the current one is exhausted and the
-     * listing reports more entries.  If that fetch returns null, the
-     * iteration simply ends (false is returned, no exception is thrown).
-     */
-    private boolean hasNextNoFilter() throws IOException {
-      if (thisListing == null) {
-        return false;
-      }
-      if (i >= thisListing.getPartialListing().length
-          && thisListing.hasMore()) { 
-        // current listing is exhausted & fetch a new listing
-        thisListing = dfs.listPaths(src, thisListing.getLastName(),
-            needLocation);
-        statistics.incrementReadOps(1);
-        if (thisListing == null) {
-          return false;
-        }
-        i = 0;
-      }
-      return (i < thisListing.getPartialListing().length);
-    }
-
-    /**
-     * Returns the entry found by hasNext() and clears the cached value so
-     * that the following call advances.
-     *
-     * @throws java.util.NoSuchElementException if no entry is left
-     */
-    @Override
-    public T next() throws IOException {
-      if (hasNext()) {
-        T tmp = curStat;
-        curStat = null;
-        return tmp;
-      } 
-      throw new java.util.NoSuchElementException("No more entry in " + p);
-    }
-  }
-  
-  /**
-   * Create a directory, only when the parent directories exist.
-   *
-   * See {@link FsPermission#applyUMask(FsPermission)} for details of how
-   * the permission is applied.
-   *
-   * @param f           The path to create
-   * @param permission  The permission.  See FsPermission#applyUMask for 
-   *                    details about how this is used to calculate the
-   *                    effective permission.
-   * @return the result of the underlying mkdirs call
-   * @throws IOException if the underlying call fails
-   */
-  public boolean mkdir(Path f, FsPermission permission) throws IOException {
-    // createParent == false: missing parents are not created
-    return mkdirsInternal(f, permission, false);
-  }
-
-  /**
-   * Create a directory and its parent directories.
-   *
-   * See {@link FsPermission#applyUMask(FsPermission)} for details of how
-   * the permission is applied.
-   *
-   * @param f           The path to create
-   * @param permission  The permission.  See FsPermission#applyUMask for 
-   *                    details about how this is used to calculate the
-   *                    effective permission.
-   * @return the result of the underlying mkdirs call
-   * @throws IOException if the underlying call fails
-   */
-  @Override
-  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    // createParent == true: missing parents are created as well
-    return mkdirsInternal(f, permission, true);
-  }
-
-  /**
-   * Shared implementation of {@link #mkdir} and {@link #mkdirs}.
-   *
-   * @param f the path to create
-   * @param permission the permission handed to the underlying mkdirs call
-   * @param createParent whether missing parent directories are created
-   * @return the result of the underlying mkdirs call
-   * @throws IOException if the call fails, or if a link resolves into
-   *         another file system while createParent is false
-   */
-  private boolean mkdirsInternal(Path f, final FsPermission permission,
-      final boolean createParent) throws IOException {
-    statistics.incrementWriteOps(1);
-    Path absF = fixRelativePart(f);
-    return new FileSystemLinkResolver<Boolean>() {
-      @Override
-      public Boolean doCall(final Path p)
-          throws IOException, UnresolvedLinkException {
-        return dfs.mkdirs(getPathName(p), permission, createParent);
-      }
-
-      @Override
-      public Boolean next(final FileSystem fs, final Path p)
-          throws IOException {
-        // FileSystem doesn't have a non-recursive mkdir() method
-        // Best we can do is error out
-        if (!createParent) {
-          // The separating space was missing, which produced the message
-          // "...non-recursivemkdir".
-          throw new IOException("FileSystem does not support non-recursive"
-              + " mkdir");
-        }
-        return fs.mkdirs(p, permission);
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * Creates a directory through the client's primitiveMkdir call, counting
-   * one write operation.  Unlike mkdir/mkdirs, the path is passed through
-   * without fixRelativePart or link resolution.
-   */
-  @SuppressWarnings("deprecation")
-  @Override
-  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
-    throws IOException {
-    statistics.incrementWriteOps(1);
-    return dfs.primitiveMkdir(getPathName(f), absolutePermission);
-  }
-
- 
-  /**
-   * Closes this file system: closes the client's output streams, then the
-   * superclass, and always closes the client itself.
-   */
-  @Override
-  public void close() throws IOException {
-    try {
-      dfs.closeOutputStreams(false);
-      super.close();
-    } finally {
-      // the client is closed even if one of the steps above throws
-      dfs.close();
-    }
-  }
-
-  /** @return "DFS[" followed by the client's string form and "]". */
-  @Override
-  public String toString() {
-    return "DFS[" + dfs + "]";
-  }
-
-  /** @return the DFSClient this file system delegates to. */
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  public DFSClient getClient() {
-    return dfs;
-  }        
-  
-  /**
-   * Returns the disk status reported by the client, counting one read
-   * operation.  The path argument is not used by this implementation.
-   */
-  @Override
-  public FsStatus getStatus(Path p) throws IOException {
-    statistics.incrementReadOps(1);
-    return dfs.getDiskStatus();
-  }
-
-  /**
-   * Returns count of blocks with no good replicas left. Normally should be
-   * zero.
-   * 
-   * @return the count reported by the client
-   * @throws IOException if the client call fails
-   */
-  public long getMissingBlocksCount() throws IOException {
-    return dfs.getMissingBlocksCount();
-  }
-
-  /**
-   * Returns count of blocks that have a replication factor of 1 and have
-   * lost their only replica.
-   *
-   * @return the count reported by the client
-   * @throws IOException if the client call fails
-   */
-  public long getMissingReplOneBlocksCount() throws IOException {
-    return dfs.getMissingReplOneBlocksCount();
-  }
-
-  /**
-   * Returns count of blocks with one or more replicas missing.
-   * 
-   * @return the count reported by the client
-   * @throws IOException if the client call fails
-   */
-  public long getUnderReplicatedBlocksCount() throws IOException {
-    return dfs.getUnderReplicatedBlocksCount();
-  }
-
-  /**
-   * Returns count of blocks with at least one replica marked corrupt.
-   * 
-   * @return the count reported by the client
-   * @throws IOException if the client call fails
-   */
-  public long getCorruptBlocksCount() throws IOException {
-    return dfs.getCorruptBlocksCount();
-  }
-
-  /**
-   * Returns a CorruptFileBlockIterator built from the client and the given
-   * path.  The path is handed over as-is, without fixRelativePart.
-   */
-  @Override
-  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
-    throws IOException {
-    return new CorruptFileBlockIterator(dfs, path);
-  }
-
-  /** @return datanode statistics for report type DatanodeReportType.ALL. */
-  public DatanodeInfo[] getDataNodeStats() throws IOException {
-    return getDataNodeStats(DatanodeReportType.ALL);
-  }
-
-  /**
-   * @param type the report type forwarded to the client's datanodeReport
-   * @return datanode statistics for the given type.
-   */
-  public DatanodeInfo[] getDataNodeStats(final DatanodeReportType type
-      ) throws IOException {
-    return dfs.datanodeReport(type);
-  }
-
-  /**
-   * Enter, leave or get safe mode.  Equivalent to calling
-   * {@link #setSafeMode(HdfsConstants.SafeModeAction, boolean)} with
-   * isChecked set to false.
-   *  
-   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(
-   *    HdfsConstants.SafeModeAction,boolean)
-   */
-  public boolean setSafeMode(HdfsConstants.SafeModeAction action) 
-  throws IOException {
-    return setSafeMode(action, false);
-  }
-
-  /**
-   * Enter, leave or get safe mode.
-   * 
-   * @param action
-   *          One of SafeModeAction.ENTER, SafeModeAction.LEAVE and
-   *          SafeModeAction.GET
-   * @param isChecked
-   *          If true check only for Active NNs status, else check first NN's
-   *          status
-   * @return the value returned by the client's setSafeMode call
-   * @throws IOException if the client call fails
-   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(SafeModeAction, boolean)
-   */
-  public boolean setSafeMode(HdfsConstants.SafeModeAction action,
-      boolean isChecked) throws IOException {
-    return dfs.setSafeMode(action, isChecked);
-  }
-
-  /**
-   * Save namespace image.
-   *
-   * @param timeWindow NameNode can ignore this command if the latest
-   *                   checkpoint was done within the given time period (in
-   *                   seconds).
-   * @param txGap forwarded unchanged to the client's saveNamespace call.
-   *              NOTE(review): presumably a transaction-count threshold
-   *              with a role similar to timeWindow; confirm against
-   *              ClientProtocol.
-   * @return true if a new checkpoint has been made
-   * @see ClientProtocol#saveNamespace(long, long)
-   */
-  public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
-    return dfs.saveNamespace(timeWindow, txGap);
-  }
-
-  /**
-   * Save namespace image. NameNode always does the checkpoint.
-   * Calls {@link #saveNamespace(long, long)} with both arguments set to 0
-   * and ignores its return value.
-   */
-  public void saveNamespace() throws IOException {
-    saveNamespace(0, 0);
-  }
-
-  /**
-   * Rolls the edit log on the active NameNode.
-   * Requires super-user privileges.
-   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#rollEdits()
-   * @return the transaction ID of the newly created segment
-   * @throws AccessControlException declared for callers; NOTE(review):
-   *         presumably raised when the caller lacks the privileges above
-   * @throws IOException if the client call fails
-   */
-  public long rollEdits() throws AccessControlException, IOException {
-    return dfs.rollEdits();
-  }
-
-  /**
-   * Enable, disable or check restoreFailedStorage.
-   * 
-   * @param arg forwarded unchanged to the client's restoreFailedStorage
-   * @return the value returned by the client call
-   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#restoreFailedStorage(String arg)
-   */
-  public boolean restoreFailedStorage(String arg)
-      throws AccessControlException, IOException {
-    return dfs.restoreFailedStorage(arg);
-  }
-  
-
-  /**
-   * Refreshes the list of hosts and excluded hosts from the configured 
-   * files.  
-   *
-   * @throws IOException if the client call fails
-   */
-  public void refreshNodes() throws IOException {
-    dfs.refreshNodes();
-  }
-
-  /**
-   * Finalize previously upgraded file system state.
-   * @throws IOException if the client call fails
-   */
-  public void finalizeUpgrade() throws IOException {
-    dfs.finalizeUpgrade();
-  }
-
-  /**
-   * Rolling upgrade: prepare/finalize/query.
-   *
-   * @param action the action forwarded to the client's rollingUpgrade call
-   * @return the RollingUpgradeInfo returned by the client call
-   * @throws IOException if the client call fails
-   */
-  public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action)
-      throws IOException {
-    return dfs.rollingUpgrade(action);
-  }
-
-  /**
-   * Requests the namenode to dump data structures into the specified
-   * file.
-   *
-   * @param pathname forwarded unchanged to the client's metaSave call
-   * @throws IOException if the client call fails
-   */
-  public void metaSave(String pathname) throws IOException {
-    dfs.metaSave(pathname);
-  }
-
-  /** @return the server defaults as reported by the client. */
-  @Override
-  public FsServerDefaults getServerDefaults() throws IOException {
-    return dfs.getServerDefaults();
-  }
-
-  /**
-   * Returns the stat information about the file.
-   * @throws FileNotFoundException if the file does not exist.
-   */
-  @Override
-  public FileStatus getFileStatus(Path f) throws IOException {
-    statistics.incrementReadOps(1);
-    Path absF = fixRelativePart(f);
-    return new FileSystemLinkResolver<FileStatus>() {
-      @Override
-      public FileStatus doCall(final Path p) throws IOException,
-          UnresolvedLinkException {
-        final HdfsFileStatus info = dfs.getFileInfo(getPathName(p));
-        if (info == null) {
-          // the client reports a missing file as a null status
-          throw new FileNotFoundException("File does not exist: " + p);
-        }
-        return info.makeQualified(getUri(), p);
-      }
-
-      @Override
-      public FileStatus next(final FileSystem fs, final Path p)
-          throws IOException {
-        return fs.getFileStatus(p);
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * Creates the symlink {@code link} pointing at {@code target}, counting
-   * one write operation.
-   *
-   * @throws UnsupportedOperationException if symlinks are not enabled
-   */
-  @SuppressWarnings("deprecation")
-  @Override
-  public void createSymlink(final Path target, final Path link,
-      final boolean createParent) throws AccessControlException,
-      FileAlreadyExistsException, FileNotFoundException,
-      ParentNotDirectoryException, UnsupportedFileSystemException, 
-      IOException {
-    if (!FileSystem.areSymlinksEnabled()) {
-      throw new UnsupportedOperationException("Symlinks not supported");
-    }
-    statistics.incrementWriteOps(1);
-    final Path absF = fixRelativePart(link);
-    final FileSystemLinkResolver<Void> resolver =
-        new FileSystemLinkResolver<Void>() {
-          @Override
-          public Void doCall(final Path linkPath) throws IOException,
-              UnresolvedLinkException {
-            dfs.createSymlink(target.toString(), getPathName(linkPath),
-                createParent);
-            return null;
-          }
-
-          @Override
-          public Void next(final FileSystem fs, final Path linkPath)
-              throws IOException, UnresolvedLinkException {
-            fs.createSymlink(target, linkPath, createParent);
-            return null;
-          }
-        };
-    resolver.resolve(this, absF);
-  }
-
-  /** @return always true for this file system. */
-  @Override
-  public boolean supportsSymlinks() {
-    return true;
-  }
-
-  /**
-   * Returns the status of {@code f} as obtained from the client's
-   * getFileLinkInfo call.  If the status is a symlink, its target is fully
-   * qualified before returning.
-   *
-   * @throws FileNotFoundException if the client reports no status
-   */
-  @Override
-  public FileStatus getFileLinkStatus(final Path f)
-      throws AccessControlException, FileNotFoundException,
-      UnsupportedFileSystemException, IOException {
-    statistics.incrementReadOps(1);
-    final Path absF = fixRelativePart(f);
-    final FileStatus linkStatus = new FileSystemLinkResolver<FileStatus>() {
-      @Override
-      public FileStatus doCall(final Path p) throws IOException,
-          UnresolvedLinkException {
-        final HdfsFileStatus info = dfs.getFileLinkInfo(getPathName(p));
-        if (info == null) {
-          throw new FileNotFoundException("File does not exist: " + p);
-        }
-        return info.makeQualified(getUri(), p);
-      }
-
-      @Override
-      public FileStatus next(final FileSystem fs, final Path p)
-        throws IOException, UnresolvedLinkException {
-        return fs.getFileLinkStatus(p);
-      }
-    }.resolve(this, absF);
-
-    if (!linkStatus.isSymlink()) {
-      return linkStatus;
-    }
-    // Fully-qualify the symlink
-    final Path qualifiedTarget = FSLinkResolver.qualifySymlinkTarget(
-        this.getUri(), linkStatus.getPath(), linkStatus.getSymlink());
-    linkStatus.setSymlink(qualifiedTarget);
-    return linkStatus;
-  }
-
-  /**
-   * Returns the symlink recorded in the qualified link status of
-   * {@code f}.
-   *
-   * @throws FileNotFoundException if the client reports no status
-   */
-  @Override
-  public Path getLinkTarget(final Path f) throws AccessControlException,
-      FileNotFoundException, UnsupportedFileSystemException, IOException {
-    statistics.incrementReadOps(1);
-    final Path absF = fixRelativePart(f);
-    return new FileSystemLinkResolver<Path>() {
-      @Override
-      public Path doCall(final Path p) throws IOException,
-          UnresolvedLinkException {
-        final HdfsFileStatus info = dfs.getFileLinkInfo(getPathName(p));
-        if (info == null) {
-          throw new FileNotFoundException("File does not exist: " + p);
-        }
-        final FileStatus qualified = info.makeQualified(getUri(), p);
-        return qualified.getSymlink();
-      }
-
-      @Override
-      public Path next(final FileSystem fs, final Path p)
-        throws IOException, UnresolvedLinkException {
-        return fs.getLinkTarget(p);
-      }
-    }.resolve(this, absF);
-  }
-
-  @Override
-  protected Path resolveLink(Path f) throws IOException {
-    statistics.incrementReadOps(1);
-    String target = dfs.getLinkTarget(getPathName(fixRelativePart(f)));
-    if (target == null) {
-      throw new FileNotFoundException("File does not exist: " + f.toString());
-    }
-    return new Path(target);
-  }
-
-  /**
-   * Returns the checksum of {@code f}, requesting it from the client with
-   * a length of Long.MAX_VALUE.  Counts one read operation.
-   */
-  @Override
-  public FileChecksum getFileChecksum(Path f) throws IOException {
-    statistics.incrementReadOps(1);
-    Path absF = fixRelativePart(f);
-    final FileSystemLinkResolver<FileChecksum> resolver =
-        new FileSystemLinkResolver<FileChecksum>() {
-          @Override
-          public FileChecksum doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            final String src = getPathName(resolved);
-            return dfs.getFileChecksum(src, Long.MAX_VALUE);
-          }
-
-          @Override
-          public FileChecksum next(final FileSystem fs, final Path resolved)
-              throws IOException {
-            return fs.getFileChecksum(resolved);
-          }
-        };
-    return resolver.resolve(this, absF);
-  }
-
-  /**
-   * Returns the checksum of {@code f} for the given length, counting one
-   * read operation.  When a link resolves into a file system that is not a
-   * DistributedFileSystem, an UnsupportedFileSystemException is thrown.
-   */
-  @Override
-  public FileChecksum getFileChecksum(Path f, final long length)
-      throws IOException {
-    statistics.incrementReadOps(1);
-    Path absF = fixRelativePart(f);
-    return new FileSystemLinkResolver<FileChecksum>() {
-      @Override
-      public FileChecksum doCall(final Path p)
-          throws IOException, UnresolvedLinkException {
-        return dfs.getFileChecksum(getPathName(p), length);
-      }
-
-      @Override
-      public FileChecksum next(final FileSystem fs, final Path p)
-          throws IOException {
-        if (!(fs instanceof DistributedFileSystem)) {
-          throw new UnsupportedFileSystemException(
-              "getFileChecksum(Path, long) is not supported by "
-                  + fs.getClass().getSimpleName());
-        }
-        return ((DistributedFileSystem) fs).getFileChecksum(p, length);
-      }
-    }.resolve(this, absF);
-  }
-
-  /** Sets the permission of {@code p}, counting one write operation. */
-  @Override
-  public void setPermission(Path p, final FsPermission permission
-      ) throws IOException {
-    statistics.incrementWriteOps(1);
-    Path absF = fixRelativePart(p);
-    final FileSystemLinkResolver<Void> resolver =
-        new FileSystemLinkResolver<Void>() {
-          @Override
-          public Void doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            dfs.setPermission(getPathName(resolved), permission);
-            return null;
-          }
-
-          @Override
-          public Void next(final FileSystem fs, final Path resolved)
-              throws IOException {
-            fs.setPermission(resolved, permission);
-            return null;
-          }
-        };
-    resolver.resolve(this, absF);
-  }
-
-  /**
-   * Sets the owner and/or group of {@code p}, counting one write
-   * operation.
-   *
-   * @throws IOException if both username and groupname are null, or if the
-   *         underlying call fails
-   */
-  @Override
-  public void setOwner(Path p, final String username, final String groupname
-      ) throws IOException {
-    // at least one of the two names has to be supplied
-    if (username == null && groupname == null) {
-      throw new IOException("username == null && groupname == null");
-    }
-    statistics.incrementWriteOps(1);
-    Path absF = fixRelativePart(p);
-    final FileSystemLinkResolver<Void> resolver =
-        new FileSystemLinkResolver<Void>() {
-          @Override
-          public Void doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            dfs.setOwner(getPathName(resolved), username, groupname);
-            return null;
-          }
-
-          @Override
-          public Void next(final FileSystem fs, final Path resolved)
-              throws IOException {
-            fs.setOwner(resolved, username, groupname);
-            return null;
-          }
-        };
-    resolver.resolve(this, absF);
-  }
-
-  /**
-   * Sets the modification and access times of {@code p}, counting one
-   * write operation.
-   */
-  @Override
-  public void setTimes(Path p, final long mtime, final long atime
-      ) throws IOException {
-    statistics.incrementWriteOps(1);
-    Path absF = fixRelativePart(p);
-    final FileSystemLinkResolver<Void> resolver =
-        new FileSystemLinkResolver<Void>() {
-          @Override
-          public Void doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            dfs.setTimes(getPathName(resolved), mtime, atime);
-            return null;
-          }
-
-          @Override
-          public Void next(final FileSystem fs, final Path resolved)
-              throws IOException {
-            fs.setTimes(resolved, mtime, atime);
-            return null;
-          }
-        };
-    resolver.resolve(this, absF);
-  }
-  
-
-  /** @return HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT. */
-  @Override
-  protected int getDefaultPort() {
-    return HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT;
-  }
-
-  /**
-   * Obtains a delegation token from the client.
-   *
-   * @param renewer the renewer name; a null renewer is passed on as null
-   *                instead of being wrapped in a Text
-   */
-  @Override
-  public Token<DelegationTokenIdentifier> getDelegationToken(String renewer)
-      throws IOException {
-    final Text renewerText = (renewer == null) ? null : new Text(renewer);
-    return dfs.getDelegationToken(renewerText);
-  }
-
-  /**
-   * Requests the namenode to tell all datanodes to use a new, non-persistent
-   * bandwidth value for dfs.balance.bandwidthPerSec.
-   * The bandwidth parameter is the max number of bytes per second of network
-   * bandwidth to be used by a datanode during balancing.
-   *
-   * @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
-   * @throws IOException if the client call fails
-   */
-  public void setBalancerBandwidth(long bandwidth) throws IOException {
-    dfs.setBalancerBandwidth(bandwidth);
-  }
-
-  /**
-   * Get a canonical service name for this file system. If the URI is logical,
-   * the hostname part of the URI will be returned.
-   * The value is obtained from the client's getCanonicalServiceName call.
-   * @return a service string that uniquely identifies this file system.
-   */
-  @Override
-  public String getCanonicalServiceName() {
-    return dfs.getCanonicalServiceName();
-  }
-  
-  /**
-   * Canonicalizes {@code uri} using the default port, except for logical
-   * URIs, which are returned unchanged.
-   */
-  @Override
-  protected URI canonicalizeUri(URI uri) {
-    if (!HAUtilClient.isLogicalUri(getConf(), uri)) {
-      return NetUtils.getCanonicalUri(uri, getDefaultPort());
-    }
-    // Don't try to DNS-resolve logical URIs, since the 'authority'
-    // portion isn't a proper hostname
-    return uri;
-  }
-
-  /**
-   * Utility function that returns if the NameNode is in safemode or not. In HA
-   * mode, this API will return only ActiveNN's safemode status.
-   * Implemented as setSafeMode(SafeModeAction.SAFEMODE_GET, true).
-   * 
-   * @return true if NameNode is in safemode, false otherwise.
-   * @throws IOException
-   *           when there is an issue communicating with the NameNode
-   */
-  public boolean isInSafeMode() throws IOException {
-    return setSafeMode(SafeModeAction.SAFEMODE_GET, true);
-  }
-
-  /**
-   * Allows snapshots on the given directory.
-   *
-   * @throws UnsupportedOperationException if a link resolves into a file
-   *         system that is not a DistributedFileSystem
-   * @see HdfsAdmin#allowSnapshot(Path)
-   */
-  public void allowSnapshot(final Path path) throws IOException {
-    Path absF = fixRelativePart(path);
-    new FileSystemLinkResolver<Void>() {
-      @Override
-      public Void doCall(final Path p)
-          throws IOException, UnresolvedLinkException {
-        dfs.allowSnapshot(getPathName(p));
-        return null;
-      }
-
-      @Override
-      public Void next(final FileSystem fs, final Path p)
-          throws IOException {
-        if (!(fs instanceof DistributedFileSystem)) {
-          throw new UnsupportedOperationException("Cannot perform snapshot"
-              + " operations on a symlink to a non-DistributedFileSystem: "
-              + path + " -> " + p);
-        }
-        ((DistributedFileSystem) fs).allowSnapshot(p);
-        return null;
-      }
-    }.resolve(this, absF);
-  }
-  
-  /**
-   * Disallows snapshots on the given directory.
-   *
-   * @throws UnsupportedOperationException if a link resolves into a file
-   *         system that is not a DistributedFileSystem
-   * @see HdfsAdmin#disallowSnapshot(Path)
-   */
-  public void disallowSnapshot(final Path path) throws IOException {
-    Path absF = fixRelativePart(path);
-    new FileSystemLinkResolver<Void>() {
-      @Override
-      public Void doCall(final Path p)
-          throws IOException, UnresolvedLinkException {
-        dfs.disallowSnapshot(getPathName(p));
-        return null;
-      }
-
-      @Override
-      public Void next(final FileSystem fs, final Path p)
-          throws IOException {
-        if (!(fs instanceof DistributedFileSystem)) {
-          throw new UnsupportedOperationException("Cannot perform snapshot"
-              + " operations on a symlink to a non-DistributedFileSystem: "
-              + path + " -> " + p);
-        }
-        ((DistributedFileSystem) fs).disallowSnapshot(p);
-        return null;
-      }
-    }.resolve(this, absF);
-  }
-  
-  /**
-   * Creates a snapshot of the given directory.
-   *
-   * @param path the directory to snapshot
-   * @param snapshotName the name forwarded to the snapshot call
-   * @return the path built from the string returned by the client
-   * @throws UnsupportedOperationException if a link resolves into a file
-   *         system that is not a DistributedFileSystem
-   * @throws IOException if the underlying call fails
-   */
-  @Override
-  public Path createSnapshot(final Path path, final String snapshotName) 
-      throws IOException {
-    Path absF = fixRelativePart(path);
-    return new FileSystemLinkResolver<Path>() {
-      @Override
-      public Path doCall(final Path p)
-          throws IOException, UnresolvedLinkException {
-        return new Path(dfs.createSnapshot(getPathName(p), snapshotName));
-      }
-
-      @Override
-      public Path next(final FileSystem fs, final Path p)
-          throws IOException {
-        if (fs instanceof DistributedFileSystem) {
-          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
-          // Forward the requested name; the one-argument overload would
-          // silently drop it when the path is reached through a symlink.
-          return myDfs.createSnapshot(p, snapshotName);
-        } else {
-          throw new UnsupportedOperationException("Cannot perform snapshot"
-              + " operations on a symlink to a non-DistributedFileSystem: "
-              + path + " -> " + p);
-        }
-      }
-    }.resolve(this, absF);
-  }
-  
-  /**
-   * Renames a snapshot of the given directory.
-   *
-   * @throws UnsupportedOperationException if a link resolves into a file
-   *         system that is not a DistributedFileSystem
-   */
-  @Override
-  public void renameSnapshot(final Path path, final String snapshotOldName,
-      final String snapshotNewName) throws IOException {
-    Path absF = fixRelativePart(path);
-    new FileSystemLinkResolver<Void>() {
-      @Override
-      public Void doCall(final Path p)
-          throws IOException, UnresolvedLinkException {
-        final String src = getPathName(p);
-        dfs.renameSnapshot(src, snapshotOldName, snapshotNewName);
-        return null;
-      }
-
-      @Override
-      public Void next(final FileSystem fs, final Path p)
-          throws IOException {
-        if (!(fs instanceof DistributedFileSystem)) {
-          throw new UnsupportedOperationException("Cannot perform snapshot"
-              + " operations on a symlink to a non-DistributedFileSystem: "
-              + path + " -> " + p);
-        }
-        ((DistributedFileSystem) fs).renameSnapshot(
-            p, snapshotOldName, snapshotNewName);
-        return null;
-      }
-    }.resolve(this, absF);
-  }
-  
-  /**
-   * @return All the snapshottable directories, as reported by the client
-   * @throws IOException if the client call fails
-   */
-  public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
-      throws IOException {
-    return dfs.getSnapshottableDirListing();
-  }
-  
-  /**
-   * Deletes the named snapshot of the given directory.
-   *
-   * @throws UnsupportedOperationException if a link resolves into a file
-   *         system that is not a DistributedFileSystem
-   */
-  @Override
-  public void deleteSnapshot(final Path snapshotDir, final String snapshotName)
-      throws IOException {
-    Path absF = fixRelativePart(snapshotDir);
-    new FileSystemLinkResolver<Void>() {
-      @Override
-      public Void doCall(final Path p)
-          throws IOException, UnresolvedLinkException {
-        dfs.deleteSnapshot(getPathName(p), snapshotName);
-        return null;
-      }
-
-      @Override
-      public Void next(final FileSystem fs, final Path p)
-          throws IOException {
-        if (!(fs instanceof DistributedFileSystem)) {
-          throw new UnsupportedOperationException("Cannot perform snapshot"
-              + " operations on a symlink to a non-DistributedFileSystem: "
-              + snapshotDir + " -> " + p);
-        }
-        ((DistributedFileSystem) fs).deleteSnapshot(p, snapshotName);
-        return null;
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * Get the difference between two snapshots, or between a snapshot and the
-   * current tree of a directory.
-   * 
-   * @param snapshotDir the directory the snapshots belong to
-   * @param fromSnapshot forwarded to the client as the starting snapshot
-   * @param toSnapshot forwarded to the client as the ending snapshot
-   * @return the diff report computed for the resolved directory
-   * @throws UnsupportedOperationException if a link resolves into a file
-   *         system that is not a DistributedFileSystem
-   * @throws IOException if the underlying call fails
-   * @see DFSClient#getSnapshotDiffReport(String, String, String)
-   */
-  public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir,
-      final String fromSnapshot, final String toSnapshot) throws IOException {
-    Path absF = fixRelativePart(snapshotDir);
-    return new FileSystemLinkResolver<SnapshotDiffReport>() {
-      @Override
-      public SnapshotDiffReport doCall(final Path p)
-          throws IOException, UnresolvedLinkException {
-        return dfs.getSnapshotDiffReport(getPathName(p), fromSnapshot,
-            toSnapshot);
-      }
-
-      @Override
-      public SnapshotDiffReport next(final FileSystem fs, final Path p)
-          throws IOException {
-        if (fs instanceof DistributedFileSystem) {
-          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
-          // Return the report; it used to be computed and then discarded,
-          // so callers reaching the directory via a symlink received null.
-          return myDfs.getSnapshotDiffReport(p, fromSnapshot, toSnapshot);
-        } else {
-          throw new UnsupportedOperationException("Cannot perform snapshot"
-              + " operations on a symlink to a non-DistributedFileSystem: "
-              + snapshotDir + " -> " + p);
-        }
-      }
-    }.resolve(this, absF);
-  }
- 
-  /**
-   * Get the close status of a file
-   * @param src The path to the file
-   *
-   * @return return true if file is closed
-   * @throws FileNotFoundException if the file does not exist.
-   * @throws IOException If an I/O error occurred     
-   */
-  public boolean isFileClosed(final Path src) throws IOException {
-    Path absF = fixRelativePart(src);
-    return new FileSystemLinkResolver<Boolean>() {
-      @Override
-      public Boolean doCall(final Path p)
-          throws IOException, UnresolvedLinkException {
-        return dfs.isFileClosed(getPathName(p));
-      }
-
-      @Override
-      public Boolean next(final FileSystem fs, final Path p)
-          throws IOException {
-        // only another DistributedFileSystem can answer this question
-        if (!(fs instanceof DistributedFileSystem)) {
-          throw new UnsupportedOperationException("Cannot call isFileClosed"
-              + " on a symlink to a non-DistributedFileSystem: "
-              + src + " -> " + p);
-        }
-        return ((DistributedFileSystem) fs).isFileClosed(p);
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * Adds a new CacheDirective with an empty set of {@link CacheFlag}s.
-   *
-   * @see #addCacheDirective(CacheDirectiveInfo, EnumSet)
-   */
-  public long addCacheDirective(CacheDirectiveInfo info) throws IOException {
-    return addCacheDirective(info, EnumSet.noneOf(CacheFlag.class));
-  }
-
-  /**
-   * Add a new CacheDirective.  The directive's path must be set; it is
-   * qualified against this file system's URI and working directory before
-   * the directive is sent to the client.
-   * 
-   * @param info Information about a directive to add.
-   * @param flags {@link CacheFlag}s to use for this operation.
-   * @return the ID of the directive that was created.
-   * @throws IOException if the directive could not be added
-   */
-  public long addCacheDirective(
-      CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
-    Preconditions.checkNotNull(info.getPath());
-    final String pathName = getPathName(fixRelativePart(info.getPath()));
-    final Path qualified =
-        new Path(pathName).makeQualified(getUri(), getWorkingDirectory());
-    final CacheDirectiveInfo qualifiedInfo =
-        new CacheDirectiveInfo.Builder(info).setPath(qualified).build();
-    return dfs.addCacheDirective(qualifiedInfo, flags);
-  }
-
-  /**
-   * Modifies a CacheDirective with an empty set of {@link CacheFlag}s.
-   *
-   * @see #modifyCacheDirective(CacheDirectiveInfo, EnumSet)
-   */
-  public void modifyCacheDirective(CacheDirectiveInfo info) throws IOException {
-    modifyCacheDirective(info, EnumSet.noneOf(CacheFlag.class));
-  }
-
-  /**
-   * Modify a CacheDirective.
-   * 
-   * @param info Information about the directive to modify. You must set the ID
-   *          to indicate which CacheDirective you want to modify.
-   * @param flags {@link CacheFlag}s to use for this operation.
-   * @throws IOException if the directive could not be modified
-   */
-  public void modifyCacheDirective(
-      CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
-    if (info.getPath() != null) {
-      info = new CacheDirectiveInfo.Builder(info).
-          setPath(new Path(getPathName(fixRelativePart(info.getPath()))).
-              makeQualified(getUri(), getWorkingDirectory())).build();
-    }
-    dfs.modifyCacheDirective(info, flags);
-  }
-
-  /**
-   * Remove a CacheDirectiveInfo.
-   * 
-   * @param id identifier of the CacheDirectiveInfo to remove
-   * @throws IOException if the directive could not be removed
-   */
-  public void removeCacheDirective(long id)
-      throws IOException {
-    dfs.removeCacheDirective(id);
-  }
-  
-  /**
-   * List cache directives.  Incrementally fetches results from the server.
-   * 
-   * @param filter Filter parameters to use when listing the directives, null to
-   *               list all directives visible to us.
-   * @return A RemoteIterator which returns CacheDirectiveInfo objects.
-   */
-  public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
-      CacheDirectiveInfo filter) throws IOException {
-    if (filter == null) {
-      filter = new CacheDirectiveInfo.Builder().build();
-    }
-    if (filter.getPath() != null) {
-      filter = new CacheDirectiveInfo.Builder(filter).
-          setPath(new Path(getPathName(fixRelativePart(filter.getPath())))).
-          build();
-    }
-    final RemoteIterator<CacheDirectiveEntry> iter =
-        dfs.listCacheDirectives(filter);
-    return new RemoteIterator<CacheDirectiveEntry>() {
-      @Override
-      public boolean hasNext() throws IOException {
-        return iter.hasNext();
-      }
-
-      @Override
-      public CacheDirectiveEntry next() throws IOException {
-        // Although the paths we get back from the NameNode should always be
-        // absolute, we call makeQualified to add the scheme and authority of
-        // this DistributedFilesystem.
-        CacheDirectiveEntry desc = iter.next();
-        CacheDirectiveInfo info = desc.getInfo();
-        Path p = info.getPath().makeQualified(getUri(), getWorkingDirectory());
-        return new CacheDirectiveEntry(
-            new CacheDirectiveInfo.Builder(info).setPath(p).build(),
-            desc.getStats());
-      }
-    };
-  }
-
-  /**
-   * Add a cache pool.
-   *
-   * @param info
-   *          The request to add a cache pool.
-   * @throws IOException 
-   *          If the request could not be completed.
-   */
-  public void addCachePool(CachePoolInfo info) throws IOException {
-    CachePoolInfo.validate(info);
-    dfs.addCachePool(info);
-  }
-
-  /**
-   * Modify an existing cache pool.
-   *
-   * @param info
-   *          The request to modify a cache pool.
-   * @throws IOException 
-   *          If the request could not be completed.
-   */
-  public void modifyCachePool(CachePoolInfo info) throws IOException {
-    CachePoolInfo.validate(info);
-    dfs.modifyCachePool(info);
-  }
-    
-  /**
-   * Remove a cache pool.
-   *
-   * @param poolName
-   *          Name of the cache pool to remove.
-   * @throws IOException 
-   *          if the cache pool did not exist, or could not be removed.
-   */
-  public void removeCachePool(String poolName) throws IOException {
-    CachePoolInfo.validateName(poolName);
-    dfs.removeCachePool(poolName);
-  }
-
-  /**
-   * List all cache pools.
-   *
-   * @return A remote iterator from which you can get CachePoolEntry objects.
-   *          Requests will be made as needed.
-   * @throws IOException
-   *          If there was an error listing cache pools.
-   */
-  public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
-    return dfs.listCachePools();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void modifyAclEntries(Path path, final List<AclEntry> aclSpec)
-      throws IOException {
-    Path absF = fixRelativePart(path);
-    new FileSystemLinkResolver<Void>() {
-      @Override
-      public Void doCall(final Path p) throws IOException {
-        dfs.modifyAclEntries(getPathName(p), aclSpec);
-        return null;
-      }
-
-      @Override
-      public Void next(final FileSystem fs, final Path p) throws IOException {
-        fs.modifyAclEntries(p, aclSpec);
-        return null;
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void removeAclEntries(Path path, final List<AclEntry> aclSpec)
-      throws IOException {
-    Path absF = fixRelativePart(path);
-    new FileSystemLinkResolver<Void>() {
-      @Override
-      public Void doCall(final Path p) throws IOException {
-        dfs.removeAclEntries(getPathName(p), aclSpec);
-        return null;
-      }
-
-      @Override
-      public Void next(final FileSystem fs, final Path p) throws IOException {
-        fs.removeAclEntries(p, aclSpec);
-        return null;
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void removeDefaultAcl(Path path) throws IOException {
-    final Path absF = fixRelativePart(path);
-    new FileSystemLinkResolver<Void>() {
-      @Override
-      public Void doCall(final Path p) throws IOException {
-        dfs.removeDefaultAcl(getPathName(p));
-        return null;
-      }
-      @Override
-      public Void next(final FileSystem fs, final Path p)
-        throws IOException, UnresolvedLinkException {
-        fs.removeDefaultAcl(p);
-        return null;
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void removeAcl(Path path) throws IOException {
-    final Path absF = fixRelativePart(path);
-    new FileSystemLinkResolver<Void>() {
-      @Override
-      public Void doCall(final Path p) throws IOException {
-        dfs.removeAcl(getPathName(p));
-        return null;
-      }
-      @Override
-      public Void next(final FileSystem fs, final Path p)
-        throws IOException, UnresolvedLinkException {
-        fs.removeAcl(p);
-        return null;
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void setAcl(Path path, final List<AclEntry> aclSpec) throws IOException {
-    Path absF = fixRelativePart(path);
-    new FileSystemLinkResolver<Void>() {
-      @Override
-      public Void doCall(final Path p) throws IOException {
-        dfs.setAcl(getPathName(p), aclSpec);
-        return null;
-      }
-
-      @Override
-      public Void next(final FileSystem fs, final Path p) throws IOException {
-        fs.setAcl(p, aclSpec);
-        return null;
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public AclStatus getAclStatus(Path path) throws IOException {
-    final Path absF = fixRelativePart(path);
-    return new FileSystemLinkResolver<AclStatus>() {
-      @Override
-      public AclStatus doCall(final Path p) throws IOException {
-        return dfs.getAclStatus(getPathName(p));
-      }
-      @Override
-      public AclStatus next(final FileSystem fs, final Path p)
-        throws IOException, UnresolvedLinkException {
-        return fs.getAclStatus(p);
-      }
-    }.resolve(this, absF);
-  }
-  
-  /* HDFS only */
-  public void createEncryptionZone(final Path path, final String keyName)
-    throws IOException {
-    Path absF = fixRelativePart(path);
-    new FileSystemLinkResolver<Void>() {
-      @Override
-      public Void doCall(final Path p) throws IOException,
-          UnresolvedLinkException {
-        dfs.createEncryptionZone(getPathName(p), keyName);
-        return null;
-      }
-
-      @Override
-      public Void next(final FileSystem fs, final Path p) throws IOException {
-        if (fs instanceof DistributedFileSystem) {
-          DistributedFileSystem myDfs = (DistributedFileSystem) fs;
-          myDfs.createEncryptionZone(p, keyName);
-          return null;
-        } else {
-          throw new UnsupportedOperationException(
-              "Cannot call createEncryptionZone"
-                  + " on a symlink to a non-DistributedFileSystem: " + path
-                  + " -> " + p);
-        }
-      }
-    }.resolve(this, absF);
-  }
-
-  /* HDFS only */
-  public EncryptionZone getEZForPath(final Path path)
-          throws IOException {
-    Preconditions.checkNotNull(path);
-    Path absF = fixRelativePart(path);
-    return new FileSystemLinkResolver<EncryptionZone>() {
-      @Override
-      public EncryptionZone doCall(final Path p) throws IOException,
-          UnresolvedLinkException {
-        return dfs.getEZForPath(getPathName(p));
-      }
-
-      @Override
-      public EncryptionZone next(final FileSystem fs, final Path p)
-          throws IOException {
-        if (fs instanceof DistributedFileSystem) {
-          DistributedFileSystem myDfs = (DistributedFileSystem) fs;
-          return myDfs.getEZForPath(p);
-        } else {
-          throw new UnsupportedOperationException(
-              "Cannot call getEZForPath"
-                  + " on a symlink to a non-DistributedFileSystem: " + path
-                  + " -> " + p);
-        }
-      }
-    }.resolve(this, absF);
-  }
-
-  /* HDFS only */
-  public RemoteIterator<EncryptionZone> listEncryptionZones()
-      throws IOException {
-    return dfs.listEncryptionZones();
-  }
-
-  @Override
-  public void setXAttr(Path path, final String name, final byte[] value, 
-      final EnumSet<XAttrSetFlag> flag) throws IOException {
-    Path absF = fixRelativePart(path);
-    new FileSystemLinkResolver<Void>() {
-
-      @Override
-      public Void doCall(final Path p) throws IOException {
-        dfs.setXAttr(getPathName(p), name, value, flag);
-        return null;
-      }
-
-      @Override
-      public Void next(final FileSystem fs, final Path p) throws IOException {
-        fs.setXAttr(p, name, value, flag);
-        return null;
-      }      
-    }.resolve(this, absF);
-  }
-  
-  @Override
-  public byte[] getXAttr(Path path, final String name) throws IOException {
-    final Path absF = fixRelativePart(path);
-    return new FileSystemLinkResolver<byte[]>() {
-      @Override
-      public byte[] doCall(final Path p) throws IOException {
-        return dfs.getXAttr(getPathName(p), name);
-      }
-      @Override
-      public byte[] next(final FileSystem fs, final Path p)
-        throws IOException, UnresolvedLinkException {
-        return fs.getXAttr(p, name);
-      }
-    }.resolve(this, absF);
-  }
-  
-  @Override
-  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
-    final Path absF = fixRelativePart(path);
-    return new FileSystemLinkResolver<Map<String, byte[]>>() {
-      @Override
-      public Map<String, byte[]> doCall(final Path p) throws IOException {
-        return dfs.getXAttrs(getPathName(p));
-      }
-      @Override
-      public Map<String, byte[]> next(final FileSystem fs, final Path p)
-        throws IOException, UnresolvedLinkException {
-        return fs.getXAttrs(p);
-      }
-    }.resolve(this, absF);
-  }
-  
-  @Override
-  public Map<String, byte[]> getXAttrs(Path path, final List<String> names) 
-      throws IOException {
-    final Path absF = fixRelativePart(path);
-    return new FileSystemLinkResolver<Map<String, byte[]>>() {
-      @Override
-      public Map<String, byte[]> doCall(final Path p) throws IOException {
-        return dfs.getXAttrs(getPathName(p), names);
-      }
-      @Override
-      public Map<String, byte[]> next(final FileSystem fs, final Path p)
-        throws IOException, UnresolvedLinkException {
-        return fs.getXAttrs(p, names);
-      }
-    }.resolve(this, absF);
-  }
-  
-  @Override
-  public List<String> listXAttrs(Path path)
-          throws IOException {
-    final Path absF = fixRelativePart(path);
-    return new FileSystemLinkResolver<List<String>>() {
-      @Override
-      public List<String> doCall(final Path p) throws IOException {
-        return dfs.listXAttrs(getPathName(p));
-      }
-      @Override
-      public List<String> next(final FileSystem fs, final Path p)
-              throws IOException, UnresolvedLinkException {
-        return fs.listXAttrs(p);
-      }
-    }.resolve(this, absF);
-  }
-
-  @Override
-  public void removeXAttr(Path path, final String name) throws IOException {
-    Path absF = fixRelativePart(path);
-    new FileSystemLinkResolver<Void>() {
-      @Override
-      public Void doCall(final Path p) throws IOException {
-        dfs.removeXAttr(getPathName(p), name);
-        return null;
-      }
-
-      @Override
-      public Void next(final FileSystem fs, final Path p) throws IOException {
-        fs.removeXAttr(p, name);
-        return null;
-      }
-    }.resolve(this, absF);
-  }
-
-  @Override
-  public void access(Path path, final FsAction mode) throws IOException {
-    final Path absF = fixRelativePart(path);
-    new FileSystemLinkResolver<Void>() {
-      @Override
-      public Void doCall(final Path p) throws IOException {
-        dfs.checkAccess(getPathName(p), mode);
-        return null;
-      }
-
-      @Override
-      public Void next(final FileSystem fs, final Path p)
-          throws IOException {
-        fs.access(p, mode);
-        return null;
-      }
-    }.resolve(this, absF);
-  }
-
-  @Override
-  public Token<?>[] addDelegationTokens(
-      final String renewer, Credentials credentials) throws IOException {
-    Token<?>[] tokens = super.addDelegationTokens(renewer, credentials);
-    if (dfs.isHDFSEncryptionEnabled()) {
-      KeyProviderDelegationTokenExtension keyProviderDelegationTokenExtension =
-          KeyProviderDelegationTokenExtension.
-              createKeyProviderDelegationTokenExtension(dfs.getKeyProvider());
-      Token<?>[] kpTokens = keyProviderDelegationTokenExtension.
-          addDelegationTokens(renewer, credentials);
-      if (tokens != null && kpTokens != null) {
-        Token<?>[] all = new Token<?>[tokens.length + kpTokens.length];
-        System.arraycopy(tokens, 0, all, 0, tokens.length);
-        System.arraycopy(kpTokens, 0, all, tokens.length, kpTokens.length);
-        tokens = all;
-      } else {
-        tokens = (tokens != null) ? tokens : kpTokens;
-      }
-    }
-    return tokens;
-  }
-
-  public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
-    return dfs.getInotifyEventStream();
-  }
-
-  public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
-      throws IOException {
-    return dfs.getInotifyEventStream(lastReadTxid);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c030c6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
deleted file mode 100644
index 77bed1a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.client.impl;
-
-import java.io.IOException;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-
-/**
- * Provides an iterator interface for listCorruptFileBlocks.
- * This class is used by DistributedFileSystem and Hdfs.
- */
-@InterfaceAudience.Private
-public class CorruptFileBlockIterator implements RemoteIterator<Path> {
-  private final DFSClient dfs;
-  private final String path;
-
-  private String[] files = null;
-  private int fileIdx = 0;
-  private String cookie = null;
-  private Path nextPath = null;
-
-  private int callsMade = 0;
-
-  public CorruptFileBlockIterator(DFSClient dfs, Path path) throws IOException {
-    this.dfs = dfs;
-    this.path = path2String(path);
-    loadNext();
-  }
-
-  /**
-   * @return the number of calls made to the DFSClient.
-   * This is for debugging and testing purposes.
-   */
-  public int getCallsMade() {
-    return callsMade;
-  }
-
-  private String path2String(Path path) {
-    return path.toUri().getPath();
-  }
-
-  private Path string2Path(String string) {
-    return new Path(string);
-  }
-
-  private void loadNext() throws IOException {
-    if (files == null || fileIdx >= files.length) {
-      CorruptFileBlocks cfb = dfs.listCorruptFileBlocks(path, cookie);
-      files = cfb.getFiles();
-      cookie = cfb.getCookie();
-      fileIdx = 0;
-      callsMade++;
-    }
-
-    if (fileIdx >= files.length) {
-      // received an empty response
-      // there are no more corrupt file blocks
-      nextPath = null;
-    } else {
-      nextPath = string2Path(files[fileIdx]);
-      fileIdx++;
-    }
-  }
-
-  
-  @Override
-  public boolean hasNext() {
-    return nextPath != null;
-  }
-
-  
-  @Override
-  public Path next() throws IOException {
-    if (!hasNext()) {
-      throw new NoSuchElementException("No more corrupt file blocks");
-    }
-
-    Path result = nextPath;
-    loadNext();
-
-    return result;
-  }
-}
\ No newline at end of file