You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/29 22:30:30 UTC
[28/50] [abbrv] hadoop git commit: HDFS-8740. Move
DistributedFileSystem to hadoop-hdfs-client. Contributed by Mingliang Liu.
HDFS-8740. Move DistributedFileSystem to hadoop-hdfs-client. Contributed by Mingliang Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1c030c6e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1c030c6e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1c030c6e
Branch: refs/heads/HDFS-7240
Commit: 1c030c6e58dc83152f933323bb7743ad47f5af27
Parents: f0f984e
Author: Haohui Mai <wh...@apache.org>
Authored: Sun Sep 27 10:54:44 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Sun Sep 27 10:54:44 2015 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/DistributedFileSystem.java | 2260 +++++++++++++++++
.../client/impl/CorruptFileBlockIterator.java | 105 +
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/DistributedFileSystem.java | 2262 ------------------
.../client/impl/CorruptFileBlockIterator.java | 105 -
5 files changed, 2368 insertions(+), 2367 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c030c6e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
new file mode 100644
index 0000000..88e6637
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -0,0 +1,2260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BlockStoragePolicySpi;
+import org.apache.hadoop.fs.CacheFlag;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSLinkResolver;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemLinkResolver;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+
+/****************************************************************
+ * Implementation of the abstract FileSystem for the DFS system.
+ * This object is the way end-user code interacts with a Hadoop
+ * DistributedFileSystem.
+ *
+ *****************************************************************/
+@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
+@InterfaceStability.Unstable
+public class DistributedFileSystem extends FileSystem {
+ private Path workingDir;
+ private URI uri;
+ private String homeDirPrefix =
+ HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT;
+
+ DFSClient dfs;
+ private boolean verifyChecksum = true;
+
+ static{
+ HdfsConfigurationLoader.init();
+ }
+
+ public DistributedFileSystem() {
+ }
+
+ /**
+ * Return the protocol scheme for the FileSystem.
+ * <p/>
+ *
+ * @return <code>hdfs</code>
+ */
+ @Override
+ public String getScheme() {
+ return HdfsConstants.HDFS_URI_SCHEME;
+ }
+
+ @Override
+ public URI getUri() { return uri; }
+
+ /**
+ * Initializes this file system for the given URI and configuration.
+ *
+ * Rejects URIs without a host, reads the home directory prefix from the
+ * configuration, creates the {@link DFSClient}, and stores a URI reduced
+ * to scheme and authority. The working directory is set to the home
+ * directory last, because {@link #getHomeDirectory()} reads the client
+ * created here.
+ *
+ * @param uri URI of the file system; must contain a host
+ * @param conf configuration to use
+ * @throws IOException if the URI has no host
+ */
+ @Override
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ super.initialize(uri, conf);
+ setConf(conf);
+
+ String host = uri.getHost();
+ if (host == null) {
+ throw new IOException("Incomplete HDFS URI, no host: "+ uri);
+ }
+ homeDirPrefix = conf.get(
+ HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
+ HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);
+
+ this.dfs = new DFSClient(uri, conf, statistics);
+ // Keep only scheme://authority; any path on the incoming URI is dropped.
+ this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
+ this.workingDir = getHomeDirectory();
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return workingDir;
+ }
+
+ @Override
+ public long getDefaultBlockSize() {
+ return dfs.getConf().getDefaultBlockSize();
+ }
+
+ @Override
+ public short getDefaultReplication() {
+ return dfs.getConf().getDefaultReplication();
+ }
+
+ /**
+  * Sets the working directory of this file system.
+  *
+  * @param dir the new working directory; it is passed through
+  *          {@code fixRelativePart} before being validated and stored
+  * @throws IllegalArgumentException if the path component of the resulting
+  *           directory is rejected by {@code DFSUtilClient.isValidName}
+  */
+ @Override
+ public void setWorkingDirectory(Path dir) {
+   // Resolve once and reuse the result for both validation and assignment.
+   final Path absDir = fixRelativePart(dir);
+   final String result = absDir.toUri().getPath();
+   if (!DFSUtilClient.isValidName(result)) {
+     throw new IllegalArgumentException("Invalid DFS directory name " +
+         result);
+   }
+   workingDir = absDir;
+ }
+
+ /**
+  * Returns the qualified home directory of the current user, built from
+  * the configured home directory prefix and the client's short user name.
+  */
+ @Override
+ public Path getHomeDirectory() {
+   final String userHome = homeDirPrefix + "/" + dfs.ugi.getShortUserName();
+   return makeQualified(new Path(userHome));
+ }
+
+ /**
+ * Checks that the passed URI belongs to this filesystem and returns
+ * just the path component. Expects a URI with an absolute path.
+ *
+ * @param file URI with absolute path
+ * @return path component of {@code file}
+ * @throws IllegalArgumentException if URI does not belong to this DFS, or
+ * if its path component is rejected by {@code DFSUtilClient.isValidName}
+ */
+ private String getPathName(Path file) {
+ checkPath(file);
+ String result = file.toUri().getPath();
+ if (!DFSUtilClient.isValidName(result)) {
+ throw new IllegalArgumentException("Pathname " + result + " from " +
+ file+" is not a valid DFS filename.");
+ }
+ return result;
+ }
+
+ /**
+  * Looks up block locations for the file described by a status object.
+  *
+  * @param file status of the file; may be null
+  * @param start offset of the range of interest
+  * @param len length of the range of interest
+  * @return null when {@code file} is null, otherwise the result of the
+  *         path-based overload for {@code file.getPath()}
+  * @throws IOException if the path-based lookup fails
+  */
+ @Override
+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
+     long len) throws IOException {
+   return (file == null)
+       ? null
+       : getFileBlockLocations(file.getPath(), start, len);
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(Path p,
+ final long start, final long len) throws IOException {
+ statistics.incrementReadOps(1);
+ final Path absF = fixRelativePart(p);
+ return new FileSystemLinkResolver<BlockLocation[]>() {
+ @Override
+ public BlockLocation[] doCall(final Path p) throws IOException {
+ return dfs.getBlockLocations(getPathName(p), start, len);
+ }
+ @Override
+ public BlockLocation[] next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.getFileBlockLocations(p, start, len);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public void setVerifyChecksum(boolean verifyChecksum) {
+ this.verifyChecksum = verifyChecksum;
+ }
+
+ /**
+ * Start the lease recovery of a file
+ *
+ * @param f a file
+ * @return true if the file is already closed
+ * @throws IOException if an error occurs
+ */
+ public boolean recoverLease(final Path f) throws IOException {
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<Boolean>() {
+ @Override
+ public Boolean doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return dfs.recoverLease(getPathName(p));
+ }
+ @Override
+ public Boolean next(final FileSystem fs, final Path p)
+ throws IOException {
+ if (fs instanceof DistributedFileSystem) {
+ DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+ return myDfs.recoverLease(p);
+ }
+ throw new UnsupportedOperationException("Cannot recoverLease through" +
+ " a symlink to a non-DistributedFileSystem: " + f + " -> " + p);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public FSDataInputStream open(Path f, final int bufferSize)
+ throws IOException {
+ statistics.incrementReadOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<FSDataInputStream>() {
+ @Override
+ public FSDataInputStream doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ final DFSInputStream dfsis =
+ dfs.open(getPathName(p), bufferSize, verifyChecksum);
+ return dfs.createWrappedInputStream(dfsis);
+ }
+ @Override
+ public FSDataInputStream next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.open(p, bufferSize);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, final int bufferSize,
+ final Progressable progress) throws IOException {
+ return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress);
+ }
+
+ /**
+  * Append to an existing file (optional operation).
+  *
+  * @param f the existing file to be appended.
+  * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory
+  *          to be present.
+  * @param bufferSize the size of the buffer to be used.
+  * @param progress for reporting progress if it is not null.
+  * @return Returns instance of {@link FSDataOutputStream}
+  * @throws IOException if the append fails
+  */
+ public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
+     final int bufferSize, final Progressable progress) throws IOException {
+   statistics.incrementWriteOps(1);
+   Path absF = fixRelativePart(f);
+   return new FileSystemLinkResolver<FSDataOutputStream>() {
+     @Override
+     public FSDataOutputStream doCall(final Path p)
+         throws IOException {
+       return dfs.append(getPathName(p), bufferSize, flag, progress,
+           statistics);
+     }
+     @Override
+     public FSDataOutputStream next(final FileSystem fs, final Path p)
+         throws IOException {
+       // Keep the caller's flags when the link lands on another
+       // DistributedFileSystem. The generic FileSystem append takes no
+       // flags, so for other file systems only the progress callback can
+       // be forwarded.
+       if (fs instanceof DistributedFileSystem) {
+         return ((DistributedFileSystem) fs).append(p, flag, bufferSize,
+             progress);
+       }
+       return fs.append(p, bufferSize, progress);
+     }
+   }.resolve(this, absF);
+ }
+
+ /**
+  * Append to an existing file (optional operation).
+  *
+  * @param f the existing file to be appended.
+  * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory
+  *          to be present.
+  * @param bufferSize the size of the buffer to be used.
+  * @param progress for reporting progress if it is not null.
+  * @param favoredNodes Favored nodes for new blocks
+  * @return Returns instance of {@link FSDataOutputStream}
+  * @throws IOException if the append fails
+  */
+ public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
+     final int bufferSize, final Progressable progress,
+     final InetSocketAddress[] favoredNodes) throws IOException {
+   statistics.incrementWriteOps(1);
+   Path absF = fixRelativePart(f);
+   return new FileSystemLinkResolver<FSDataOutputStream>() {
+     @Override
+     public FSDataOutputStream doCall(final Path p)
+         throws IOException {
+       return dfs.append(getPathName(p), bufferSize, flag, progress,
+           statistics, favoredNodes);
+     }
+     @Override
+     public FSDataOutputStream next(final FileSystem fs, final Path p)
+         throws IOException {
+       // Keep flags and favored nodes when the link lands on another
+       // DistributedFileSystem. The generic FileSystem append accepts
+       // neither, so for other file systems only the progress callback
+       // can be forwarded.
+       if (fs instanceof DistributedFileSystem) {
+         return ((DistributedFileSystem) fs).append(p, flag, bufferSize,
+             progress, favoredNodes);
+       }
+       return fs.append(p, bufferSize, progress);
+     }
+   }.resolve(this, absF);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ return this.create(f, permission,
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
+ blockSize, progress, null);
+ }
+
+ /**
+  * Same as
+  * {@link #create(Path, FsPermission, boolean, int, short, long,
+  * Progressable)} with the addition of favoredNodes that is a hint to
+  * where the namenode should place the file blocks.
+  * The favored nodes hint is not persisted in HDFS. Hence it may be honored
+  * at the creation time only. And with favored nodes, blocks will be pinned
+  * on the datanodes to prevent balancing move the block. HDFS could move the
+  * blocks during replication, to move the blocks from favored nodes. A value
+  * of null means no favored nodes for this create
+  *
+  * @throws UnsupportedOperationException if the path resolves through a
+  *           symlink to a file system that is not a DistributedFileSystem
+  */
+ public HdfsDataOutputStream create(final Path f,
+     final FsPermission permission, final boolean overwrite,
+     final int bufferSize, final short replication, final long blockSize,
+     final Progressable progress, final InetSocketAddress[] favoredNodes)
+     throws IOException {
+   statistics.incrementWriteOps(1);
+   Path absF = fixRelativePart(f);
+   return new FileSystemLinkResolver<HdfsDataOutputStream>() {
+     @Override
+     public HdfsDataOutputStream doCall(final Path p)
+         throws IOException, UnresolvedLinkException {
+       // Use the path handed in by the resolver (absolute, possibly a
+       // resolved link target) rather than the caller's original f.
+       final DFSOutputStream out = dfs.create(getPathName(p), permission,
+           overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+               : EnumSet.of(CreateFlag.CREATE),
+           true, replication, blockSize, progress, bufferSize, null,
+           favoredNodes);
+       return dfs.createWrappedOutputStream(out, statistics);
+     }
+     @Override
+     public HdfsDataOutputStream next(final FileSystem fs, final Path p)
+         throws IOException {
+       if (fs instanceof DistributedFileSystem) {
+         DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+         return myDfs.create(p, permission, overwrite, bufferSize, replication,
+             blockSize, progress, favoredNodes);
+       }
+       throw new UnsupportedOperationException("Cannot create with" +
+           " favoredNodes through a symlink to a non-DistributedFileSystem: "
+           + f + " -> " + p);
+     }
+   }.resolve(this, absF);
+ }
+
+ @Override
+ public FSDataOutputStream create(final Path f, final FsPermission permission,
+ final EnumSet<CreateFlag> cflags, final int bufferSize,
+ final short replication, final long blockSize, final Progressable progress,
+ final ChecksumOpt checksumOpt) throws IOException {
+ statistics.incrementWriteOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<FSDataOutputStream>() {
+ @Override
+ public FSDataOutputStream doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
+ cflags, replication, blockSize, progress, bufferSize,
+ checksumOpt);
+ return dfs.createWrappedOutputStream(dfsos, statistics);
+ }
+ @Override
+ public FSDataOutputStream next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.create(p, permission, cflags, bufferSize,
+ replication, blockSize, progress, checksumOpt);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ protected HdfsDataOutputStream primitiveCreate(Path f,
+ FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
+ short replication, long blockSize, Progressable progress,
+ ChecksumOpt checksumOpt) throws IOException {
+ statistics.incrementWriteOps(1);
+ final DFSOutputStream dfsos = dfs.primitiveCreate(
+ getPathName(fixRelativePart(f)),
+ absolutePermission, flag, true, replication, blockSize,
+ progress, bufferSize, checksumOpt);
+ return dfs.createWrappedOutputStream(dfsos, statistics);
+ }
+
+ /**
+ * Same as create(), except fails if parent directory doesn't already exist.
+ *
+ * Note that when {@code flag} contains {@link CreateFlag#OVERWRITE},
+ * {@link CreateFlag#CREATE} is added to the passed-in set itself, so the
+ * caller's {@code EnumSet} is modified.
+ *
+ * @param f the file to create
+ * @param permission permission passed to the client create call
+ * @param flag create flags; may be modified as described above
+ * @param bufferSize the size of the buffer to be used
+ * @param replication replication passed to the client create call
+ * @param blockSize block size passed to the client create call
+ * @param progress progress callback passed to the client create call
+ * @return the wrapped output stream for the new file
+ * @throws IOException if the create fails
+ */
+ @Override
+ @SuppressWarnings("deprecation")
+ public FSDataOutputStream createNonRecursive(final Path f,
+ final FsPermission permission, final EnumSet<CreateFlag> flag,
+ final int bufferSize, final short replication, final long blockSize,
+ final Progressable progress) throws IOException {
+ statistics.incrementWriteOps(1);
+ // Mutates the caller's set: OVERWRITE implies CREATE here.
+ if (flag.contains(CreateFlag.OVERWRITE)) {
+ flag.add(CreateFlag.CREATE);
+ }
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<FSDataOutputStream>() {
+ @Override
+ public FSDataOutputStream doCall(final Path p) throws IOException,
+ UnresolvedLinkException {
+ // NOTE(review): the literal false is presumably the createParent
+ // argument of DFSClient#create -- confirm against DFSClient.
+ final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
+ flag, false, replication, blockSize, progress, bufferSize, null);
+ return dfs.createWrappedOutputStream(dfsos, statistics);
+ }
+
+ @Override
+ public FSDataOutputStream next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.createNonRecursive(p, permission, flag, bufferSize,
+ replication, blockSize, progress);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public boolean setReplication(Path src,
+ final short replication
+ ) throws IOException {
+ statistics.incrementWriteOps(1);
+ Path absF = fixRelativePart(src);
+ return new FileSystemLinkResolver<Boolean>() {
+ @Override
+ public Boolean doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return dfs.setReplication(getPathName(p), replication);
+ }
+ @Override
+ public Boolean next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.setReplication(p, replication);
+ }
+ }.resolve(this, absF);
+ }
+
+ /**
+ * Set the source path to the specified storage policy.
+ *
+ * @param src The source path referring to either a directory or a file.
+ * @param policyName The name of the storage policy.
+ */
+ @Override
+ public void setStoragePolicy(final Path src, final String policyName)
+ throws IOException {
+ statistics.incrementWriteOps(1);
+ Path absF = fixRelativePart(src);
+ new FileSystemLinkResolver<Void>() {
+ @Override
+ public Void doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ dfs.setStoragePolicy(getPathName(p), policyName);
+ return null;
+ }
+ @Override
+ public Void next(final FileSystem fs, final Path p)
+ throws IOException {
+ fs.setStoragePolicy(p, policyName);
+ return null;
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public BlockStoragePolicySpi getStoragePolicy(Path path) throws IOException {
+ statistics.incrementReadOps(1);
+ Path absF = fixRelativePart(path);
+
+ return new FileSystemLinkResolver<BlockStoragePolicySpi>() {
+ @Override
+ public BlockStoragePolicySpi doCall(final Path p) throws IOException {
+ return getClient().getStoragePolicy(getPathName(p));
+ }
+
+ @Override
+ public BlockStoragePolicySpi next(final FileSystem fs, final Path p)
+ throws IOException, UnresolvedLinkException {
+ return fs.getStoragePolicy(p);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public Collection<BlockStoragePolicy> getAllStoragePolicies()
+ throws IOException {
+ return Arrays.asList(dfs.getStoragePolicies());
+ }
+
+ /**
+ * Returns the storage policies reported by the client as an array.
+ *
+ * @return the array returned by the underlying client's
+ * {@code getStoragePolicies()} call
+ * @throws IOException if the client call fails
+ * @deprecated Prefer {@link FileSystem#getAllStoragePolicies()}
+ */
+ @Deprecated
+ public BlockStoragePolicy[] getStoragePolicies() throws IOException {
+ statistics.incrementReadOps(1);
+ return dfs.getStoragePolicies();
+ }
+
+ /**
+  * Move blocks from srcs to trg and delete srcs afterwards.
+  * The file block sizes must be the same.
+  *
+  * The concat is first attempted without resolving links. If that raises an
+  * {@link UnresolvedLinkException}, the target and every source are looked
+  * up with {@code getFileLinkStatus}; the call fails if any of them is a
+  * symlink, otherwise the concat is retried once with the returned paths.
+  *
+  * @param trg existing file to append to
+  * @param psrcs list of files (same block size, same replication)
+  * @throws IOException if the target or a source is a symlink, or if the
+  *           underlying concat fails
+  */
+ @Override
+ public void concat(Path trg, Path [] psrcs) throws IOException {
+   statistics.incrementWriteOps(1);
+   // Make target absolute
+   Path absF = fixRelativePart(trg);
+   // Make all srcs absolute
+   Path[] srcs = new Path[psrcs.length];
+   for (int i=0; i<psrcs.length; i++) {
+     srcs[i] = fixRelativePart(psrcs[i]);
+   }
+   // Try the concat without resolving any links
+   String[] srcsStr = new String[psrcs.length];
+   try {
+     for (int i=0; i<psrcs.length; i++) {
+       srcsStr[i] = getPathName(srcs[i]);
+     }
+     // Use the absolutized target, matching the retry path below; the raw
+     // trg may be relative and would be rejected by getPathName.
+     dfs.concat(getPathName(absF), srcsStr);
+   } catch (UnresolvedLinkException e) {
+     // Exception could be from trg or any src.
+     // Fully resolve trg and srcs. Fail if any of them are a symlink.
+     FileStatus stat = getFileLinkStatus(absF);
+     if (stat.isSymlink()) {
+       throw new IOException("Cannot concat with a symlink target: "
+           + trg + " -> " + stat.getPath());
+     }
+     absF = fixRelativePart(stat.getPath());
+     for (int i=0; i<psrcs.length; i++) {
+       stat = getFileLinkStatus(srcs[i]);
+       if (stat.isSymlink()) {
+         throw new IOException("Cannot concat with a symlink src: "
+             + psrcs[i] + " -> " + stat.getPath());
+       }
+       srcs[i] = fixRelativePart(stat.getPath());
+     }
+     // Try concat again. Can still race with another symlink.
+     for (int i=0; i<psrcs.length; i++) {
+       srcsStr[i] = getPathName(srcs[i]);
+     }
+     dfs.concat(getPathName(absF), srcsStr);
+   }
+ }
+
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ statistics.incrementWriteOps(1);
+
+ final Path absSrc = fixRelativePart(src);
+ final Path absDst = fixRelativePart(dst);
+
+ // Try the rename without resolving first
+ try {
+ return dfs.rename(getPathName(absSrc), getPathName(absDst));
+ } catch (UnresolvedLinkException e) {
+ // Fully resolve the source
+ final Path source = getFileLinkStatus(absSrc).getPath();
+ // Keep trying to resolve the destination
+ return new FileSystemLinkResolver<Boolean>() {
+ @Override
+ public Boolean doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return dfs.rename(getPathName(source), getPathName(p));
+ }
+ @Override
+ public Boolean next(final FileSystem fs, final Path p)
+ throws IOException {
+ // Should just throw an error in FileSystem#checkPath
+ return doCall(p);
+ }
+ }.resolve(this, absDst);
+ }
+ }
+
+ /**
+ * This rename operation is guaranteed to be atomic.
+ */
+ @SuppressWarnings("deprecation")
+ @Override
+ public void rename(Path src, Path dst, final Options.Rename... options)
+ throws IOException {
+ statistics.incrementWriteOps(1);
+ final Path absSrc = fixRelativePart(src);
+ final Path absDst = fixRelativePart(dst);
+ // Try the rename without resolving first
+ try {
+ dfs.rename(getPathName(absSrc), getPathName(absDst), options);
+ } catch (UnresolvedLinkException e) {
+ // Fully resolve the source
+ final Path source = getFileLinkStatus(absSrc).getPath();
+ // Keep trying to resolve the destination
+ new FileSystemLinkResolver<Void>() {
+ @Override
+ public Void doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ dfs.rename(getPathName(source), getPathName(p), options);
+ return null;
+ }
+ @Override
+ public Void next(final FileSystem fs, final Path p)
+ throws IOException {
+ // Should just throw an error in FileSystem#checkPath
+ return doCall(p);
+ }
+ }.resolve(this, absDst);
+ }
+ }
+
+ @Override
+ public boolean truncate(Path f, final long newLength) throws IOException {
+ statistics.incrementWriteOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<Boolean>() {
+ @Override
+ public Boolean doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return dfs.truncate(getPathName(p), newLength);
+ }
+ @Override
+ public Boolean next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.truncate(p, newLength);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public boolean delete(Path f, final boolean recursive) throws IOException {
+ statistics.incrementWriteOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<Boolean>() {
+ @Override
+ public Boolean doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return dfs.delete(getPathName(p), recursive);
+ }
+ @Override
+ public Boolean next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.delete(p, recursive);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public ContentSummary getContentSummary(Path f) throws IOException {
+ statistics.incrementReadOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<ContentSummary>() {
+ @Override
+ public ContentSummary doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return dfs.getContentSummary(getPathName(p));
+ }
+ @Override
+ public ContentSummary next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.getContentSummary(p);
+ }
+ }.resolve(this, absF);
+ }
+
+ /**
+ * Set a directory's quotas.
+ *
+ * @param src the directory whose quotas are set
+ * @param namespaceQuota namespace quota value passed to the client
+ * @param storagespaceQuota storage space quota value passed to the client
+ * @throws IOException if the client call fails
+ * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long, StorageType)
+ */
+ public void setQuota(Path src, final long namespaceQuota,
+ final long storagespaceQuota) throws IOException {
+ Path absF = fixRelativePart(src);
+ new FileSystemLinkResolver<Void>() {
+ @Override
+ public Void doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ dfs.setQuota(getPathName(p), namespaceQuota, storagespaceQuota);
+ return null;
+ }
+ @Override
+ public Void next(final FileSystem fs, final Path p)
+ throws IOException {
+ // setQuota is not defined in FileSystem, so we only can resolve
+ // within this DFS
+ return doCall(p);
+ }
+ }.resolve(this, absF);
+ }
+
+ /**
+ * Set the per type storage quota of a directory.
+ *
+ * @param src target directory whose quota is to be modified.
+ * @param type storage type of the specific storage type quota to be modified.
+ * @param quota value of the specific storage type quota to be modified.
+ * Maybe {@link HdfsConstants#QUOTA_RESET} to clear quota by storage type.
+ */
+ public void setQuotaByStorageType(
+ Path src, final StorageType type, final long quota)
+ throws IOException {
+ Path absF = fixRelativePart(src);
+ new FileSystemLinkResolver<Void>() {
+ @Override
+ public Void doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ dfs.setQuotaByStorageType(getPathName(p), type, quota);
+ return null;
+ }
+ @Override
+ public Void next(final FileSystem fs, final Path p)
+ throws IOException {
+ // setQuotaByStorageType is not defined in FileSystem, so we only can resolve
+ // within this DFS
+ return doCall(p);
+ }
+ }.resolve(this, absF);
+ }
+
+ /**
+ * Lists a directory by fetching its entries from the client in batches.
+ *
+ * When the first batch already holds every entry, the read-ops counter is
+ * incremented once. Otherwise the large-read-ops counter is incremented
+ * once per batch fetched, including the first.
+ *
+ * @param p the directory to list
+ * @return the qualified status of every entry returned by the client
+ * @throws FileNotFoundException if any listing call returns null
+ * @throws IOException if a listing call fails
+ */
+ private FileStatus[] listStatusInternal(Path p) throws IOException {
+ String src = getPathName(p);
+
+ // fetch the first batch of entries in the directory
+ DirectoryListing thisListing = dfs.listPaths(
+ src, HdfsFileStatus.EMPTY_NAME);
+
+ if (thisListing == null) { // the directory does not exist
+ throw new FileNotFoundException("File " + p + " does not exist.");
+ }
+
+ HdfsFileStatus[] partialListing = thisListing.getPartialListing();
+ if (!thisListing.hasMore()) { // got all entries of the directory
+ FileStatus[] stats = new FileStatus[partialListing.length];
+ for (int i = 0; i < partialListing.length; i++) {
+ stats[i] = partialListing[i].makeQualified(getUri(), p);
+ }
+ statistics.incrementReadOps(1);
+ return stats;
+ }
+
+ // The directory size is too big that it needs to fetch more
+ // estimate the total number of entries in the directory
+ int totalNumEntries =
+ partialListing.length + thisListing.getRemainingEntries();
+ ArrayList<FileStatus> listing =
+ new ArrayList<FileStatus>(totalNumEntries);
+ // add the first batch of entries to the array list
+ for (HdfsFileStatus fileStatus : partialListing) {
+ listing.add(fileStatus.makeQualified(getUri(), p));
+ }
+ statistics.incrementLargeReadOps(1);
+
+ // now fetch more entries
+ do {
+ // continue from the last name returned by the previous batch
+ thisListing = dfs.listPaths(src, thisListing.getLastName());
+
+ if (thisListing == null) { // the directory is deleted
+ throw new FileNotFoundException("File " + p + " does not exist.");
+ }
+
+ partialListing = thisListing.getPartialListing();
+ for (HdfsFileStatus fileStatus : partialListing) {
+ listing.add(fileStatus.makeQualified(getUri(), p));
+ }
+ statistics.incrementLargeReadOps(1);
+ } while (thisListing.hasMore());
+
+ return listing.toArray(new FileStatus[listing.size()]);
+ }
+
+ /**
+ * List all the entries of a directory
+ *
+ * Note that this operation is not atomic for a large directory.
+ * The entries of a directory may be fetched from NameNode multiple times.
+ * It only guarantees that each name occurs once if a directory
+ * undergoes changes between the calls.
+ */
+ @Override
+ public FileStatus[] listStatus(Path p) throws IOException {
+ Path absF = fixRelativePart(p);
+ return new FileSystemLinkResolver<FileStatus[]>() {
+ @Override
+ public FileStatus[] doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return listStatusInternal(p);
+ }
+ @Override
+ public FileStatus[] next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.listStatus(p);
+ }
+ }.resolve(this, absF);
+ }
+
+ /**
+ * Returns an iterator over the located status of the entries under a
+ * path, restricted by a filter.
+ *
+ * @param p target path
+ * @param filter filter handed to the directory listing iterator
+ * @return remote iterator over the located file statuses
+ * @throws IOException if the path resolves through a symlink to a file
+ * system that is not a DistributedFileSystem, or if listing fails
+ */
+ @Override
+ protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p,
+ final PathFilter filter)
+ throws IOException {
+ Path absF = fixRelativePart(p);
+ return new FileSystemLinkResolver<RemoteIterator<LocatedFileStatus>>() {
+ @Override
+ public RemoteIterator<LocatedFileStatus> doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return new DirListingIterator<LocatedFileStatus>(p, filter, true);
+ }
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> next(final FileSystem fs, final Path p)
+ throws IOException {
+ if (fs instanceof DistributedFileSystem) {
+ return ((DistributedFileSystem)fs).listLocatedStatus(p, filter);
+ }
+ // symlink resolution for this method does not work across file systems
+ // because it is a protected method.
+ throw new IOException("Link resolution does not work with multiple " +
+ "file systems for listLocatedStatus(): " + p);
+ }
+ }.resolve(this, absF);
+ }
+
+
+ /**
+  * Returns a remote iterator so that followup calls are made on demand
+  * while consuming the entries. This reduces memory consumption during
+  * listing of a large directory.
+  *
+  * @param p target path
+  * @return remote iterator
+  * @throws IOException if the listing cannot be started
+  */
+ @Override
+ public RemoteIterator<FileStatus> listStatusIterator(final Path p)
+     throws IOException {
+   Path absF = fixRelativePart(p);
+   return new FileSystemLinkResolver<RemoteIterator<FileStatus>>() {
+     @Override
+     public RemoteIterator<FileStatus> doCall(final Path p)
+         throws IOException, UnresolvedLinkException {
+       return new DirListingIterator<FileStatus>(p, false);
+     }
+
+     @Override
+     public RemoteIterator<FileStatus> next(final FileSystem fs, final Path p)
+         throws IOException {
+       // listStatusIterator is a public FileSystem method, so call it on
+       // the target directly; a downcast would throw ClassCastException
+       // when the link points to a non-HDFS file system.
+       return fs.listStatusIterator(p);
+     }
+   }.resolve(this, absF);
+ }
+
+ /**
+ * Iterator over the file status of each file/subdirectory of a directory.
+ *
+ * Entries are obtained from {@code dfs.listPaths} one partial listing at a
+ * time: the first one in the constructor, and a further one only when the
+ * current partial listing has been consumed and reports that more entries
+ * remain.
+ *
+ * If {@code needLocation} is set, every entry is cast to
+ * {@link HdfsLocatedFileStatus} and converted with
+ * {@code makeQualifiedLocated}; otherwise {@code makeQualified} is used.
+ * Entries whose path is rejected by a non-null {@link PathFilter} are
+ * skipped.
+ *
+ * @param <T> the type of the file status
+ */
+ private class DirListingIterator<T extends FileStatus>
+ implements RemoteIterator<T> {
+ // partial listing currently being consumed
+ private DirectoryListing thisListing;
+ // index of the next unread entry inside thisListing
+ private int i;
+ // directory being listed, as given to the constructor
+ private Path p;
+ // result of getPathName(p); passed to dfs.listPaths
+ private String src;
+ // entry found by hasNext() and not yet returned by next(); null if none
+ private T curStat = null;
+ // optional filter applied to each entry's path; null accepts everything
+ private PathFilter filter;
+ // whether listings are requested with locations
+ private boolean needLocation;
+
+ /**
+ * Creates the iterator and fetches the first partial listing.
+ *
+ * @param p directory to list
+ * @param filter filter applied to entry paths, or null for no filtering
+ * @param needLocation passed to {@code dfs.listPaths}; also selects how
+ * entries are converted in {@link #hasNext()}
+ * @throws FileNotFoundException if the first listing comes back null
+ * @throws IOException if fetching the first listing fails
+ */
+ private DirListingIterator(Path p, PathFilter filter,
+ boolean needLocation) throws IOException {
+ this.p = p;
+ this.src = getPathName(p);
+ this.filter = filter;
+ this.needLocation = needLocation;
+ // fetch the first batch of entries in the directory
+ thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
+ needLocation);
+ statistics.incrementReadOps(1);
+ if (thisListing == null) { // the directory does not exist
+ throw new FileNotFoundException("File " + p + " does not exist.");
+ }
+ i = 0;
+ }
+
+ /**
+ * Creates an unfiltered iterator; equivalent to passing a null filter to
+ * the three-argument constructor.
+ */
+ private DirListingIterator(Path p, boolean needLocation)
+ throws IOException {
+ this(p, null, needLocation);
+ }
+
+ /**
+ * Advances to the next entry accepted by the filter, if one is not
+ * already pending, and reports whether such an entry exists. The entry
+ * stays pending in {@code curStat} until {@link #next()} takes it, so
+ * repeated calls do not skip entries.
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public boolean hasNext() throws IOException {
+ while (curStat == null && hasNextNoFilter()) {
+ T next;
+ HdfsFileStatus fileStat = thisListing.getPartialListing()[i++];
+ if (needLocation) {
+ next = (T)((HdfsLocatedFileStatus)fileStat)
+ .makeQualifiedLocated(getUri(), p);
+ } else {
+ next = (T)fileStat.makeQualified(getUri(), p);
+ }
+ // apply filter if not null
+ if (filter == null || filter.accept(next.getPath())) {
+ curStat = next;
+ }
+ }
+ return curStat != null;
+ }
+
+ /** Check if there is a next item before applying the given filter */
+ private boolean hasNextNoFilter() throws IOException {
+ if (thisListing == null) {
+ return false;
+ }
+ if (i >= thisListing.getPartialListing().length
+ && thisListing.hasMore()) {
+ // current listing is exhausted & fetch a new listing
+ thisListing = dfs.listPaths(src, thisListing.getLastName(),
+ needLocation);
+ statistics.incrementReadOps(1);
+ // a null follow-up listing ends the iteration silently instead of
+ // raising an error as the constructor does
+ if (thisListing == null) {
+ return false;
+ }
+ i = 0;
+ }
+ return (i < thisListing.getPartialListing().length);
+ }
+
+ /**
+ * Returns the pending entry and clears it.
+ *
+ * @throws java.util.NoSuchElementException if no further entry exists
+ */
+ @Override
+ public T next() throws IOException {
+ if (hasNext()) {
+ T tmp = curStat;
+ curStat = null;
+ return tmp;
+ }
+ throw new java.util.NoSuchElementException("No more entry in " + p);
+ }
+ }
+
+ /**
+  * Creates a single directory; the parent directories must already exist.
+  *
+  * See {@link FsPermission#applyUMask(FsPermission)} for details of how
+  * the permission is applied.
+  *
+  * @param f The path to create
+  * @param permission The permission. See FsPermission#applyUMask for
+  *        details about how this is used to calculate the effective
+  *        permission.
+  * @return the result reported by the underlying mkdirs call
+  * @throws IOException if the directory cannot be created
+  */
+ public boolean mkdir(Path f, FsPermission permission) throws IOException {
+   final boolean createParent = false;
+   return mkdirsInternal(f, permission, createParent);
+ }
+
+ /**
+  * Creates a directory along with any missing parent directories.
+  *
+  * See {@link FsPermission#applyUMask(FsPermission)} for details of how
+  * the permission is applied.
+  *
+  * @param f The path to create
+  * @param permission The permission. See FsPermission#applyUMask for
+  *        details about how this is used to calculate the effective
+  *        permission.
+  * @return the result reported by the underlying mkdirs call
+  * @throws IOException if the directory cannot be created
+  */
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+   final boolean createParent = true;
+   return mkdirsInternal(f, permission, createParent);
+ }
+
+ /**
+  * Shared implementation of {@link #mkdir(Path, FsPermission)} and
+  * {@link #mkdirs(Path, FsPermission)}. Counts one write operation and then
+  * creates the directory through the link resolver.
+  *
+  * @param f the path to create
+  * @param permission the permission passed to the mkdirs call
+  * @param createParent whether missing parent directories may be created
+  * @return the result reported by the mkdirs call that was finally made
+  * @throws IOException if the creation fails, or if a non-recursive mkdir
+  *         is handed over to another file system by the resolver
+  */
+ private boolean mkdirsInternal(Path f, final FsPermission permission,
+     final boolean createParent) throws IOException {
+   statistics.incrementWriteOps(1);
+   Path absF = fixRelativePart(f);
+   return new FileSystemLinkResolver<Boolean>() {
+     @Override
+     public Boolean doCall(final Path p)
+         throws IOException, UnresolvedLinkException {
+       return dfs.mkdirs(getPathName(p), permission, createParent);
+     }
+
+     @Override
+     public Boolean next(final FileSystem fs, final Path p)
+         throws IOException {
+       // FileSystem doesn't have a non-recursive mkdir() method
+       // Best we can do is error out
+       if (!createParent) {
+         // The two literals used to concatenate to "non-recursivemkdir".
+         throw new IOException("FileSystem does not support non-recursive"
+             + " mkdir");
+       }
+       return fs.mkdirs(p, permission);
+     }
+   }.resolve(this, absF);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
+ throws IOException {
+ statistics.incrementWriteOps(1);
+ return dfs.primitiveMkdir(getPathName(f), absolutePermission);
+ }
+
+
+ /**
+ * Closes this file system: first the client's output streams, then the
+ * superclass, and finally the client itself.
+ *
+ * @throws IOException if any of the close steps fails
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ dfs.closeOutputStreams(false);
+ super.close();
+ } finally {
+ // the client is closed even when one of the steps above throws
+ dfs.close();
+ }
+ }
+
+ /** @return the client's string form wrapped as {@code DFS[...]}. */
+ @Override
+ public String toString() {
+   final StringBuilder text = new StringBuilder("DFS[");
+   text.append(dfs).append("]");
+   return text.toString();
+ }
+
+ /** @return the {@link DFSClient} this file system delegates to. */
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ public DFSClient getClient() {
+   final DFSClient client = dfs;
+   return client;
+ }
+
+ /**
+  * Counts one read operation and returns the client's disk status. The
+  * path argument is not consulted.
+  *
+  * @param p ignored by this implementation
+  * @return the status obtained from {@code dfs.getDiskStatus()}
+  * @throws IOException if the client call fails
+  */
+ @Override
+ public FsStatus getStatus(Path p) throws IOException {
+   statistics.incrementReadOps(1);
+   final FsStatus diskStatus = dfs.getDiskStatus();
+   return diskStatus;
+ }
+
+ /**
+  * Returns the count of blocks with no good replicas left. Normally this
+  * should be zero.
+  *
+  * @return the value reported by the client
+  * @throws IOException if the client call fails
+  */
+ public long getMissingBlocksCount() throws IOException {
+   final long missingBlocks = dfs.getMissingBlocksCount();
+   return missingBlocks;
+ }
+
+ /**
+  * Returns the count of blocks that have replication factor 1 and have
+  * lost their only replica.
+  *
+  * @return the value reported by the client
+  * @throws IOException if the client call fails
+  */
+ public long getMissingReplOneBlocksCount() throws IOException {
+   final long missingReplOneBlocks = dfs.getMissingReplOneBlocksCount();
+   return missingReplOneBlocks;
+ }
+
+ /**
+  * Returns the count of blocks with one or more replicas missing.
+  *
+  * @return the value reported by the client
+  * @throws IOException if the client call fails
+  */
+ public long getUnderReplicatedBlocksCount() throws IOException {
+   final long underReplicatedBlocks = dfs.getUnderReplicatedBlocksCount();
+   return underReplicatedBlocks;
+ }
+
+ /**
+  * Returns the count of blocks with at least one replica marked corrupt.
+  *
+  * @return the value reported by the client
+  * @throws IOException if the client call fails
+  */
+ public long getCorruptBlocksCount() throws IOException {
+   final long corruptBlocks = dfs.getCorruptBlocksCount();
+   return corruptBlocks;
+ }
+
+ /**
+  * Creates a {@link CorruptFileBlockIterator} over the given path.
+  *
+  * @param path the path handed to the iterator
+  * @return the newly created iterator
+  * @throws IOException if the iterator cannot be created
+  */
+ @Override
+ public RemoteIterator<Path> listCorruptFileBlocks(Path path)
+     throws IOException {
+   final RemoteIterator<Path> corruptFiles =
+       new CorruptFileBlockIterator(dfs, path);
+   return corruptFiles;
+ }
+
+ /**
+  * @return datanode statistics for report type
+  *         {@code DatanodeReportType.ALL}.
+  */
+ public DatanodeInfo[] getDataNodeStats() throws IOException {
+   final DatanodeReportType everything = DatanodeReportType.ALL;
+   return getDataNodeStats(everything);
+ }
+
+ /** @return datanode statistics for the given type. */
+ public DatanodeInfo[] getDataNodeStats(final DatanodeReportType type
+ ) throws IOException {
+ return dfs.datanodeReport(type);
+ }
+
+ /**
+  * Enter, leave or get safe mode. Same as calling
+  * {@link #setSafeMode(HdfsConstants.SafeModeAction, boolean)} with
+  * {@code isChecked} set to false.
+  *
+  * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(
+  *    HdfsConstants.SafeModeAction,boolean)
+  */
+ public boolean setSafeMode(HdfsConstants.SafeModeAction action)
+     throws IOException {
+   final boolean isChecked = false;
+   return setSafeMode(action, isChecked);
+ }
+
+ /**
+  * Enter, leave or get safe mode.
+  *
+  * @param action
+  *          One of SafeModeAction.ENTER, SafeModeAction.LEAVE and
+  *          SafeModeAction.GET
+  * @param isChecked
+  *          If true check only for Active NNs status, else check first NN's
+  *          status
+  * @return the value returned by the client call
+  * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(SafeModeAction, boolean)
+  */
+ public boolean setSafeMode(HdfsConstants.SafeModeAction action,
+     boolean isChecked) throws IOException {
+   final boolean outcome = dfs.setSafeMode(action, isChecked);
+   return outcome;
+ }
+
+ /**
+  * Save namespace image.
+  *
+  * @param timeWindow NameNode can ignore this command if the latest
+  *                   checkpoint was done within the given time period (in
+  *                   seconds).
+  * @param txGap passed unchanged to the client call.
+  *              NOTE(review): its meaning is not visible here; confirm
+  *              against ClientProtocol#saveNamespace(long, long).
+  * @return true if a new checkpoint has been made
+  * @see ClientProtocol#saveNamespace(long, long)
+  */
+ public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
+   final boolean checkpointMade = dfs.saveNamespace(timeWindow, txGap);
+   return checkpointMade;
+ }
+
+ /**
+  * Save namespace image. NameNode always does the checkpoint. Calls
+  * {@link #saveNamespace(long, long)} with both arguments set to zero and
+  * drops its result.
+  */
+ public void saveNamespace() throws IOException {
+   final long timeWindow = 0;
+   final long txGap = 0;
+   saveNamespace(timeWindow, txGap);
+ }
+
+ /**
+  * Rolls the edit log on the active NameNode.
+  * Requires super-user privileges.
+  *
+  * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#rollEdits()
+  * @return the transaction ID of the newly created segment
+  */
+ public long rollEdits() throws AccessControlException, IOException {
+   final long newSegmentTxId = dfs.rollEdits();
+   return newSegmentTxId;
+ }
+
+ /**
+  * Enable, disable or check restoreFailedStorage.
+  *
+  * @param arg passed unchanged to the client call
+  * @return the value returned by the client call
+  * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#restoreFailedStorage(String arg)
+  */
+ public boolean restoreFailedStorage(String arg)
+     throws AccessControlException, IOException {
+   final boolean outcome = dfs.restoreFailedStorage(arg);
+   return outcome;
+ }
+
+
+ /**
+  * Refreshes the list of hosts and excluded hosts from the configured
+  * files.
+  *
+  * @throws IOException if the client call fails
+  */
+ public void refreshNodes() throws IOException {
+   final DFSClient client = dfs;
+   client.refreshNodes();
+ }
+
+ /**
+  * Finalizes the previously upgraded file system state.
+  *
+  * @throws IOException if the client call fails
+  */
+ public void finalizeUpgrade() throws IOException {
+   final DFSClient client = dfs;
+   client.finalizeUpgrade();
+ }
+
+ /**
+  * Rolling upgrade: prepare/finalize/query.
+  *
+  * @param action the rolling upgrade action passed to the client
+  * @return the information returned by the client call
+  * @throws IOException if the client call fails
+  */
+ public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action)
+     throws IOException {
+   final RollingUpgradeInfo upgradeInfo = dfs.rollingUpgrade(action);
+   return upgradeInfo;
+ }
+
+ /**
+  * Requests the namenode to dump data structures into the specified
+  * file.
+  *
+  * @param pathname passed unchanged to the client call
+  * @throws IOException if the client call fails
+  */
+ public void metaSave(String pathname) throws IOException {
+   final DFSClient client = dfs;
+   client.metaSave(pathname);
+ }
+
+ /** @return the server defaults as reported by the client. */
+ @Override
+ public FsServerDefaults getServerDefaults() throws IOException {
+   final FsServerDefaults defaults = dfs.getServerDefaults();
+   return defaults;
+ }
+
+ /**
+  * Returns the stat information about the file. Counts one read operation.
+  *
+  * @param f the path to inspect
+  * @return the qualified status of {@code f}
+  * @throws FileNotFoundException if the file does not exist.
+  * @throws IOException if the lookup or the link resolution fails
+  */
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+   statistics.incrementReadOps(1);
+   final Path absolutePath = fixRelativePart(f);
+   final FileSystemLinkResolver<FileStatus> resolver =
+       new FileSystemLinkResolver<FileStatus>() {
+         @Override
+         public FileStatus doCall(final Path resolved) throws IOException,
+             UnresolvedLinkException {
+           final HdfsFileStatus info = dfs.getFileInfo(getPathName(resolved));
+           if (info == null) {
+             throw new FileNotFoundException("File does not exist: " + resolved);
+           }
+           return info.makeQualified(getUri(), resolved);
+         }
+
+         @Override
+         public FileStatus next(final FileSystem otherFs, final Path resolved)
+             throws IOException {
+           return otherFs.getFileStatus(resolved);
+         }
+       };
+   return resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * Creates a symbolic link. Counts one write operation once the symlink
+  * support check has passed.
+  *
+  * @param target the path the link points to
+  * @param link the path of the link to create
+  * @param createParent passed unchanged to the create call
+  * @throws UnsupportedOperationException if
+  *         {@code FileSystem.areSymlinksEnabled()} returns false
+  * @throws IOException if the creation or the link resolution fails
+  */
+ @SuppressWarnings("deprecation")
+ @Override
+ public void createSymlink(final Path target, final Path link,
+     final boolean createParent) throws AccessControlException,
+     FileAlreadyExistsException, FileNotFoundException,
+     ParentNotDirectoryException, UnsupportedFileSystemException,
+     IOException {
+   final boolean symlinksEnabled = FileSystem.areSymlinksEnabled();
+   if (!symlinksEnabled) {
+     throw new UnsupportedOperationException("Symlinks not supported");
+   }
+   statistics.incrementWriteOps(1);
+   final Path absoluteLink = fixRelativePart(link);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path resolved) throws IOException,
+             UnresolvedLinkException {
+           final String targetName = target.toString();
+           dfs.createSymlink(targetName, getPathName(resolved), createParent);
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path resolved)
+             throws IOException, UnresolvedLinkException {
+           otherFs.createSymlink(target, resolved, createParent);
+           return null;
+         }
+       };
+   resolver.resolve(this, absoluteLink);
+ }
+
+ /** @return always {@code true} for this file system. */
+ @Override
+ public boolean supportsSymlinks() {
+ return true;
+ }
+
+ /**
+  * Returns the status obtained from {@code dfs.getFileLinkInfo} for the
+  * given path. If that status is a symlink, its target is fully qualified
+  * before the status is returned. Counts one read operation.
+  *
+  * @param f the path to inspect
+  * @return the status of {@code f}
+  * @throws FileNotFoundException if no information exists for the path
+  * @throws IOException if the lookup or the link resolution fails
+  */
+ @Override
+ public FileStatus getFileLinkStatus(final Path f)
+     throws AccessControlException, FileNotFoundException,
+     UnsupportedFileSystemException, IOException {
+   statistics.incrementReadOps(1);
+   final Path absolutePath = fixRelativePart(f);
+   final FileSystemLinkResolver<FileStatus> resolver =
+       new FileSystemLinkResolver<FileStatus>() {
+         @Override
+         public FileStatus doCall(final Path resolved) throws IOException,
+             UnresolvedLinkException {
+           final HdfsFileStatus info =
+               dfs.getFileLinkInfo(getPathName(resolved));
+           if (info == null) {
+             throw new FileNotFoundException("File does not exist: " + resolved);
+           }
+           return info.makeQualified(getUri(), resolved);
+         }
+
+         @Override
+         public FileStatus next(final FileSystem otherFs, final Path resolved)
+             throws IOException, UnresolvedLinkException {
+           return otherFs.getFileLinkStatus(resolved);
+         }
+       };
+   final FileStatus linkStatus = resolver.resolve(this, absolutePath);
+   if (!linkStatus.isSymlink()) {
+     return linkStatus;
+   }
+   // Fully-qualify the symlink
+   final Path qualifiedTarget = FSLinkResolver.qualifySymlinkTarget(
+       this.getUri(), linkStatus.getPath(), linkStatus.getSymlink());
+   linkStatus.setSymlink(qualifiedTarget);
+   return linkStatus;
+ }
+
+ /**
+  * Returns the symlink target recorded in the status obtained from
+  * {@code dfs.getFileLinkInfo}. Counts one read operation.
+  *
+  * @param f the link to inspect
+  * @return the symlink of the qualified status
+  * @throws FileNotFoundException if no information exists for the path
+  * @throws IOException if the lookup or the link resolution fails
+  */
+ @Override
+ public Path getLinkTarget(final Path f) throws AccessControlException,
+     FileNotFoundException, UnsupportedFileSystemException, IOException {
+   statistics.incrementReadOps(1);
+   final Path absolutePath = fixRelativePart(f);
+   final FileSystemLinkResolver<Path> resolver =
+       new FileSystemLinkResolver<Path>() {
+         @Override
+         public Path doCall(final Path resolved) throws IOException,
+             UnresolvedLinkException {
+           final HdfsFileStatus info =
+               dfs.getFileLinkInfo(getPathName(resolved));
+           if (info == null) {
+             throw new FileNotFoundException("File does not exist: " + resolved);
+           }
+           final FileStatus qualified = info.makeQualified(getUri(), resolved);
+           return qualified.getSymlink();
+         }
+
+         @Override
+         public Path next(final FileSystem otherFs, final Path resolved)
+             throws IOException, UnresolvedLinkException {
+           return otherFs.getLinkTarget(resolved);
+         }
+       };
+   return resolver.resolve(this, absolutePath);
+ }
+
+ @Override
+ protected Path resolveLink(Path f) throws IOException {
+ statistics.incrementReadOps(1);
+ String target = dfs.getLinkTarget(getPathName(fixRelativePart(f)));
+ if (target == null) {
+ throw new FileNotFoundException("File does not exist: " + f.toString());
+ }
+ return new Path(target);
+ }
+
+ @Override
+ public FileChecksum getFileChecksum(Path f) throws IOException {
+ statistics.incrementReadOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<FileChecksum>() {
+ @Override
+ public FileChecksum doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE);
+ }
+
+ @Override
+ public FileChecksum next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.getFileChecksum(p);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public FileChecksum getFileChecksum(Path f, final long length)
+ throws IOException {
+ statistics.incrementReadOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<FileChecksum>() {
+ @Override
+ public FileChecksum doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return dfs.getFileChecksum(getPathName(p), length);
+ }
+
+ @Override
+ public FileChecksum next(final FileSystem fs, final Path p)
+ throws IOException {
+ if (fs instanceof DistributedFileSystem) {
+ return ((DistributedFileSystem) fs).getFileChecksum(p, length);
+ } else {
+ throw new UnsupportedFileSystemException(
+ "getFileChecksum(Path, long) is not supported by "
+ + fs.getClass().getSimpleName());
+ }
+ }
+ }.resolve(this, absF);
+ }
+
+ /**
+  * Sets the permission of a path. Counts one write operation.
+  *
+  * @param p the path to change
+  * @param permission the permission passed to the set call
+  * @throws IOException if the set call or the link resolution fails
+  */
+ @Override
+ public void setPermission(Path p, final FsPermission permission)
+     throws IOException {
+   statistics.incrementWriteOps(1);
+   final Path absolutePath = fixRelativePart(p);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path resolved)
+             throws IOException, UnresolvedLinkException {
+           final String src = getPathName(resolved);
+           dfs.setPermission(src, permission);
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path resolved)
+             throws IOException {
+           otherFs.setPermission(resolved, permission);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * Sets the owner and/or group of a path. Counts one write operation once
+  * the argument check has passed.
+  *
+  * @param p the path to change
+  * @param username the user name passed to the set call
+  * @param groupname the group name passed to the set call
+  * @throws IOException if both {@code username} and {@code groupname} are
+  *         null, or if the set call or the link resolution fails
+  */
+ @Override
+ public void setOwner(Path p, final String username, final String groupname)
+     throws IOException {
+   final boolean nothingToSet = username == null && groupname == null;
+   if (nothingToSet) {
+     throw new IOException("username == null && groupname == null");
+   }
+   statistics.incrementWriteOps(1);
+   final Path absolutePath = fixRelativePart(p);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path resolved)
+             throws IOException, UnresolvedLinkException {
+           final String src = getPathName(resolved);
+           dfs.setOwner(src, username, groupname);
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path resolved)
+             throws IOException {
+           otherFs.setOwner(resolved, username, groupname);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * Sets the times of a path. Counts one write operation.
+  *
+  * @param p the path to change
+  * @param mtime passed unchanged to the set call
+  * @param atime passed unchanged to the set call
+  * @throws IOException if the set call or the link resolution fails
+  */
+ @Override
+ public void setTimes(Path p, final long mtime, final long atime)
+     throws IOException {
+   statistics.incrementWriteOps(1);
+   final Path absolutePath = fixRelativePart(p);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path resolved)
+             throws IOException, UnresolvedLinkException {
+           final String src = getPathName(resolved);
+           dfs.setTimes(src, mtime, atime);
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path resolved)
+             throws IOException {
+           otherFs.setTimes(resolved, mtime, atime);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+
+ @Override
+ protected int getDefaultPort() {
+ return HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT;
+ }
+
+ /**
+  * Requests a delegation token from the client.
+  *
+  * @param renewer the renewer name; a null name is passed on as null,
+  *        any other name is wrapped in a {@link Text}
+  * @return the token returned by the client call
+  * @throws IOException if the client call fails
+  */
+ @Override
+ public Token<DelegationTokenIdentifier> getDelegationToken(String renewer)
+     throws IOException {
+   final Text renewerText;
+   if (renewer == null) {
+     renewerText = null;
+   } else {
+     renewerText = new Text(renewer);
+   }
+   return dfs.getDelegationToken(renewerText);
+ }
+
+ /**
+  * Requests the namenode to tell all datanodes to use a new, non-persistent
+  * bandwidth value for dfs.balance.bandwidthPerSec.
+  * The bandwidth parameter is the max number of bytes per second of network
+  * bandwidth to be used by a datanode during balancing.
+  *
+  * @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
+  * @throws IOException if the client call fails
+  */
+ public void setBalancerBandwidth(long bandwidth) throws IOException {
+   final DFSClient client = dfs;
+   client.setBalancerBandwidth(bandwidth);
+ }
+
+ /**
+  * Get a canonical service name for this file system. If the URI is logical,
+  * the hostname part of the URI will be returned.
+  *
+  * @return a service string that uniquely identifies this file system.
+  */
+ @Override
+ public String getCanonicalServiceName() {
+   final String serviceName = dfs.getCanonicalServiceName();
+   return serviceName;
+ }
+
+ @Override
+ protected URI canonicalizeUri(URI uri) {
+ if (HAUtilClient.isLogicalUri(getConf(), uri)) {
+ // Don't try to DNS-resolve logical URIs, since the 'authority'
+ // portion isn't a proper hostname
+ return uri;
+ } else {
+ return NetUtils.getCanonicalUri(uri, getDefaultPort());
+ }
+ }
+
+ /**
+  * Utility function that returns if the NameNode is in safemode or not. In HA
+  * mode, this API will return only ActiveNN's safemode status.
+  *
+  * @return true if NameNode is in safemode, false otherwise.
+  * @throws IOException
+  *           when there is an issue communicating with the NameNode
+  */
+ public boolean isInSafeMode() throws IOException {
+   final boolean isChecked = true;
+   return setSafeMode(SafeModeAction.SAFEMODE_GET, isChecked);
+ }
+
+ /**
+  * Allows snapshots on the given path.
+  *
+  * @param path the directory to change
+  * @throws UnsupportedOperationException if the resolver supplies a file
+  *         system that is not a {@link DistributedFileSystem}
+  * @throws IOException if the client call or the link resolution fails
+  * @see HdfsAdmin#allowSnapshot(Path)
+  */
+ public void allowSnapshot(final Path path) throws IOException {
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path resolved)
+             throws IOException, UnresolvedLinkException {
+           dfs.allowSnapshot(getPathName(resolved));
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path resolved)
+             throws IOException {
+           if (!(otherFs instanceof DistributedFileSystem)) {
+             throw new UnsupportedOperationException("Cannot perform snapshot"
+                 + " operations on a symlink to a non-DistributedFileSystem: "
+                 + path + " -> " + resolved);
+           }
+           ((DistributedFileSystem) otherFs).allowSnapshot(resolved);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * Disallows snapshots on the given path.
+  *
+  * @param path the directory to change
+  * @throws UnsupportedOperationException if the resolver supplies a file
+  *         system that is not a {@link DistributedFileSystem}
+  * @throws IOException if the client call or the link resolution fails
+  * @see HdfsAdmin#disallowSnapshot(Path)
+  */
+ public void disallowSnapshot(final Path path) throws IOException {
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path resolved)
+             throws IOException, UnresolvedLinkException {
+           dfs.disallowSnapshot(getPathName(resolved));
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path resolved)
+             throws IOException {
+           if (!(otherFs instanceof DistributedFileSystem)) {
+             throw new UnsupportedOperationException("Cannot perform snapshot"
+                 + " operations on a symlink to a non-DistributedFileSystem: "
+                 + path + " -> " + resolved);
+           }
+           ((DistributedFileSystem) otherFs).disallowSnapshot(resolved);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * Creates a snapshot of the given directory.
+  *
+  * @param path the directory to snapshot
+  * @param snapshotName the name passed to the create call
+  * @return a path built from the string returned by the client, or the
+  *         path returned by the file system the resolver supplied
+  * @throws UnsupportedOperationException if the resolver supplies a file
+  *         system that is not a {@link DistributedFileSystem}
+  * @throws IOException if the create call or the link resolution fails
+  */
+ @Override
+ public Path createSnapshot(final Path path, final String snapshotName)
+     throws IOException {
+   Path absF = fixRelativePart(path);
+   return new FileSystemLinkResolver<Path>() {
+     @Override
+     public Path doCall(final Path p)
+         throws IOException, UnresolvedLinkException {
+       return new Path(dfs.createSnapshot(getPathName(p), snapshotName));
+     }
+
+     @Override
+     public Path next(final FileSystem fs, final Path p)
+         throws IOException {
+       if (fs instanceof DistributedFileSystem) {
+         DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+         // Forward the requested name. The one-argument overload used
+         // here before silently discarded snapshotName.
+         return myDfs.createSnapshot(p, snapshotName);
+       } else {
+         throw new UnsupportedOperationException("Cannot perform snapshot"
+             + " operations on a symlink to a non-DistributedFileSystem: "
+             + path + " -> " + p);
+       }
+     }
+   }.resolve(this, absF);
+ }
+
+ /**
+  * Renames a snapshot of the given directory.
+  *
+  * @param path the directory that owns the snapshot
+  * @param snapshotOldName the current snapshot name
+  * @param snapshotNewName the new snapshot name
+  * @throws UnsupportedOperationException if the resolver supplies a file
+  *         system that is not a {@link DistributedFileSystem}
+  * @throws IOException if the rename call or the link resolution fails
+  */
+ @Override
+ public void renameSnapshot(final Path path, final String snapshotOldName,
+     final String snapshotNewName) throws IOException {
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path resolved)
+             throws IOException, UnresolvedLinkException {
+           final String src = getPathName(resolved);
+           dfs.renameSnapshot(src, snapshotOldName, snapshotNewName);
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path resolved)
+             throws IOException {
+           if (!(otherFs instanceof DistributedFileSystem)) {
+             throw new UnsupportedOperationException("Cannot perform snapshot"
+                 + " operations on a symlink to a non-DistributedFileSystem: "
+                 + path + " -> " + resolved);
+           }
+           ((DistributedFileSystem) otherFs).renameSnapshot(
+               resolved, snapshotOldName, snapshotNewName);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * @return All the snapshottable directories
+  * @throws IOException if the client call fails
+  */
+ public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
+     throws IOException {
+   final SnapshottableDirectoryStatus[] snapshottableDirs =
+       dfs.getSnapshottableDirListing();
+   return snapshottableDirs;
+ }
+
+ /**
+  * Deletes a snapshot of the given directory.
+  *
+  * @param snapshotDir the directory that owns the snapshot
+  * @param snapshotName the name of the snapshot to delete
+  * @throws UnsupportedOperationException if the resolver supplies a file
+  *         system that is not a {@link DistributedFileSystem}
+  * @throws IOException if the delete call or the link resolution fails
+  */
+ @Override
+ public void deleteSnapshot(final Path snapshotDir, final String snapshotName)
+     throws IOException {
+   final Path absolutePath = fixRelativePart(snapshotDir);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path resolved)
+             throws IOException, UnresolvedLinkException {
+           final String src = getPathName(resolved);
+           dfs.deleteSnapshot(src, snapshotName);
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path resolved)
+             throws IOException {
+           if (!(otherFs instanceof DistributedFileSystem)) {
+             throw new UnsupportedOperationException("Cannot perform snapshot"
+                 + " operations on a symlink to a non-DistributedFileSystem: "
+                 + snapshotDir + " -> " + resolved);
+           }
+           ((DistributedFileSystem) otherFs).deleteSnapshot(
+               resolved, snapshotName);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * Get the difference between two snapshots, or between a snapshot and the
+  * current tree of a directory.
+  *
+  * @param snapshotDir the directory that owns the snapshots
+  * @param fromSnapshot passed unchanged to the diff call
+  * @param toSnapshot passed unchanged to the diff call
+  * @return the report returned by the diff call that was finally made
+  * @throws UnsupportedOperationException if the resolver supplies a file
+  *         system that is not a {@link DistributedFileSystem}
+  * @throws IOException if the diff call or the link resolution fails
+  * @see DFSClient#getSnapshotDiffReport(String, String, String)
+  */
+ public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir,
+     final String fromSnapshot, final String toSnapshot) throws IOException {
+   Path absF = fixRelativePart(snapshotDir);
+   return new FileSystemLinkResolver<SnapshotDiffReport>() {
+     @Override
+     public SnapshotDiffReport doCall(final Path p)
+         throws IOException, UnresolvedLinkException {
+       return dfs.getSnapshotDiffReport(getPathName(p), fromSnapshot,
+           toSnapshot);
+     }
+
+     @Override
+     public SnapshotDiffReport next(final FileSystem fs, final Path p)
+         throws IOException {
+       if (fs instanceof DistributedFileSystem) {
+         DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+         // Return the delegated report. It used to be computed and then
+         // thrown away, so callers received null on this path.
+         return myDfs.getSnapshotDiffReport(p, fromSnapshot, toSnapshot);
+       } else {
+         throw new UnsupportedOperationException("Cannot perform snapshot"
+             + " operations on a symlink to a non-DistributedFileSystem: "
+             + snapshotDir + " -> " + p);
+       }
+     }
+   }.resolve(this, absF);
+ }
+
+ /**
+  * Get the close status of a file.
+  *
+  * @param src The path to the file
+  * @return return true if file is closed
+  * @throws FileNotFoundException if the file does not exist.
+  * @throws UnsupportedOperationException if the resolver supplies a file
+  *         system that is not a {@link DistributedFileSystem}
+  * @throws IOException If an I/O error occurred
+  */
+ public boolean isFileClosed(final Path src) throws IOException {
+   final Path absolutePath = fixRelativePart(src);
+   final FileSystemLinkResolver<Boolean> resolver =
+       new FileSystemLinkResolver<Boolean>() {
+         @Override
+         public Boolean doCall(final Path resolved)
+             throws IOException, UnresolvedLinkException {
+           return dfs.isFileClosed(getPathName(resolved));
+         }
+
+         @Override
+         public Boolean next(final FileSystem otherFs, final Path resolved)
+             throws IOException {
+           if (!(otherFs instanceof DistributedFileSystem)) {
+             throw new UnsupportedOperationException("Cannot call isFileClosed"
+                 + " on a symlink to a non-DistributedFileSystem: "
+                 + src + " -> " + resolved);
+           }
+           return ((DistributedFileSystem) otherFs).isFileClosed(resolved);
+         }
+       };
+   return resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * Adds a new CacheDirective without any {@link CacheFlag}.
+  *
+  * @see #addCacheDirective(CacheDirectiveInfo, EnumSet)
+  */
+ public long addCacheDirective(CacheDirectiveInfo info) throws IOException {
+   final EnumSet<CacheFlag> noFlags = EnumSet.noneOf(CacheFlag.class);
+   return addCacheDirective(info, noFlags);
+ }
+
+ /**
+  * Add a new CacheDirective. The directive's path is qualified against
+  * this file system's URI and working directory before it is sent.
+  *
+  * @param info Information about a directive to add.
+  * @param flags {@link CacheFlag}s to use for this operation.
+  * @return the ID of the directive that was created.
+  * @throws IOException if the directive could not be added
+  */
+ public long addCacheDirective(
+     CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
+   Preconditions.checkNotNull(info.getPath());
+   final String pathName = getPathName(fixRelativePart(info.getPath()));
+   final Path qualifiedPath =
+       new Path(pathName).makeQualified(getUri(), getWorkingDirectory());
+   final CacheDirectiveInfo.Builder builder =
+       new CacheDirectiveInfo.Builder(info);
+   final CacheDirectiveInfo qualifiedInfo =
+       builder.setPath(qualifiedPath).build();
+   return dfs.addCacheDirective(qualifiedInfo, flags);
+ }
+
+ /**
+  * Modifies a CacheDirective without any {@link CacheFlag}.
+  *
+  * @see #modifyCacheDirective(CacheDirectiveInfo, EnumSet)
+  */
+ public void modifyCacheDirective(CacheDirectiveInfo info) throws IOException {
+   final EnumSet<CacheFlag> noFlags = EnumSet.noneOf(CacheFlag.class);
+   modifyCacheDirective(info, noFlags);
+ }
+
+ /**
+  * Modify a CacheDirective. When the directive carries a path, that path
+  * is qualified against this file system's URI and working directory
+  * before the request is sent.
+  *
+  * @param info Information about the directive to modify. You must set the ID
+  *             to indicate which CacheDirective you want to modify.
+  * @param flags {@link CacheFlag}s to use for this operation.
+  * @throws IOException if the directive could not be modified
+  */
+ public void modifyCacheDirective(
+     CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
+   CacheDirectiveInfo request = info;
+   if (info.getPath() != null) {
+     final String pathName = getPathName(fixRelativePart(info.getPath()));
+     final Path qualifiedPath =
+         new Path(pathName).makeQualified(getUri(), getWorkingDirectory());
+     request = new CacheDirectiveInfo.Builder(info)
+         .setPath(qualifiedPath)
+         .build();
+   }
+   dfs.modifyCacheDirective(request, flags);
+ }
+
+ /**
+  * Remove a CacheDirectiveInfo.
+  *
+  * @param id identifier of the CacheDirectiveInfo to remove
+  * @throws IOException if the directive could not be removed
+  */
+ public void removeCacheDirective(long id) throws IOException {
+   final DFSClient client = dfs;
+   client.removeCacheDirective(id);
+ }
+
+ /**
+  * List cache directives. Incrementally fetches results from the server.
+  *
+  * @param filter Filter parameters to use when listing the directives, null to
+  *               list all directives visible to us.
+  * @return A RemoteIterator which returns CacheDirectiveInfo objects.
+  * @throws IOException if the listing cannot be started
+  */
+ public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
+     CacheDirectiveInfo filter) throws IOException {
+   // A null filter is replaced by an empty one.
+   CacheDirectiveInfo effectiveFilter = filter;
+   if (effectiveFilter == null) {
+     effectiveFilter = new CacheDirectiveInfo.Builder().build();
+   }
+   if (effectiveFilter.getPath() != null) {
+     final String pathName =
+         getPathName(fixRelativePart(effectiveFilter.getPath()));
+     effectiveFilter = new CacheDirectiveInfo.Builder(effectiveFilter)
+         .setPath(new Path(pathName))
+         .build();
+   }
+   final RemoteIterator<CacheDirectiveEntry> delegate =
+       dfs.listCacheDirectives(effectiveFilter);
+   return new RemoteIterator<CacheDirectiveEntry>() {
+     @Override
+     public boolean hasNext() throws IOException {
+       return delegate.hasNext();
+     }
+
+     @Override
+     public CacheDirectiveEntry next() throws IOException {
+       // Although the paths we get back from the NameNode should always be
+       // absolute, we call makeQualified to add the scheme and authority of
+       // this DistributedFilesystem.
+       final CacheDirectiveEntry entry = delegate.next();
+       final CacheDirectiveInfo entryInfo = entry.getInfo();
+       final Path qualifiedPath =
+           entryInfo.getPath().makeQualified(getUri(), getWorkingDirectory());
+       final CacheDirectiveInfo qualifiedInfo =
+           new CacheDirectiveInfo.Builder(entryInfo)
+               .setPath(qualifiedPath)
+               .build();
+       return new CacheDirectiveEntry(qualifiedInfo, entry.getStats());
+     }
+   };
+ }
+
+ /**
+  * Add a cache pool. The request is validated with
+  * {@code CachePoolInfo.validate} before it is sent.
+  *
+  * @param info
+  *          The request to add a cache pool.
+  * @throws IOException
+  *          If the request could not be completed.
+  */
+ public void addCachePool(CachePoolInfo info) throws IOException {
+   CachePoolInfo.validate(info);
+   final DFSClient client = dfs;
+   client.addCachePool(info);
+ }
+
+ /**
+  * Modify an existing cache pool. The request is validated with
+  * {@code CachePoolInfo.validate} before it is sent.
+  *
+  * @param info
+  *          The request to modify a cache pool.
+  * @throws IOException
+  *          If the request could not be completed.
+  */
+ public void modifyCachePool(CachePoolInfo info) throws IOException {
+   CachePoolInfo.validate(info);
+   final DFSClient client = dfs;
+   client.modifyCachePool(info);
+ }
+
+ /**
+  * Remove a cache pool. The name is validated with
+  * {@code CachePoolInfo.validateName} before the request is sent.
+  *
+  * @param poolName
+  *          Name of the cache pool to remove.
+  * @throws IOException
+  *          if the cache pool did not exist, or could not be removed.
+  */
+ public void removeCachePool(String poolName) throws IOException {
+   CachePoolInfo.validateName(poolName);
+   final DFSClient client = dfs;
+   client.removeCachePool(poolName);
+ }
+
+ /**
+  * List all cache pools.
+  *
+  * @return A remote iterator from which you can get CachePoolEntry objects.
+  *         Requests will be made as needed.
+  * @throws IOException
+  *          If there was an error listing cache pools.
+  */
+ public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
+   final RemoteIterator<CachePoolEntry> pools = dfs.listCachePools();
+   return pools;
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * The request goes to {@code dfs.modifyAclEntries}, or to the file system
+  * supplied by the link resolver.
+  */
+ @Override
+ public void modifyAclEntries(Path path, final List<AclEntry> aclSpec)
+     throws IOException {
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path resolved) throws IOException {
+           final String src = getPathName(resolved);
+           dfs.modifyAclEntries(src, aclSpec);
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path resolved)
+             throws IOException {
+           otherFs.modifyAclEntries(resolved, aclSpec);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * The request goes to {@code dfs.removeAclEntries}, or to the file system
+  * supplied by the link resolver.
+  */
+ @Override
+ public void removeAclEntries(Path path, final List<AclEntry> aclSpec)
+     throws IOException {
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path resolved) throws IOException {
+           final String src = getPathName(resolved);
+           dfs.removeAclEntries(src, aclSpec);
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path resolved)
+             throws IOException {
+           otherFs.removeAclEntries(resolved, aclSpec);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * The request goes to {@code dfs.removeDefaultAcl}, or to the file system
+  * supplied by the link resolver.
+  */
+ @Override
+ public void removeDefaultAcl(Path path) throws IOException {
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path resolved) throws IOException {
+           final String src = getPathName(resolved);
+           dfs.removeDefaultAcl(src);
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path resolved)
+             throws IOException, UnresolvedLinkException {
+           otherFs.removeDefaultAcl(resolved);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * The request goes to {@code dfs.removeAcl}, or to the file system
+  * supplied by the link resolver.
+  */
+ @Override
+ public void removeAcl(Path path) throws IOException {
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path resolved) throws IOException {
+           final String src = getPathName(resolved);
+           dfs.removeAcl(src);
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path resolved)
+             throws IOException, UnresolvedLinkException {
+           otherFs.removeAcl(resolved);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * The request goes to {@code dfs.setAcl}, or to the file system supplied
+  * by the link resolver.
+  */
+ @Override
+ public void setAcl(Path path, final List<AclEntry> aclSpec) throws IOException {
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path resolved) throws IOException {
+           final String src = getPathName(resolved);
+           dfs.setAcl(src, aclSpec);
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path resolved)
+             throws IOException {
+           otherFs.setAcl(resolved, aclSpec);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * <p>The status comes from {@code dfs.getAclStatus} in {@code doCall}, or
+  * from the {@link FileSystem} handed to {@code next}; the path is first
+  * passed through {@code fixRelativePart}.
+  */
+ @Override
+ public AclStatus getAclStatus(Path path) throws IOException {
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<AclStatus> resolver =
+       new FileSystemLinkResolver<AclStatus>() {
+         @Override
+         public AclStatus doCall(final Path target) throws IOException {
+           return dfs.getAclStatus(getPathName(target));
+         }
+
+         @Override
+         public AclStatus next(final FileSystem otherFs, final Path target)
+             throws IOException, UnresolvedLinkException {
+           return otherFs.getAclStatus(target);
+         }
+       };
+   return resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * HDFS only: asks {@code dfs} to create an encryption zone for the given
+  * path with the given key name.
+  *
+  * @param path path of the zone; passed through {@code fixRelativePart}
+  * @param keyName key name forwarded to {@code dfs.createEncryptionZone}
+  * @throws IOException if the underlying call fails
+  * @throws UnsupportedOperationException if resolution hands the call to a
+  *     {@link FileSystem} that is not a {@code DistributedFileSystem}
+  */
+ public void createEncryptionZone(final Path path, final String keyName)
+     throws IOException {
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path target) throws IOException,
+             UnresolvedLinkException {
+           dfs.createEncryptionZone(getPathName(target), keyName);
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path target)
+             throws IOException {
+           // Only another DistributedFileSystem can service this call.
+           if (!(otherFs instanceof DistributedFileSystem)) {
+             throw new UnsupportedOperationException(
+                 "Cannot call createEncryptionZone"
+                     + " on a symlink to a non-DistributedFileSystem: " + path
+                     + " -> " + target);
+           }
+           ((DistributedFileSystem) otherFs).createEncryptionZone(
+               target, keyName);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * HDFS only: returns what {@code dfs.getEZForPath} reports for the given
+  * path.
+  *
+  * @param path path to query; must not be null
+  * @return the {@link EncryptionZone} returned by the underlying call
+  * @throws IOException if the underlying call fails
+  * @throws UnsupportedOperationException if resolution hands the call to a
+  *     {@link FileSystem} that is not a {@code DistributedFileSystem}
+  */
+ public EncryptionZone getEZForPath(final Path path)
+     throws IOException {
+   Preconditions.checkNotNull(path);
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<EncryptionZone> resolver =
+       new FileSystemLinkResolver<EncryptionZone>() {
+         @Override
+         public EncryptionZone doCall(final Path target) throws IOException,
+             UnresolvedLinkException {
+           return dfs.getEZForPath(getPathName(target));
+         }
+
+         @Override
+         public EncryptionZone next(final FileSystem otherFs,
+             final Path target) throws IOException {
+           // Only another DistributedFileSystem can service this call.
+           if (!(otherFs instanceof DistributedFileSystem)) {
+             throw new UnsupportedOperationException(
+                 "Cannot call getEZForPath"
+                     + " on a symlink to a non-DistributedFileSystem: " + path
+                     + " -> " + target);
+           }
+           return ((DistributedFileSystem) otherFs).getEZForPath(target);
+         }
+       };
+   return resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * HDFS only: returns the iterator produced by
+  * {@code dfs.listEncryptionZones()}.
+  *
+  * @throws IOException if the underlying call fails
+  */
+ public RemoteIterator<EncryptionZone> listEncryptionZones()
+     throws IOException {
+   final RemoteIterator<EncryptionZone> zones = dfs.listEncryptionZones();
+   return zones;
+ }
+
+ @Override
+ public void setXAttr(Path path, final String name, final byte[] value,
+ final EnumSet<XAttrSetFlag> flag) throws IOException {
+ Path absF = fixRelativePart(path);
+ new FileSystemLinkResolver<Void>() {
+
+ @Override
+ public Void doCall(final Path p) throws IOException {
+ dfs.setXAttr(getPathName(p), name, value, flag);
+ return null;
+ }
+
+ @Override
+ public Void next(final FileSystem fs, final Path p) throws IOException {
+ fs.setXAttr(p, name, value, flag);
+ return null;
+ }
+ }.resolve(this, absF);
+ }
+
+ /**
+  * Returns the value of the named extended attribute as reported by
+  * {@code dfs.getXAttr}, or by the {@link FileSystem} handed to the
+  * resolver's {@code next} callback.
+  */
+ @Override
+ public byte[] getXAttr(Path path, final String name) throws IOException {
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<byte[]> resolver =
+       new FileSystemLinkResolver<byte[]>() {
+         @Override
+         public byte[] doCall(final Path target) throws IOException {
+           return dfs.getXAttr(getPathName(target), name);
+         }
+
+         @Override
+         public byte[] next(final FileSystem otherFs, final Path target)
+             throws IOException, UnresolvedLinkException {
+           return otherFs.getXAttr(target, name);
+         }
+       };
+   return resolver.resolve(this, absolutePath);
+ }
+
+ @Override
+ public Map<String, byte[]> getXAttrs(Path path) throws IOException {
+ final Path absF = fixRelativePart(path);
+ return new FileSystemLinkResolver<Map<String, byte[]>>() {
+ @Override
+ public Map<String, byte[]> doCall(final Path p) throws IOException {
+ return dfs.getXAttrs(getPathName(p));
+ }
+ @Override
+ public Map<String, byte[]> next(final FileSystem fs, final Path p)
+ throws IOException, UnresolvedLinkException {
+ return fs.getXAttrs(p);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public Map<String, byte[]> getXAttrs(Path path, final List<String> names)
+ throws IOException {
+ final Path absF = fixRelativePart(path);
+ return new FileSystemLinkResolver<Map<String, byte[]>>() {
+ @Override
+ public Map<String, byte[]> doCall(final Path p) throws IOException {
+ return dfs.getXAttrs(getPathName(p), names);
+ }
+ @Override
+ public Map<String, byte[]> next(final FileSystem fs, final Path p)
+ throws IOException, UnresolvedLinkException {
+ return fs.getXAttrs(p, names);
+ }
+ }.resolve(this, absF);
+ }
+
+ /**
+  * Returns the list produced by {@code dfs.listXAttrs} for the path, or by
+  * the {@link FileSystem} handed to the resolver's {@code next} callback.
+  */
+ @Override
+ public List<String> listXAttrs(Path path)
+     throws IOException {
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<List<String>> resolver =
+       new FileSystemLinkResolver<List<String>>() {
+         @Override
+         public List<String> doCall(final Path target) throws IOException {
+           return dfs.listXAttrs(getPathName(target));
+         }
+
+         @Override
+         public List<String> next(final FileSystem otherFs, final Path target)
+             throws IOException, UnresolvedLinkException {
+           return otherFs.listXAttrs(target);
+         }
+       };
+   return resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * Removes the named extended attribute via {@code dfs.removeXAttr}, or via
+  * the {@link FileSystem} handed to the resolver's {@code next} callback.
+  */
+ @Override
+ public void removeXAttr(Path path, final String name) throws IOException {
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path target) throws IOException {
+           dfs.removeXAttr(getPathName(target), name);
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path target)
+             throws IOException {
+           otherFs.removeXAttr(target, name);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+ /**
+  * Checks access for the given mode by calling {@code dfs.checkAccess}, or
+  * {@code access} on the {@link FileSystem} handed to the resolver's
+  * {@code next} callback. Any failure surfaces as the exception thrown by
+  * the underlying call.
+  */
+ @Override
+ public void access(Path path, final FsAction mode) throws IOException {
+   final Path absolutePath = fixRelativePart(path);
+   final FileSystemLinkResolver<Void> resolver =
+       new FileSystemLinkResolver<Void>() {
+         @Override
+         public Void doCall(final Path target) throws IOException {
+           dfs.checkAccess(getPathName(target), mode);
+           return null;
+         }
+
+         @Override
+         public Void next(final FileSystem otherFs, final Path target)
+             throws IOException {
+           otherFs.access(target, mode);
+           return null;
+         }
+       };
+   resolver.resolve(this, absolutePath);
+ }
+
+ @Override
+ public Token<?>[] addDelegationTokens(
+ final String renewer, Credentials credentials) throws IOException {
+ Token<?>[] tokens = super.addDelegationTokens(renewer, credentials);
+ if (dfs.isHDFSEncryptionEnabled()) {
+ KeyProviderDelegationTokenExtension keyProviderDelegationTokenExtension =
+ KeyProviderDelegationTokenExtension.
+ createKeyProviderDelegationTokenExtension(dfs.getKeyProvider());
+ Token<?>[] kpTokens = keyProviderDelegationTokenExtension.
+ addDelegationTokens(renewer, credentials);
+ if (tokens != null && kpTokens != null) {
+ Token<?>[] all = new Token<?>[tokens.length + kpTokens.length];
+ System.arraycopy(tokens, 0, all, 0, tokens.length);
+ System.arraycopy(kpTokens, 0, all, tokens.length, kpTokens.length);
+ tokens = all;
+ } else {
+ tokens = (tokens != null) ? tokens : kpTokens;
+ }
+ }
+ return tokens;
+ }
+
+ /**
+  * Returns the stream produced by {@code dfs.getInotifyEventStream()}.
+  *
+  * @throws IOException if the underlying call fails
+  */
+ public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
+   final DFSInotifyEventInputStream eventStream = dfs.getInotifyEventStream();
+   return eventStream;
+ }
+
+ /**
+  * Returns the stream produced by
+  * {@code dfs.getInotifyEventStream(lastReadTxid)}.
+  *
+  * @param lastReadTxid transaction id forwarded unchanged to {@code dfs}
+  * @throws IOException if the underlying call fails
+  */
+ public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
+     throws IOException {
+   final DFSInotifyEventInputStream eventStream =
+       dfs.getInotifyEventStream(lastReadTxid);
+   return eventStream;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c030c6e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
new file mode 100644
index 0000000..77bed1a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.client.impl;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+/**
+ * Provides an iterator interface for listCorruptFileBlocks.
+ * This class is used by DistributedFileSystem and Hdfs.
+ *
+ * <p>Results are pulled from the {@link DFSClient} one response at a time;
+ * the cookie returned with each response is passed back on the following
+ * request. The iterator always holds the next element in advance, so the
+ * first request is issued from the constructor.
+ */
+@InterfaceAudience.Private
+public class CorruptFileBlockIterator implements RemoteIterator<Path> {
+  private final DFSClient dfs;
+  private final String path;
+
+  // Most recent response, the read position inside it, and its cookie.
+  private String[] files = null;
+  private int fileIdx = 0;
+  private String cookie = null;
+  // Element the next call to next() will return; null once exhausted.
+  private Path nextPath = null;
+
+  private int callsMade = 0;
+
+  /**
+   * Creates the iterator and immediately loads its first element.
+   *
+   * @param dfs client used to list corrupt file blocks
+   * @param path path to list under; only its URI path component is kept
+   * @throws IOException if the initial listing call fails
+   */
+  public CorruptFileBlockIterator(DFSClient dfs, Path path) throws IOException {
+    this.dfs = dfs;
+    this.path = path.toUri().getPath();
+    loadNext();
+  }
+
+  /**
+   * @return the number of calls made to the DFSClient.
+   * This is for debugging and testing purposes.
+   */
+  public int getCallsMade() {
+    return callsMade;
+  }
+
+  /**
+   * Advances {@code nextPath}, asking the client for another response when
+   * the current one is absent or fully consumed.
+   */
+  private void loadNext() throws IOException {
+    final boolean needsFetch = (files == null) || (fileIdx >= files.length);
+    if (needsFetch) {
+      final CorruptFileBlocks batch = dfs.listCorruptFileBlocks(path, cookie);
+      files = batch.getFiles();
+      cookie = batch.getCookie();
+      fileIdx = 0;
+      callsMade++;
+    }
+
+    if (fileIdx < files.length) {
+      nextPath = new Path(files[fileIdx]);
+      fileIdx++;
+    } else {
+      // An empty response means there are no more corrupt file blocks.
+      nextPath = null;
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    return nextPath != null;
+  }
+
+  /**
+   * Returns the held element and loads the one after it.
+   *
+   * @throws NoSuchElementException if the iterator is exhausted
+   * @throws IOException if loading the following element fails
+   */
+  @Override
+  public Path next() throws IOException {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more corrupt file blocks");
+    }
+
+    final Path current = nextPath;
+    loadNext();
+    return current;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c030c6e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8c8afed..b934d2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -983,6 +983,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9087. Add some jitter to DataNode.checkDiskErrorThread (Elliott Clark
via Colin P. McCabe)
+ HDFS-8740. Move DistributedFileSystem to hadoop-hdfs-client. (Mingliang Liu
+ via wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than