You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/09/27 19:56:30 UTC
[1/2] hadoop git commit: HDFS-8740. Move DistributedFileSystem to
hadoop-hdfs-client. Contributed by Mingliang Liu.
Repository: hadoop
Updated Branches:
refs/heads/branch-2 1c7b3b98c -> 504d6fd95
http://git-wip-us.apache.org/repos/asf/hadoop/blob/504d6fd9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
deleted file mode 100644
index cf09c64..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ /dev/null
@@ -1,2333 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.BlockStorageLocation;
-import org.apache.hadoop.fs.BlockStoragePolicySpi;
-import org.apache.hadoop.fs.CacheFlag;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSLinkResolver;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileChecksum;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystemLinkResolver;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.FsStatus;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.XAttrSetFlag;
-import org.apache.hadoop.fs.Options.ChecksumOpt;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.fs.VolumeId;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.HdfsAdmin;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
-import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
-import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.EncryptionZone;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
-import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-
-/****************************************************************
- * Implementation of the abstract FileSystem for the DFS system.
- * This object is the way end-user code interacts with a Hadoop
- * DistributedFileSystem.
- *
- *****************************************************************/
-@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
-@InterfaceStability.Unstable
-public class DistributedFileSystem extends FileSystem {
- private Path workingDir;
- private URI uri;
- private String homeDirPrefix =
- HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT;
-
- DFSClient dfs;
- private boolean verifyChecksum = true;
-
- static{
- HdfsConfiguration.init();
- }
-
- public DistributedFileSystem() {
- }
-
- /**
- * Return the protocol scheme for the FileSystem.
- * <p/>
- *
- * @return <code>hdfs</code>
- */
- @Override
- public String getScheme() {
- return HdfsConstants.HDFS_URI_SCHEME;
- }
-
- @Override
- public URI getUri() { return uri; }
-
- @Override
- public void initialize(URI uri, Configuration conf) throws IOException {
- super.initialize(uri, conf);
- setConf(conf);
-
- String host = uri.getHost();
- if (host == null) {
- throw new IOException("Incomplete HDFS URI, no host: "+ uri);
- }
- homeDirPrefix = conf.get(
- HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
- HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);
-
- this.dfs = new DFSClient(uri, conf, statistics);
- this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
- this.workingDir = getHomeDirectory();
- }
-
- @Override
- public Path getWorkingDirectory() {
- return workingDir;
- }
-
- @Override
- public long getDefaultBlockSize() {
- return dfs.getConf().getDefaultBlockSize();
- }
-
- @Override
- public short getDefaultReplication() {
- return dfs.getConf().getDefaultReplication();
- }
-
- @Override
- public void setWorkingDirectory(Path dir) {
- String result = fixRelativePart(dir).toUri().getPath();
- if (!DFSUtil.isValidName(result)) {
- throw new IllegalArgumentException("Invalid DFS directory name " +
- result);
- }
- workingDir = fixRelativePart(dir);
- }
-
- @Override
- public Path getHomeDirectory() {
- return makeQualified(new Path(homeDirPrefix + "/"
- + dfs.ugi.getShortUserName()));
- }
-
- /**
- * Checks that the passed URI belongs to this filesystem and returns
- * just the path component. Expects a URI with an absolute path.
- *
- * @param file URI with absolute path
- * @return path component of {file}
- * @throws IllegalArgumentException if URI does not belong to this DFS
- */
- private String getPathName(Path file) {
- checkPath(file);
- String result = file.toUri().getPath();
- if (!DFSUtil.isValidName(result)) {
- throw new IllegalArgumentException("Pathname " + result + " from " +
- file+" is not a valid DFS filename.");
- }
- return result;
- }
-
- @Override
- public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
- long len) throws IOException {
- if (file == null) {
- return null;
- }
- return getFileBlockLocations(file.getPath(), start, len);
- }
-
- @Override
- public BlockLocation[] getFileBlockLocations(Path p,
- final long start, final long len) throws IOException {
- statistics.incrementReadOps(1);
- final Path absF = fixRelativePart(p);
- return new FileSystemLinkResolver<BlockLocation[]>() {
- @Override
- public BlockLocation[] doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return dfs.getBlockLocations(getPathName(p), start, len);
- }
- @Override
- public BlockLocation[] next(final FileSystem fs, final Path p)
- throws IOException {
- return fs.getFileBlockLocations(p, start, len);
- }
- }.resolve(this, absF);
- }
-
- /**
- * This API has been deprecated since the NameNode now tracks datanode
- * storages separately. Storage IDs can be gotten from {@link
- * BlockLocation#getStorageIds()}, which are functionally equivalent to
- * the volume IDs returned here (although a String rather than a byte[]).
- *
- * Used to query storage location information for a list of blocks. This list
- * of blocks is normally constructed via a series of calls to
- * {@link DistributedFileSystem#getFileBlockLocations(Path, long, long)} to
- * get the blocks for ranges of a file.
- *
- * The returned array of {@link BlockStorageLocation} augments
- * {@link BlockLocation} with a {@link VolumeId} per block replica. The
- * VolumeId specifies the volume on the datanode on which the replica resides.
- * The VolumeId associated with a replica may be null because volume
- * information can be unavailable if the corresponding datanode is down or
- * if the requested block is not found.
- *
- * This API is unstable, and datanode-side support is disabled by default. It
- * can be enabled by setting "dfs.datanode.hdfs-blocks-metadata.enabled" to
- * true.
- *
- * @param blocks
- * List of target BlockLocations to query volume location information
- * @return volumeBlockLocations Augmented array of
- * {@link BlockStorageLocation}s containing additional volume location
- * information for each replica of each block.
- */
- @InterfaceStability.Unstable
- @Deprecated
- public BlockStorageLocation[] getFileBlockStorageLocations(
- List<BlockLocation> blocks) throws IOException,
- UnsupportedOperationException, InvalidBlockTokenException {
- return dfs.getBlockStorageLocations(blocks);
- }
-
- @Override
- public void setVerifyChecksum(boolean verifyChecksum) {
- this.verifyChecksum = verifyChecksum;
- }
-
- /**
- * Start the lease recovery of a file
- *
- * @param f a file
- * @return true if the file is already closed
- * @throws IOException if an error occurs
- */
- public boolean recoverLease(final Path f) throws IOException {
- Path absF = fixRelativePart(f);
- return new FileSystemLinkResolver<Boolean>() {
- @Override
- public Boolean doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return dfs.recoverLease(getPathName(p));
- }
- @Override
- public Boolean next(final FileSystem fs, final Path p)
- throws IOException {
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem myDfs = (DistributedFileSystem)fs;
- return myDfs.recoverLease(p);
- }
- throw new UnsupportedOperationException("Cannot recoverLease through" +
- " a symlink to a non-DistributedFileSystem: " + f + " -> " + p);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public FSDataInputStream open(Path f, final int bufferSize)
- throws IOException {
- statistics.incrementReadOps(1);
- Path absF = fixRelativePart(f);
- return new FileSystemLinkResolver<FSDataInputStream>() {
- @Override
- public FSDataInputStream doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- final DFSInputStream dfsis =
- dfs.open(getPathName(p), bufferSize, verifyChecksum);
- return dfs.createWrappedInputStream(dfsis);
- }
- @Override
- public FSDataInputStream next(final FileSystem fs, final Path p)
- throws IOException {
- return fs.open(p, bufferSize);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public FSDataOutputStream append(Path f, final int bufferSize,
- final Progressable progress) throws IOException {
- return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress);
- }
-
- /**
- * Append to an existing file (optional operation).
- *
- * @param f the existing file to be appended.
- * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory
- * to be present.
- * @param bufferSize the size of the buffer to be used.
- * @param progress for reporting progress if it is not null.
- * @return Returns instance of {@link FSDataOutputStream}
- * @throws IOException
- */
- public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
- final int bufferSize, final Progressable progress) throws IOException {
- statistics.incrementWriteOps(1);
- Path absF = fixRelativePart(f);
- return new FileSystemLinkResolver<FSDataOutputStream>() {
- @Override
- public FSDataOutputStream doCall(final Path p)
- throws IOException {
- return dfs.append(getPathName(p), bufferSize, flag, progress,
- statistics);
- }
- @Override
- public FSDataOutputStream next(final FileSystem fs, final Path p)
- throws IOException {
- return fs.append(p, bufferSize);
- }
- }.resolve(this, absF);
- }
-
- /**
- * Append to an existing file (optional operation).
- *
- * @param f the existing file to be appended.
- * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory
- * to be present.
- * @param bufferSize the size of the buffer to be used.
- * @param progress for reporting progress if it is not null.
- * @param favoredNodes Favored nodes for new blocks
- * @return Returns instance of {@link FSDataOutputStream}
- * @throws IOException
- */
- public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
- final int bufferSize, final Progressable progress,
- final InetSocketAddress[] favoredNodes) throws IOException {
- statistics.incrementWriteOps(1);
- Path absF = fixRelativePart(f);
- return new FileSystemLinkResolver<FSDataOutputStream>() {
- @Override
- public FSDataOutputStream doCall(final Path p)
- throws IOException {
- return dfs.append(getPathName(p), bufferSize, flag, progress,
- statistics, favoredNodes);
- }
- @Override
- public FSDataOutputStream next(final FileSystem fs, final Path p)
- throws IOException {
- return fs.append(p, bufferSize);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public FSDataOutputStream create(Path f, FsPermission permission,
- boolean overwrite, int bufferSize, short replication, long blockSize,
- Progressable progress) throws IOException {
- return this.create(f, permission,
- overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
- : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
- blockSize, progress, null);
- }
-
- /**
- * Same as
- * {@link #create(Path, FsPermission, boolean, int, short, long,
- * Progressable)} with the addition of favoredNodes that is a hint to
- * where the namenode should place the file blocks.
- * The favored nodes hint is not persisted in HDFS. Hence it may be honored
- * at the creation time only. And with favored nodes, blocks will be pinned
- * on the datanodes to prevent the balancer from moving the block. HDFS could move the
- * blocks during replication, to move the blocks from favored nodes. A value
- * of null means no favored nodes for this create
- */
- public HdfsDataOutputStream create(final Path f,
- final FsPermission permission, final boolean overwrite,
- final int bufferSize, final short replication, final long blockSize,
- final Progressable progress, final InetSocketAddress[] favoredNodes)
- throws IOException {
- statistics.incrementWriteOps(1);
- Path absF = fixRelativePart(f);
- return new FileSystemLinkResolver<HdfsDataOutputStream>() {
- @Override
- public HdfsDataOutputStream doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- final DFSOutputStream out = dfs.create(getPathName(f), permission,
- overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
- : EnumSet.of(CreateFlag.CREATE),
- true, replication, blockSize, progress, bufferSize, null,
- favoredNodes);
- return dfs.createWrappedOutputStream(out, statistics);
- }
- @Override
- public HdfsDataOutputStream next(final FileSystem fs, final Path p)
- throws IOException {
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem myDfs = (DistributedFileSystem)fs;
- return myDfs.create(p, permission, overwrite, bufferSize, replication,
- blockSize, progress, favoredNodes);
- }
- throw new UnsupportedOperationException("Cannot create with" +
- " favoredNodes through a symlink to a non-DistributedFileSystem: "
- + f + " -> " + p);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public FSDataOutputStream create(final Path f, final FsPermission permission,
- final EnumSet<CreateFlag> cflags, final int bufferSize,
- final short replication, final long blockSize, final Progressable progress,
- final ChecksumOpt checksumOpt) throws IOException {
- statistics.incrementWriteOps(1);
- Path absF = fixRelativePart(f);
- return new FileSystemLinkResolver<FSDataOutputStream>() {
- @Override
- public FSDataOutputStream doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
- cflags, replication, blockSize, progress, bufferSize,
- checksumOpt);
- return dfs.createWrappedOutputStream(dfsos, statistics);
- }
- @Override
- public FSDataOutputStream next(final FileSystem fs, final Path p)
- throws IOException {
- return fs.create(p, permission, cflags, bufferSize,
- replication, blockSize, progress, checksumOpt);
- }
- }.resolve(this, absF);
- }
-
- @Override
- protected HdfsDataOutputStream primitiveCreate(Path f,
- FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
- short replication, long blockSize, Progressable progress,
- ChecksumOpt checksumOpt) throws IOException {
- statistics.incrementWriteOps(1);
- final DFSOutputStream dfsos = dfs.primitiveCreate(
- getPathName(fixRelativePart(f)),
- absolutePermission, flag, true, replication, blockSize,
- progress, bufferSize, checksumOpt);
- return dfs.createWrappedOutputStream(dfsos, statistics);
- }
-
- /**
- * Same as create(), except fails if parent directory doesn't already exist.
- */
- @Override
- @SuppressWarnings("deprecation")
- public FSDataOutputStream createNonRecursive(final Path f,
- final FsPermission permission, final EnumSet<CreateFlag> flag,
- final int bufferSize, final short replication, final long blockSize,
- final Progressable progress) throws IOException {
- statistics.incrementWriteOps(1);
- if (flag.contains(CreateFlag.OVERWRITE)) {
- flag.add(CreateFlag.CREATE);
- }
- Path absF = fixRelativePart(f);
- return new FileSystemLinkResolver<FSDataOutputStream>() {
- @Override
- public FSDataOutputStream doCall(final Path p) throws IOException,
- UnresolvedLinkException {
- final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
- flag, false, replication, blockSize, progress, bufferSize, null);
- return dfs.createWrappedOutputStream(dfsos, statistics);
- }
-
- @Override
- public FSDataOutputStream next(final FileSystem fs, final Path p)
- throws IOException {
- return fs.createNonRecursive(p, permission, flag, bufferSize,
- replication, blockSize, progress);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public boolean setReplication(Path src,
- final short replication
- ) throws IOException {
- statistics.incrementWriteOps(1);
- Path absF = fixRelativePart(src);
- return new FileSystemLinkResolver<Boolean>() {
- @Override
- public Boolean doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return dfs.setReplication(getPathName(p), replication);
- }
- @Override
- public Boolean next(final FileSystem fs, final Path p)
- throws IOException {
- return fs.setReplication(p, replication);
- }
- }.resolve(this, absF);
- }
-
- /**
- * Set the source path to the specified storage policy.
- *
- * @param src The source path referring to either a directory or a file.
- * @param policyName The name of the storage policy.
- */
- @Override
- public void setStoragePolicy(final Path src, final String policyName)
- throws IOException {
- statistics.incrementWriteOps(1);
- Path absF = fixRelativePart(src);
- new FileSystemLinkResolver<Void>() {
- @Override
- public Void doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- dfs.setStoragePolicy(getPathName(p), policyName);
- return null;
- }
- @Override
- public Void next(final FileSystem fs, final Path p)
- throws IOException {
- fs.setStoragePolicy(p, policyName);
- return null;
- }
- }.resolve(this, absF);
- }
-
- @Override
- public BlockStoragePolicySpi getStoragePolicy(Path path) throws IOException {
- statistics.incrementReadOps(1);
- Path absF = fixRelativePart(path);
-
- return new FileSystemLinkResolver<BlockStoragePolicySpi>() {
- @Override
- public BlockStoragePolicySpi doCall(final Path p) throws IOException {
- return getClient().getStoragePolicy(getPathName(p));
- }
-
- @Override
- public BlockStoragePolicySpi next(final FileSystem fs, final Path p)
- throws IOException, UnresolvedLinkException {
- return fs.getStoragePolicy(p);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public Collection<BlockStoragePolicy> getAllStoragePolicies()
- throws IOException {
- return Arrays.asList(dfs.getStoragePolicies());
- }
-
- /**
- * Deprecated. Prefer {@link FileSystem#getAllStoragePolicies()}
- * @return all existing block storage policies
- * @throws IOException
- */
- @Deprecated
- public BlockStoragePolicy[] getStoragePolicies() throws IOException {
- statistics.incrementReadOps(1);
- return dfs.getStoragePolicies();
- }
-
- /**
- * Move blocks from srcs to trg and delete srcs afterwards.
- * The file block sizes must be the same.
- *
- * @param trg existing file to append to
- * @param psrcs list of files (same block size, same replication)
- * @throws IOException
- */
- @Override
- public void concat(Path trg, Path [] psrcs) throws IOException {
- statistics.incrementWriteOps(1);
- // Make target absolute
- Path absF = fixRelativePart(trg);
- // Make all srcs absolute
- Path[] srcs = new Path[psrcs.length];
- for (int i=0; i<psrcs.length; i++) {
- srcs[i] = fixRelativePart(psrcs[i]);
- }
- // Try the concat without resolving any links
- String[] srcsStr = new String[psrcs.length];
- try {
- for (int i=0; i<psrcs.length; i++) {
- srcsStr[i] = getPathName(srcs[i]);
- }
- dfs.concat(getPathName(trg), srcsStr);
- } catch (UnresolvedLinkException e) {
- // Exception could be from trg or any src.
- // Fully resolve trg and srcs. Fail if any of them are a symlink.
- FileStatus stat = getFileLinkStatus(absF);
- if (stat.isSymlink()) {
- throw new IOException("Cannot concat with a symlink target: "
- + trg + " -> " + stat.getPath());
- }
- absF = fixRelativePart(stat.getPath());
- for (int i=0; i<psrcs.length; i++) {
- stat = getFileLinkStatus(srcs[i]);
- if (stat.isSymlink()) {
- throw new IOException("Cannot concat with a symlink src: "
- + psrcs[i] + " -> " + stat.getPath());
- }
- srcs[i] = fixRelativePart(stat.getPath());
- }
- // Try concat again. Can still race with another symlink.
- for (int i=0; i<psrcs.length; i++) {
- srcsStr[i] = getPathName(srcs[i]);
- }
- dfs.concat(getPathName(absF), srcsStr);
- }
- }
-
-
- @SuppressWarnings("deprecation")
- @Override
- public boolean rename(Path src, Path dst) throws IOException {
- statistics.incrementWriteOps(1);
-
- final Path absSrc = fixRelativePart(src);
- final Path absDst = fixRelativePart(dst);
-
- // Try the rename without resolving first
- try {
- return dfs.rename(getPathName(absSrc), getPathName(absDst));
- } catch (UnresolvedLinkException e) {
- // Fully resolve the source
- final Path source = getFileLinkStatus(absSrc).getPath();
- // Keep trying to resolve the destination
- return new FileSystemLinkResolver<Boolean>() {
- @Override
- public Boolean doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return dfs.rename(getPathName(source), getPathName(p));
- }
- @Override
- public Boolean next(final FileSystem fs, final Path p)
- throws IOException {
- // Should just throw an error in FileSystem#checkPath
- return doCall(p);
- }
- }.resolve(this, absDst);
- }
- }
-
- /**
- * This rename operation is guaranteed to be atomic.
- */
- @SuppressWarnings("deprecation")
- @Override
- public void rename(Path src, Path dst, final Options.Rename... options)
- throws IOException {
- statistics.incrementWriteOps(1);
- final Path absSrc = fixRelativePart(src);
- final Path absDst = fixRelativePart(dst);
- // Try the rename without resolving first
- try {
- dfs.rename(getPathName(absSrc), getPathName(absDst), options);
- } catch (UnresolvedLinkException e) {
- // Fully resolve the source
- final Path source = getFileLinkStatus(absSrc).getPath();
- // Keep trying to resolve the destination
- new FileSystemLinkResolver<Void>() {
- @Override
- public Void doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- dfs.rename(getPathName(source), getPathName(p), options);
- return null;
- }
- @Override
- public Void next(final FileSystem fs, final Path p)
- throws IOException {
- // Should just throw an error in FileSystem#checkPath
- return doCall(p);
- }
- }.resolve(this, absDst);
- }
- }
-
- @Override
- public boolean truncate(Path f, final long newLength) throws IOException {
- statistics.incrementWriteOps(1);
- Path absF = fixRelativePart(f);
- return new FileSystemLinkResolver<Boolean>() {
- @Override
- public Boolean doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return dfs.truncate(getPathName(p), newLength);
- }
- @Override
- public Boolean next(final FileSystem fs, final Path p)
- throws IOException {
- return fs.truncate(p, newLength);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public boolean delete(Path f, final boolean recursive) throws IOException {
- statistics.incrementWriteOps(1);
- Path absF = fixRelativePart(f);
- return new FileSystemLinkResolver<Boolean>() {
- @Override
- public Boolean doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return dfs.delete(getPathName(p), recursive);
- }
- @Override
- public Boolean next(final FileSystem fs, final Path p)
- throws IOException {
- return fs.delete(p, recursive);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public ContentSummary getContentSummary(Path f) throws IOException {
- statistics.incrementReadOps(1);
- Path absF = fixRelativePart(f);
- return new FileSystemLinkResolver<ContentSummary>() {
- @Override
- public ContentSummary doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return dfs.getContentSummary(getPathName(p));
- }
- @Override
- public ContentSummary next(final FileSystem fs, final Path p)
- throws IOException {
- return fs.getContentSummary(p);
- }
- }.resolve(this, absF);
- }
-
- /** Set a directory's quotas
- * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long, StorageType)
- */
- public void setQuota(Path src, final long namespaceQuota,
- final long storagespaceQuota) throws IOException {
- Path absF = fixRelativePart(src);
- new FileSystemLinkResolver<Void>() {
- @Override
- public Void doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- dfs.setQuota(getPathName(p), namespaceQuota, storagespaceQuota);
- return null;
- }
- @Override
- public Void next(final FileSystem fs, final Path p)
- throws IOException {
-      // setQuota is not defined in FileSystem, so we can only resolve
-      // within this DFS
- return doCall(p);
- }
- }.resolve(this, absF);
- }
-
- /**
- * Set the per type storage quota of a directory.
- *
- * @param src target directory whose quota is to be modified.
- * @param type storage type of the specific storage type quota to be modified.
- * @param quota value of the specific storage type quota to be modified.
- * Maybe {@link HdfsConstants#QUOTA_RESET} to clear quota by storage type.
- */
- public void setQuotaByStorageType(
- Path src, final StorageType type, final long quota)
- throws IOException {
- Path absF = fixRelativePart(src);
- new FileSystemLinkResolver<Void>() {
- @Override
- public Void doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- dfs.setQuotaByStorageType(getPathName(p), type, quota);
- return null;
- }
- @Override
- public Void next(final FileSystem fs, final Path p)
- throws IOException {
-        // setQuotaByStorageType is not defined in FileSystem, so we can only
-        // resolve within this DFS
- return doCall(p);
- }
- }.resolve(this, absF);
- }
-
- private FileStatus[] listStatusInternal(Path p) throws IOException {
- String src = getPathName(p);
-
- // fetch the first batch of entries in the directory
- DirectoryListing thisListing = dfs.listPaths(
- src, HdfsFileStatus.EMPTY_NAME);
-
- if (thisListing == null) { // the directory does not exist
- throw new FileNotFoundException("File " + p + " does not exist.");
- }
-
- HdfsFileStatus[] partialListing = thisListing.getPartialListing();
- if (!thisListing.hasMore()) { // got all entries of the directory
- FileStatus[] stats = new FileStatus[partialListing.length];
- for (int i = 0; i < partialListing.length; i++) {
- stats[i] = partialListing[i].makeQualified(getUri(), p);
- }
- statistics.incrementReadOps(1);
- return stats;
- }
-
- // The directory size is too big that it needs to fetch more
- // estimate the total number of entries in the directory
- int totalNumEntries =
- partialListing.length + thisListing.getRemainingEntries();
- ArrayList<FileStatus> listing =
- new ArrayList<FileStatus>(totalNumEntries);
- // add the first batch of entries to the array list
- for (HdfsFileStatus fileStatus : partialListing) {
- listing.add(fileStatus.makeQualified(getUri(), p));
- }
- statistics.incrementLargeReadOps(1);
-
- // now fetch more entries
- do {
- thisListing = dfs.listPaths(src, thisListing.getLastName());
-
- if (thisListing == null) { // the directory is deleted
- throw new FileNotFoundException("File " + p + " does not exist.");
- }
-
- partialListing = thisListing.getPartialListing();
- for (HdfsFileStatus fileStatus : partialListing) {
- listing.add(fileStatus.makeQualified(getUri(), p));
- }
- statistics.incrementLargeReadOps(1);
- } while (thisListing.hasMore());
-
- return listing.toArray(new FileStatus[listing.size()]);
- }
-
- /**
- * List all the entries of a directory
- *
- * Note that this operation is not atomic for a large directory.
- * The entries of a directory may be fetched from NameNode multiple times.
- * It only guarantees that each name occurs once if a directory
- * undergoes changes between the calls.
- */
- @Override
- public FileStatus[] listStatus(Path p) throws IOException {
- Path absF = fixRelativePart(p);
- return new FileSystemLinkResolver<FileStatus[]>() {
- @Override
- public FileStatus[] doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return listStatusInternal(p);
- }
- @Override
- public FileStatus[] next(final FileSystem fs, final Path p)
- throws IOException {
- return fs.listStatus(p);
- }
- }.resolve(this, absF);
- }
-
  /**
   * Returns a remote iterator over the located statuses (status plus block
   * locations for files) of the entries of directory {@code p} accepted by
   * {@code filter}, fetching batches from the NameNode on demand.
   */
  @Override
  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p,
      final PathFilter filter)
  throws IOException {
    Path absF = fixRelativePart(p);
    return new FileSystemLinkResolver<RemoteIterator<LocatedFileStatus>>() {
      @Override
      public RemoteIterator<LocatedFileStatus> doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        // needLocation=true: each entry carries block locations if a file.
        return new DirListingIterator<LocatedFileStatus>(p, filter, true);
      }

      @Override
      public RemoteIterator<LocatedFileStatus> next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          return ((DistributedFileSystem)fs).listLocatedStatus(p, filter);
        }
        // symlink resolution for this method does not work cross file systems
        // because it is a protected method.
        throw new IOException("Link resolution does not work with multiple " +
            "file systems for listLocatedStatus(): " + p);
      }
    }.resolve(this, absF);
  }
-
-
- /**
- * Returns a remote iterator so that followup calls are made on demand
- * while consuming the entries. This reduces memory consumption during
- * listing of a large directory.
- *
- * @param p target path
- * @return remote iterator
- */
- @Override
- public RemoteIterator<FileStatus> listStatusIterator(final Path p)
- throws IOException {
- Path absF = fixRelativePart(p);
- return new FileSystemLinkResolver<RemoteIterator<FileStatus>>() {
- @Override
- public RemoteIterator<FileStatus> doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return new DirListingIterator<FileStatus>(p, false);
- }
-
- @Override
- public RemoteIterator<FileStatus> next(final FileSystem fs, final Path p)
- throws IOException {
- return ((DistributedFileSystem)fs).listStatusIterator(p);
- }
- }.resolve(this, absF);
-
- }
-
  /**
   * An iterator over the file statuses of the files/subdirectories of a
   * directory, fetching batches of entries from the NameNode on demand.
   *
   * If needLocation is true, each returned status also carries block
   * locations when the entry is a file.
   *
   * @param <T> the type of the file status
   */
  private class DirListingIterator<T extends FileStatus>
      implements RemoteIterator<T> {
    // Current batch of entries fetched from the NameNode.
    private DirectoryListing thisListing;
    // Index of the next unconsumed entry within the current batch.
    private int i;
    // Directory being listed; used for qualifying paths and error messages.
    private Path p;
    // String form of the directory path, as sent to the NameNode.
    private String src;
    // Next filter-accepted, fully-qualified status to hand out; null when
    // not yet computed (hasNext() fills it, next() consumes it).
    private T curStat = null;
    // Optional filter on entry paths; null means accept everything.
    private PathFilter filter;
    // Whether to request block locations along with each entry.
    private boolean needLocation;

    private DirListingIterator(Path p, PathFilter filter,
        boolean needLocation) throws IOException {
      this.p = p;
      this.src = getPathName(p);
      this.filter = filter;
      this.needLocation = needLocation;
      // fetch the first batch of entries in the directory
      thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
          needLocation);
      statistics.incrementReadOps(1);
      if (thisListing == null) { // the directory does not exist
        throw new FileNotFoundException("File " + p + " does not exist.");
      }
      i = 0;
    }

    // Convenience constructor: no filter, i.e. accept all entries.
    private DirListingIterator(Path p, boolean needLocation)
        throws IOException {
      this(p, null, needLocation);
    }

    @Override
    @SuppressWarnings("unchecked")
    public boolean hasNext() throws IOException {
      // Advance until an entry passes the filter or the listing is exhausted.
      while (curStat == null && hasNextNoFilter()) {
        T next;
        HdfsFileStatus fileStat = thisListing.getPartialListing()[i++];
        if (needLocation) {
          next = (T)((HdfsLocatedFileStatus)fileStat)
              .makeQualifiedLocated(getUri(), p);
        } else {
          next = (T)fileStat.makeQualified(getUri(), p);
        }
        // apply filter if not null
        if (filter == null || filter.accept(next.getPath())) {
          curStat = next;
        }
      }
      return curStat != null;
    }

    /** Check if there is a next item before applying the given filter */
    private boolean hasNextNoFilter() throws IOException {
      if (thisListing == null) {
        return false;
      }
      if (i >= thisListing.getPartialListing().length
          && thisListing.hasMore()) {
        // current listing is exhausted & fetch a new listing
        thisListing = dfs.listPaths(src, thisListing.getLastName(),
            needLocation);
        statistics.incrementReadOps(1);
        if (thisListing == null) {
          // Directory was deleted between batches; treat as end of listing.
          return false;
        }
        i = 0;
      }
      return (i < thisListing.getPartialListing().length);
    }

    @Override
    public T next() throws IOException {
      if (hasNext()) {
        // Hand out the status computed by hasNext() and reset the cache.
        T tmp = curStat;
        curStat = null;
        return tmp;
      }
      throw new java.util.NoSuchElementException("No more entry in " + p);
    }
  }
-
  /**
   * Create a directory, only when the parent directories exist.
   *
   * See {@link FsPermission#applyUMask(FsPermission)} for details of how
   * the permission is applied.
   *
   * @param f The path to create
   * @param permission The permission. See FsPermission#applyUMask for
   * details about how this is used to calculate the
   * effective permission.
   * @return true if the directory was created
   * @throws IOException if the parent does not exist or the RPC fails
   */
  public boolean mkdir(Path f, FsPermission permission) throws IOException {
    return mkdirsInternal(f, permission, false);
  }
-
  /**
   * Create a directory and its parent directories.
   *
   * See {@link FsPermission#applyUMask(FsPermission)} for details of how
   * the permission is applied.
   *
   * @param f The path to create
   * @param permission The permission. See FsPermission#applyUMask for
   * details about how this is used to calculate the
   * effective permission.
   * @return true if the directory (and any missing parents) was created
   */
  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return mkdirsInternal(f, permission, true);
  }
-
- private boolean mkdirsInternal(Path f, final FsPermission permission,
- final boolean createParent) throws IOException {
- statistics.incrementWriteOps(1);
- Path absF = fixRelativePart(f);
- return new FileSystemLinkResolver<Boolean>() {
- @Override
- public Boolean doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return dfs.mkdirs(getPathName(p), permission, createParent);
- }
-
- @Override
- public Boolean next(final FileSystem fs, final Path p)
- throws IOException {
- // FileSystem doesn't have a non-recursive mkdir() method
- // Best we can do is error out
- if (!createParent) {
- throw new IOException("FileSystem does not support non-recursive"
- + "mkdir");
- }
- return fs.mkdirs(p, permission);
- }
- }.resolve(this, absF);
- }
-
  // Creates a single directory with the given absolute (already-umasked)
  // permission, without creating parents; delegates straight to the client.
  @SuppressWarnings("deprecation")
  @Override
  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
      throws IOException {
    statistics.incrementWriteOps(1);
    return dfs.primitiveMkdir(getPathName(f), absolutePermission);
  }
-
-
  /**
   * Close this file system: close all output streams opened through the
   * client, then the superclass, and finally — even if either of those
   * throws — the client itself.
   */
  @Override
  public void close() throws IOException {
    try {
      // false: close streams normally rather than abort them.
      dfs.closeOutputStreams(false);
      super.close();
    } finally {
      dfs.close();
    }
  }
-
- @Override
- public String toString() {
- return "DFS[" + dfs + "]";
- }
-
  /** @return the underlying {@link DFSClient}; exposed for tests only. */
  @InterfaceAudience.Private
  @VisibleForTesting
  public DFSClient getClient() {
    return dfs;
  }
-
  /** @deprecated Use {@link org.apache.hadoop.fs.FsStatus} instead */
  @InterfaceAudience.Private
  @Deprecated
  public static class DiskStatus extends FsStatus {
    // Copy constructor from the generic FsStatus.
    public DiskStatus(FsStatus stats) {
      super(stats.getCapacity(), stats.getUsed(), stats.getRemaining());
    }

    public DiskStatus(long capacity, long dfsUsed, long remaining) {
      super(capacity, dfsUsed, remaining);
    }

    /** @return the used space, under the legacy "dfsUsed" name. */
    public long getDfsUsed() {
      return super.getUsed();
    }
  }
-
  /**
   * Returns the filesystem-wide disk usage. The path argument is ignored:
   * status is global, not per-path.
   */
  @Override
  public FsStatus getStatus(Path p) throws IOException {
    statistics.incrementReadOps(1);
    return dfs.getDiskStatus();
  }
-
  /** Return the disk usage of the filesystem, including total capacity,
   * used space, and remaining space
   * @deprecated Use {@link org.apache.hadoop.fs.FileSystem#getStatus()}
   * instead */
  @Deprecated
  public DiskStatus getDiskStatus() throws IOException {
    return new DiskStatus(dfs.getDiskStatus());
  }
-
  /** Return the total raw capacity of the filesystem, disregarding
   * replication.
   * @deprecated Use {@link org.apache.hadoop.fs.FileSystem#getStatus()}
   * instead */
  @Deprecated
  public long getRawCapacity() throws IOException{
    return dfs.getDiskStatus().getCapacity();
  }
-
  /** Return the total raw used space in the filesystem, disregarding
   * replication.
   * @deprecated Use {@link org.apache.hadoop.fs.FileSystem#getStatus()}
   * instead */
  @Deprecated
  public long getRawUsed() throws IOException{
    return dfs.getDiskStatus().getUsed();
  }
-
  /**
   * Returns count of blocks with no good replicas left. Normally should be
   * zero.
   *
   * @return number of blocks with no good replicas left
   * @throws IOException if the RPC to the NameNode fails
   */
  public long getMissingBlocksCount() throws IOException {
    return dfs.getMissingBlocksCount();
  }
-
  /**
   * Returns count of blocks with replication factor 1 and have
   * lost the only replica.
   *
   * @return number of replication-factor-1 blocks whose sole replica is lost
   * @throws IOException if the RPC to the NameNode fails
   */
  public long getMissingReplOneBlocksCount() throws IOException {
    return dfs.getMissingReplOneBlocksCount();
  }
-
  /**
   * Returns count of blocks with one or more replicas missing.
   *
   * @return number of under-replicated blocks
   * @throws IOException if the RPC to the NameNode fails
   */
  public long getUnderReplicatedBlocksCount() throws IOException {
    return dfs.getUnderReplicatedBlocksCount();
  }
-
  /**
   * Returns count of blocks with at least one replica marked corrupt.
   *
   * @return number of blocks with a corrupt replica
   * @throws IOException if the RPC to the NameNode fails
   */
  public long getCorruptBlocksCount() throws IOException {
    return dfs.getCorruptBlocksCount();
  }
-
  /**
   * Returns an iterator over the paths of files with corrupt blocks under
   * {@code path}, fetched incrementally from the NameNode.
   */
  @Override
  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
      throws IOException {
    return new CorruptFileBlockIterator(dfs, path);
  }
-
  /** @return datanode statistics for all datanodes (report type ALL). */
  public DatanodeInfo[] getDataNodeStats() throws IOException {
    return getDataNodeStats(DatanodeReportType.ALL);
  }
-
  /** @return datanode statistics for the given type (e.g. LIVE, DEAD, ALL). */
  public DatanodeInfo[] getDataNodeStats(final DatanodeReportType type
      ) throws IOException {
    return dfs.datanodeReport(type);
  }
-
  /**
   * Enter, leave or get safe mode.
   *
   * Convenience overload of {@link #setSafeMode(HdfsConstants.SafeModeAction,
   * boolean)} with {@code isChecked=false} (checks the first NameNode's
   * status rather than only active NameNodes).
   *
   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(
   * HdfsConstants.SafeModeAction,boolean)
   */
  public boolean setSafeMode(HdfsConstants.SafeModeAction action)
      throws IOException {
    return setSafeMode(action, false);
  }
-
  /**
   * Enter, leave or get safe mode.
   *
   * @param action
   * One of SafeModeAction.ENTER, SafeModeAction.LEAVE and
   * SafeModeAction.GET
   * @param isChecked
   * If true check only for Active NNs status, else check first NN's
   * status
   * @return the resulting safe-mode state (true when in safe mode)
   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(SafeModeAction, boolean)
   */
  public boolean setSafeMode(HdfsConstants.SafeModeAction action,
      boolean isChecked) throws IOException {
    return dfs.setSafeMode(action, isChecked);
  }
-
  /**
   * Save namespace image.
   *
   * @throws AccessControlException if the caller lacks the required privilege
   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#saveNamespace()
   */
  public void saveNamespace() throws AccessControlException, IOException {
    dfs.saveNamespace();
  }
-
  /**
   * Rolls the edit log on the active NameNode.
   * Requires super-user privileges.
   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#rollEdits()
   * @return the transaction ID of the newly created segment
   */
  public long rollEdits() throws AccessControlException, IOException {
    return dfs.rollEdits();
  }
-
  /**
   * enable/disable/check restoreFailedStorage
   *
   * @param arg one of "true", "false" or "check"
   * @return the resulting/current state of the flag
   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#restoreFailedStorage(String arg)
   */
  public boolean restoreFailedStorage(String arg)
      throws AccessControlException, IOException {
    return dfs.restoreFailedStorage(arg);
  }
-
-
  /**
   * Refreshes the list of hosts and excluded hosts from the configured
   * files.
   *
   * @throws IOException if the RPC to the NameNode fails
   */
  public void refreshNodes() throws IOException {
    dfs.refreshNodes();
  }
-
  /**
   * Finalize previously upgraded files system state.
   * @throws IOException if the RPC to the NameNode fails
   */
  public void finalizeUpgrade() throws IOException {
    dfs.finalizeUpgrade();
  }
-
  /**
   * Rolling upgrade: prepare/finalize/query.
   *
   * @param action which rolling-upgrade operation to perform
   * @return the current rolling-upgrade information from the NameNode
   */
  public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action)
      throws IOException {
    return dfs.rollingUpgrade(action);
  }
-
  /*
   * Requests the namenode to dump data structures into specified
   * file.
   */
  public void metaSave(String pathname) throws IOException {
    dfs.metaSave(pathname);
  }
-
  /** Returns the server-side default values (block size, replication, …). */
  @Override
  public FsServerDefaults getServerDefaults() throws IOException {
    return dfs.getServerDefaults();
  }
-
  /**
   * Returns the stat information about the file.
   *
   * @param f the path to stat
   * @return the qualified file status
   * @throws FileNotFoundException if the file does not exist.
   */
  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    statistics.incrementReadOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FileStatus>() {
      @Override
      public FileStatus doCall(final Path p) throws IOException,
          UnresolvedLinkException {
        HdfsFileStatus fi = dfs.getFileInfo(getPathName(p));
        if (fi != null) {
          // Qualify with this file system's scheme/authority.
          return fi.makeQualified(getUri(), p);
        } else {
          throw new FileNotFoundException("File does not exist: " + p);
        }
      }
      @Override
      public FileStatus next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.getFileStatus(p);
      }
    }.resolve(this, absF);
  }
-
  /**
   * Creates a symlink at {@code link} pointing to {@code target}.
   * Fails fast if symlinks are globally disabled via
   * {@link FileSystem#areSymlinksEnabled()}.
   *
   * @param target the path the new symlink points to
   * @param link where to create the symlink
   * @param createParent whether to create missing parents of {@code link}
   */
  @SuppressWarnings("deprecation")
  @Override
  public void createSymlink(final Path target, final Path link,
      final boolean createParent) throws AccessControlException,
      FileAlreadyExistsException, FileNotFoundException,
      ParentNotDirectoryException, UnsupportedFileSystemException,
      IOException {
    if (!FileSystem.areSymlinksEnabled()) {
      throw new UnsupportedOperationException("Symlinks not supported");
    }
    statistics.incrementWriteOps(1);
    final Path absF = fixRelativePart(link);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p) throws IOException,
          UnresolvedLinkException {
        dfs.createSymlink(target.toString(), getPathName(p), createParent);
        return null;
      }
      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException, UnresolvedLinkException {
        fs.createSymlink(target, p, createParent);
        return null;
      }
    }.resolve(this, absF);
  }
-
  /** HDFS supports symlinks. */
  @Override
  public boolean supportsSymlinks() {
    return true;
  }
-
  /**
   * Returns the status of {@code f} without following a final-component
   * symlink; if the result is a symlink, its target is fully qualified
   * against this file system's URI before being returned.
   */
  @Override
  public FileStatus getFileLinkStatus(final Path f)
      throws AccessControlException, FileNotFoundException,
      UnsupportedFileSystemException, IOException {
    statistics.incrementReadOps(1);
    final Path absF = fixRelativePart(f);
    FileStatus status = new FileSystemLinkResolver<FileStatus>() {
      @Override
      public FileStatus doCall(final Path p) throws IOException,
          UnresolvedLinkException {
        HdfsFileStatus fi = dfs.getFileLinkInfo(getPathName(p));
        if (fi != null) {
          return fi.makeQualified(getUri(), p);
        } else {
          throw new FileNotFoundException("File does not exist: " + p);
        }
      }
      @Override
      public FileStatus next(final FileSystem fs, final Path p)
          throws IOException, UnresolvedLinkException {
        return fs.getFileLinkStatus(p);
      }
    }.resolve(this, absF);
    // Fully-qualify the symlink
    if (status.isSymlink()) {
      Path targetQual = FSLinkResolver.qualifySymlinkTarget(this.getUri(),
          status.getPath(), status.getSymlink());
      status.setSymlink(targetQual);
    }
    return status;
  }
-
  /**
   * Returns the (unresolved) target of the symlink at {@code f}.
   *
   * @throws FileNotFoundException if {@code f} does not exist
   */
  @Override
  public Path getLinkTarget(final Path f) throws AccessControlException,
      FileNotFoundException, UnsupportedFileSystemException, IOException {
    statistics.incrementReadOps(1);
    final Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<Path>() {
      @Override
      public Path doCall(final Path p) throws IOException,
          UnresolvedLinkException {
        HdfsFileStatus fi = dfs.getFileLinkInfo(getPathName(p));
        if (fi != null) {
          return fi.makeQualified(getUri(), p).getSymlink();
        } else {
          throw new FileNotFoundException("File does not exist: " + p);
        }
      }
      @Override
      public Path next(final FileSystem fs, final Path p)
          throws IOException, UnresolvedLinkException {
        return fs.getLinkTarget(p);
      }
    }.resolve(this, absF);
  }
-
  /**
   * Resolves one level of symlink for {@code f} via the NameNode.
   *
   * @throws FileNotFoundException if the link target cannot be determined
   */
  @Override
  protected Path resolveLink(Path f) throws IOException {
    statistics.incrementReadOps(1);
    String target = dfs.getLinkTarget(getPathName(fixRelativePart(f)));
    if (target == null) {
      throw new FileNotFoundException("File does not exist: " + f.toString());
    }
    return new Path(target);
  }
-
  /**
   * Returns the checksum of the whole file at {@code f}
   * (length bound of Long.MAX_VALUE covers the entire file).
   */
  @Override
  public FileChecksum getFileChecksum(Path f) throws IOException {
    statistics.incrementReadOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FileChecksum>() {
      @Override
      public FileChecksum doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE);
      }

      @Override
      public FileChecksum next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.getFileChecksum(p);
      }
    }.resolve(this, absF);
  }
-
  /**
   * Returns the checksum of the first {@code length} bytes of the file at
   * {@code f}. Only supported when a symlink resolves to another
   * DistributedFileSystem, since the generic FileSystem API has no
   * length-bounded checksum method.
   */
  @Override
  public FileChecksum getFileChecksum(Path f, final long length)
      throws IOException {
    statistics.incrementReadOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FileChecksum>() {
      @Override
      public FileChecksum doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        return dfs.getFileChecksum(getPathName(p), length);
      }

      @Override
      public FileChecksum next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          return ((DistributedFileSystem) fs).getFileChecksum(p, length);
        } else {
          throw new UnsupportedFileSystemException(
              "getFileChecksum(Path, long) is not supported by "
                  + fs.getClass().getSimpleName());
        }
      }
    }.resolve(this, absF);
  }
-
  /** Sets the permission of the file/directory at {@code p}. */
  @Override
  public void setPermission(Path p, final FsPermission permission
      ) throws IOException {
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(p);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.setPermission(getPathName(p), permission);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        fs.setPermission(p, permission);
        return null;
      }
    }.resolve(this, absF);
  }
-
  /**
   * Sets owner and/or group of the file/directory at {@code p}.
   * At least one of {@code username} and {@code groupname} must be non-null.
   */
  @Override
  public void setOwner(Path p, final String username, final String groupname
      ) throws IOException {
    if (username == null && groupname == null) {
      throw new IOException("username == null && groupname == null");
    }
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(p);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.setOwner(getPathName(p), username, groupname);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        fs.setOwner(p, username, groupname);
        return null;
      }
    }.resolve(this, absF);
  }
-
  /** Sets modification and access times (millis) of the file at {@code p}. */
  @Override
  public void setTimes(Path p, final long mtime, final long atime
      ) throws IOException {
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(p);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.setTimes(getPathName(p), mtime, atime);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        fs.setTimes(p, mtime, atime);
        return null;
      }
    }.resolve(this, absF);
  }
-
-
  /** Default NameNode RPC port, used when a URI omits the port. */
  @Override
  protected int getDefaultPort() {
    return HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT;
  }
-
- @Override
- public Token<DelegationTokenIdentifier> getDelegationToken(String renewer)
- throws IOException {
- Token<DelegationTokenIdentifier> result =
- dfs.getDelegationToken(renewer == null ? null : new Text(renewer));
- return result;
- }
-
  /**
   * Requests the namenode to tell all datanodes to use a new, non-persistent
   * bandwidth value for dfs.balance.bandwidthPerSec.
   * The bandwidth parameter is the max number of bytes per second of network
   * bandwidth to be used by a datanode during balancing.
   *
   * @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
   * @throws IOException if the RPC to the NameNode fails
   */
  public void setBalancerBandwidth(long bandwidth) throws IOException {
    dfs.setBalancerBandwidth(bandwidth);
  }
-
  /**
   * Get a canonical service name for this file system. If the URI is logical,
   * the hostname part of the URI will be returned.
   * @return a service string that uniquely identifies this file system.
   */
  @Override
  public String getCanonicalServiceName() {
    return dfs.getCanonicalServiceName();
  }
-
  /**
   * Canonicalizes a URI for comparison purposes. Logical (HA) URIs are
   * returned unchanged; physical ones get DNS-canonicalized with the
   * default port filled in.
   */
  @Override
  protected URI canonicalizeUri(URI uri) {
    if (HAUtilClient.isLogicalUri(getConf(), uri)) {
      // Don't try to DNS-resolve logical URIs, since the 'authority'
      // portion isn't a proper hostname
      return uri;
    } else {
      return NetUtils.getCanonicalUri(uri, getDefaultPort());
    }
  }
-
  /**
   * Utility function that returns if the NameNode is in safemode or not. In HA
   * mode, this API will return only ActiveNN's safemode status.
   *
   * @return true if NameNode is in safemode, false otherwise.
   * @throws IOException
   * when there is an issue communicating with the NameNode
   */
  public boolean isInSafeMode() throws IOException {
    // SAFEMODE_GET with isChecked=true: query active NNs only.
    return setSafeMode(SafeModeAction.SAFEMODE_GET, true);
  }
-
  /**
   * Marks the directory as snapshottable. Only supported when a symlink
   * resolves within a DistributedFileSystem.
   * @see HdfsAdmin#allowSnapshot(Path)
   */
  public void allowSnapshot(final Path path) throws IOException {
    Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.allowSnapshot(getPathName(p));
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
          myDfs.allowSnapshot(p);
        } else {
          throw new UnsupportedOperationException("Cannot perform snapshot"
              + " operations on a symlink to a non-DistributedFileSystem: "
              + path + " -> " + p);
        }
        return null;
      }
    }.resolve(this, absF);
  }
-
  /**
   * Clears the snapshottable flag from the directory. Only supported when a
   * symlink resolves within a DistributedFileSystem.
   * @see HdfsAdmin#disallowSnapshot(Path)
   */
  public void disallowSnapshot(final Path path) throws IOException {
    Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.disallowSnapshot(getPathName(p));
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
          myDfs.disallowSnapshot(p);
        } else {
          throw new UnsupportedOperationException("Cannot perform snapshot"
              + " operations on a symlink to a non-DistributedFileSystem: "
              + path + " -> " + p);
        }
        return null;
      }
    }.resolve(this, absF);
  }
-
- @Override
- public Path createSnapshot(final Path path, final String snapshotName)
- throws IOException {
- Path absF = fixRelativePart(path);
- return new FileSystemLinkResolver<Path>() {
- @Override
- public Path doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return new Path(dfs.createSnapshot(getPathName(p), snapshotName));
- }
-
- @Override
- public Path next(final FileSystem fs, final Path p)
- throws IOException {
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem myDfs = (DistributedFileSystem)fs;
- return myDfs.createSnapshot(p);
- } else {
- throw new UnsupportedOperationException("Cannot perform snapshot"
- + " operations on a symlink to a non-DistributedFileSystem: "
- + path + " -> " + p);
- }
- }
- }.resolve(this, absF);
- }
-
  /**
   * Renames an existing snapshot of {@code path} from
   * {@code snapshotOldName} to {@code snapshotNewName}.
   */
  @Override
  public void renameSnapshot(final Path path, final String snapshotOldName,
      final String snapshotNewName) throws IOException {
    Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.renameSnapshot(getPathName(p), snapshotOldName, snapshotNewName);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
          myDfs.renameSnapshot(p, snapshotOldName, snapshotNewName);
        } else {
          throw new UnsupportedOperationException("Cannot perform snapshot"
              + " operations on a symlink to a non-DistributedFileSystem: "
              + path + " -> " + p);
        }
        return null;
      }
    }.resolve(this, absF);
  }
-
  /**
   * @return All the snapshottable directories
   * @throws IOException if the RPC to the NameNode fails
   */
  public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
      throws IOException {
    return dfs.getSnapshottableDirListing();
  }
-
  /** Deletes the snapshot named {@code snapshotName} of {@code snapshotDir}. */
  @Override
  public void deleteSnapshot(final Path snapshotDir, final String snapshotName)
      throws IOException {
    Path absF = fixRelativePart(snapshotDir);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.deleteSnapshot(getPathName(p), snapshotName);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
          myDfs.deleteSnapshot(p, snapshotName);
        } else {
          throw new UnsupportedOperationException("Cannot perform snapshot"
              + " operations on a symlink to a non-DistributedFileSystem: "
              + snapshotDir + " -> " + p);
        }
        return null;
      }
    }.resolve(this, absF);
  }
-
- /**
- * Get the difference between two snapshots, or between a snapshot and the
- * current tree of a directory.
- *
- * @see DFSClient#getSnapshotDiffReport(String, String, String)
- */
- public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir,
- final String fromSnapshot, final String toSnapshot) throws IOException {
- Path absF = fixRelativePart(snapshotDir);
- return new FileSystemLinkResolver<SnapshotDiffReport>() {
- @Override
- public SnapshotDiffReport doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return dfs.getSnapshotDiffReport(getPathName(p), fromSnapshot,
- toSnapshot);
- }
-
- @Override
- public SnapshotDiffReport next(final FileSystem fs, final Path p)
- throws IOException {
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem myDfs = (DistributedFileSystem)fs;
- myDfs.getSnapshotDiffReport(p, fromSnapshot, toSnapshot);
- } else {
- throw new UnsupportedOperationException("Cannot perform snapshot"
- + " operations on a symlink to a non-DistributedFileSystem: "
- + snapshotDir + " -> " + p);
- }
- return null;
- }
- }.resolve(this, absF);
- }
-
  /**
   * Get the close status of a file
   * @param src The path to the file
   *
   * @return return true if file is closed
   * @throws FileNotFoundException if the file does not exist.
   * @throws IOException If an I/O error occurred
   */
  public boolean isFileClosed(final Path src) throws IOException {
    Path absF = fixRelativePart(src);
    return new FileSystemLinkResolver<Boolean>() {
      @Override
      public Boolean doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        return dfs.isFileClosed(getPathName(p));
      }

      @Override
      public Boolean next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
          return myDfs.isFileClosed(p);
        } else {
          // isFileClosed is HDFS-specific; no generic FileSystem fallback.
          throw new UnsupportedOperationException("Cannot call isFileClosed"
              + " on a symlink to a non-DistributedFileSystem: "
              + src + " -> " + p);
        }
      }
    }.resolve(this, absF);
  }
-
  /**
   * Add a new CacheDirective with no flags.
   *
   * @see #addCacheDirective(CacheDirectiveInfo, EnumSet)
   */
  public long addCacheDirective(CacheDirectiveInfo info) throws IOException {
    return addCacheDirective(info, EnumSet.noneOf(CacheFlag.class));
  }
-
  /**
   * Add a new CacheDirective.
   *
   * @param info Information about a directive to add.
   * @param flags {@link CacheFlag}s to use for this operation.
   * @return the ID of the directive that was created.
   * @throws IOException if the directive could not be added
   */
  public long addCacheDirective(
      CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
    Preconditions.checkNotNull(info.getPath());
    // Absolutize and fully qualify the directive's path before sending.
    Path path = new Path(getPathName(fixRelativePart(info.getPath()))).
        makeQualified(getUri(), getWorkingDirectory());
    return dfs.addCacheDirective(
        new CacheDirectiveInfo.Builder(info).
            setPath(path).
            build(),
        flags);
  }
-
  /**
   * Modify a CacheDirective with no flags.
   *
   * @see #modifyCacheDirective(CacheDirectiveInfo, EnumSet)
   */
  public void modifyCacheDirective(CacheDirectiveInfo info) throws IOException {
    modifyCacheDirective(info, EnumSet.noneOf(CacheFlag.class));
  }
-
  /**
   * Modify a CacheDirective.
   *
   * @param info Information about the directive to modify. You must set the ID
   * to indicate which CacheDirective you want to modify.
   * @param flags {@link CacheFlag}s to use for this operation.
   * @throws IOException if the directive could not be modified
   */
  public void modifyCacheDirective(
      CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
    // Path is optional on modify; qualify it only if present.
    if (info.getPath() != null) {
      info = new CacheDirectiveInfo.Builder(info).
          setPath(new Path(getPathName(fixRelativePart(info.getPath()))).
              makeQualified(getUri(), getWorkingDirectory())).build();
    }
    dfs.modifyCacheDirective(info, flags);
  }
-
  /**
   * Remove a CacheDirectiveInfo.
   *
   * @param id identifier of the CacheDirectiveInfo to remove
   * @throws IOException if the directive could not be removed
   */
  public void removeCacheDirective(long id)
      throws IOException {
    dfs.removeCacheDirective(id);
  }
-
  /**
   * List cache directives. Incrementally fetches results from the server.
   *
   * @param filter Filter parameters to use when listing the directives, null to
   * list all directives visible to us.
   * @return A RemoteIterator which returns CacheDirectiveInfo objects.
   */
  public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
      CacheDirectiveInfo filter) throws IOException {
    if (filter == null) {
      // null means "no filter": match everything.
      filter = new CacheDirectiveInfo.Builder().build();
    }
    if (filter.getPath() != null) {
      // Normalize the filter's path to an absolute path string.
      filter = new CacheDirectiveInfo.Builder(filter).
          setPath(new Path(getPathName(fixRelativePart(filter.getPath())))).
          build();
    }
    final RemoteIterator<CacheDirectiveEntry> iter =
        dfs.listCacheDirectives(filter);
    // Wrap the client iterator so returned paths are qualified with this
    // file system's scheme and authority.
    return new RemoteIterator<CacheDirectiveEntry>() {
      @Override
      public boolean hasNext() throws IOException {
        return iter.hasNext();
      }

      @Override
      public CacheDirectiveEntry next() throws IOException {
        // Although the paths we get back from the NameNode should always be
        // absolute, we call makeQualified to add the scheme and authority of
        // this DistributedFilesystem.
        CacheDirectiveEntry desc = iter.next();
        CacheDirectiveInfo info = desc.getInfo();
        Path p = info.getPath().makeQualified(getUri(), getWorkingDirectory());
        return new CacheDirectiveEntry(
            new CacheDirectiveInfo.Builder(info).setPath(p).build(),
            desc.getStats());
      }
    };
  }
-
  /**
   * Add a cache pool.
   *
   * @param info
   * The request to add a cache pool.
   * @throws IOException
   * If the request could not be completed.
   */
  public void addCachePool(CachePoolInfo info) throws IOException {
    // Client-side validation before the RPC.
    CachePoolInfo.validate(info);
    dfs.addCachePool(info);
  }
-
  /**
   * Modify an existing cache pool.
   *
   * @param info
   * The request to modify a cache pool.
   * @throws IOException
   * If the request could not be completed.
   */
  public void modifyCachePool(CachePoolInfo info) throws IOException {
    // Client-side validation before the RPC.
    CachePoolInfo.validate(info);
    dfs.modifyCachePool(info);
  }
-
  /**
   * Remove a cache pool.
   *
   * @param poolName
   * Name of the cache pool to remove.
   * @throws IOException
   * if the cache pool did not exist, or could not be removed.
   */
  public void removeCachePool(String poolName) throws IOException {
    // Client-side name validation before the RPC.
    CachePoolInfo.validateName(poolName);
    dfs.removeCachePool(poolName);
  }
-
  /**
   * List all cache pools.
   *
   * @return A remote iterator from which you can get CachePoolEntry objects.
   * Requests will be made as needed.
   * @throws IOException
   * If there was an error listing cache pools.
   */
  public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
    return dfs.listCachePools();
  }
-
  /**
   * {@inheritDoc}
   */
  @Override
  public void modifyAclEntries(Path path, final List<AclEntry> aclSpec)
      throws IOException {
    Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p) throws IOException {
        dfs.modifyAclEntries(getPathName(p), aclSpec);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p) throws IOException {
        // Symlink crossed into another file system; delegate to it.
        fs.modifyAclEntries(p, aclSpec);
        return null;
      }
    }.resolve(this, absF);
  }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void removeAclEntries(Path path, final List<AclEntry> aclSpec)
- throws IOException {
- Path absF = fixRelativePart(path);
- new FileSystemLinkResolver<Void>() {
- @Override
- public Void doCall(final Path p) throws IOException {
- dfs.removeAclEntries(getPathName(p), aclSpec);
- return null;
- }
-
- @Override
- public Void next(final FileSystem fs, final Path p) throws IOException {
- fs.removeAclEntries(p, aclSpec);
- return null;
- }
- }.resolve(this, absF);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void removeDefaultAcl(Path path) throws IOException {
- final Path absF = fixRelativePart(path);
- new FileSystemLinkResolver<Void>() {
- @Override
- public Void doCall(final Path p) throws IOException {
- dfs.removeDefaultAcl(getPathName(p));
- return null;
- }
- @Override
- public Void next(final FileSystem fs, final Path p)
- throws IOException, UnresolvedLinkException {
- fs.removeDefaultAcl(p);
- return null;
- }
- }.resolve(this, absF);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void removeAcl(Path path) throws IOException {
- final Path absF = fixRelativePart(path);
- new FileSystemLinkResolver<Void>() {
- @Override
- public Void doCall(final Path p) throws IOException {
- dfs.removeAcl(getPathName(p));
- return null;
- }
- @Override
- public Void next(final FileSystem fs, final Path p)
- throws IOException, UnresolvedLinkException {
- fs.removeAcl(p);
- return null;
- }
- }.resolve(this, absF);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void setAcl(Path path, final List<AclEntry> aclSpec) throws IOException {
- Path absF = fixRelativePart(path);
- new FileSystemLinkResolver<Void>() {
- @Override
- public Void doCall(final Path p) throws IOException {
- dfs.setAcl(getPathName(p), aclSpec);
- return null;
- }
-
- @Override
- public Void next(final FileSystem fs, final Path p) throws IOException {
- fs.setAcl(p, aclSpec);
- return null;
- }
- }.resolve(this, absF);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public AclStatus getAclStatus(Path path) throws IOException {
- final Path absF = fixRelativePart(path);
- return new FileSystemLinkResolver<AclStatus>() {
- @Override
- public AclStatus doCall(final Path p) throws IOException {
- return dfs.getAclStatus(getPathName(p));
- }
- @Override
- public AclStatus next(final FileSystem fs, final Path p)
- throws IOException, UnresolvedLinkException {
- return fs.getAclStatus(p);
- }
- }.resolve(this, absF);
- }
-
- /* HDFS only */
- public void createEncryptionZone(final Path path, final String keyName)
- throws IOException {
- Path absF = fixRelativePart(path);
- new FileSystemLinkResolver<Void>() {
- @Override
- public Void doCall(final Path p) throws IOException,
- UnresolvedLinkException {
- dfs.createEncryptionZone(getPathName(p), keyName);
- return null;
- }
-
- @Override
- public Void next(final FileSystem fs, final Path p) throws IOException {
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem myDfs = (DistributedFileSystem) fs;
- myDfs.createEncryptionZone(p, keyName);
- return null;
- } else {
- throw new UnsupportedOperationException(
- "Cannot call createEncryptionZone"
- + " on a symlink to a non-DistributedFileSystem: " + path
- + " -> " + p);
- }
- }
- }.resolve(this, absF);
- }
-
- /* HDFS only */
- public EncryptionZone getEZForPath(final Path path)
- throws IOException {
- Preconditions.checkNotNull(path);
- Path absF = fixRelativePart(path);
- return new FileSystemLinkResolver<EncryptionZone>() {
- @Override
- public EncryptionZone doCall(final Path p) throws IOException,
- UnresolvedLinkException {
- return dfs.getEZForPath(getPathName(p));
- }
-
- @Override
- public EncryptionZone next(final FileSystem fs, final Path p)
- throws IOException {
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem myDfs = (DistributedFileSystem) fs;
- return myDfs.getEZForPath(p);
- } else {
- throw new UnsupportedOperationException(
- "Cannot call getEZForPath"
- + " on a symlink to a non-DistributedFileSystem: " + path
- + " -> " + p);
- }
- }
- }.resolve(this, absF);
- }
-
- /* HDFS only */
- public RemoteIterator<EncryptionZone> listEncryptionZones()
- throws IOException {
- return dfs.listEncryptionZones();
- }
-
- @Override
- public void setXAttr(Path path, final String name, final byte[] value,
- final EnumSet<XAttrSetFlag> flag) throws IOException {
- Path absF = fixRelativePart(path);
- new FileSystemLinkResolver<Void>() {
-
- @Override
- public Void doCall(final Path p) throws IOException {
- dfs.setXAttr(getPathName(p), name, value, flag);
- return null;
- }
-
- @Override
- public Void next(final FileSystem fs, final Path p) throws IOException {
- fs.setXAttr(p, name, value, flag);
- return null;
- }
- }.resolve(this, absF);
- }
-
- @Override
- public byte[] getXAttr(Path path, final String name) throws IOException {
- final Path absF = fixRelativePart(path);
- return new FileSystemLinkResolver<byte[]>() {
- @Override
- public byte[] doCall(final Path p) throws IOException {
- return dfs.getXAttr(getPathName(p), name);
- }
- @Override
- public byte[] next(final FileSystem fs, final Path p)
- throws IOException, UnresolvedLinkException {
- return fs.getXAttr(p, name);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public Map<String, byte[]> getXAttrs(Path path) throws IOException {
- final Path absF = fixRelativePart(path);
- return new FileSystemLinkResolver<Map<String, byte[]>>() {
- @Override
- public Map<String, byte[]> doCall(final Path p) throws IOException {
- return dfs.getXAttrs(getPathName(p));
- }
- @Override
- public Map<String, byte[]> next(final FileSystem fs, final Path p)
- throws IOException, UnresolvedLinkException {
- return fs.getXAttrs(p);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public Map<String, byte[]> getXAttrs(Path path, final List<String> names)
- throws IOException {
- final Path absF = fixRelativePart(path);
- return new FileSystemLinkResolver<Map<String, byte[]>>() {
- @Override
- public Map<String, byte[]> doCall(final Path p) throws IOException {
- return dfs.getXAttrs(getPathName(p), names);
- }
- @Override
- public Map<String, byte[]> next(final FileSystem fs, final Path p)
- throws IOException, UnresolvedLinkException {
- return fs.getXAttrs(p, names);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public List<String> listXAttrs(Path path)
- throws IOException {
- final Path absF = fixRelativePart(path);
- return new FileSystemLinkResolver<List<String>>() {
- @Override
- public List<String> doCall(final Path p) throws IOException {
- return dfs.listXAttrs(getPathName(p));
- }
- @Override
- public List<String> next(final FileSystem fs, final Path p)
- throws IOException, UnresolvedLinkException {
- return fs.listXAttrs(p);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public void removeXAttr(Path path, final String name) throws IOException {
- Path absF = fixRelativePart(path);
- new FileSystemLinkResolver<Void>() {
- @Override
- public Void doCall(final Path p) throws IOException {
- dfs.removeXAttr(getPathName(p), name);
- return null;
- }
-
- @Override
- public Void next(final FileSystem fs, final Path p) throws IOException {
- fs.removeXAttr(p, name);
- return null;
- }
- }.resolve(this, absF);
- }
-
- @Override
- public void access(Path path, final FsAction mode) throws IOException {
- final Path absF = fixRelativePart(path);
- new FileSystemLinkResolver<Void>() {
- @Override
- public Void doCall(final Path p) throws IOException {
- dfs.checkAccess(getPathName(p), mode);
- return null;
- }
-
- @Override
- public Void next(final FileSystem fs, final Path p)
- throws IOException {
- fs.access(p, mode);
- return null;
- }
- }.resolve(this, absF);
- }
-
- @Override
- public Token<?>[] addDelegationTokens(
- final String renewer, Credentials credentials) throws IOException {
- Token<?>[] tokens = super.addDelegationTokens(renewer, credentials);
- if (dfs.isHDFSEncryptionEnabled()) {
- KeyProviderDelegationTokenExtension keyProviderDelegationTokenExtension =
- KeyProviderDelegationTokenExtension.
- createKeyProviderDelegationTokenExtension(dfs.getKeyProvider());
- Token<?>[] kpTokens = keyProviderDelegationTokenExtension.
- addDelegationTokens(renewer, credentials);
- if (tokens != null && kpTokens != null) {
- Token<?>[] all = new Token<?>[tokens.length + kpTokens.length];
- System.arraycopy(tokens, 0, all, 0, tokens.length);
- System.arraycopy(kpTokens, 0, all, tokens.length, kpTokens.length);
- tokens = all;
- } else {
- tokens = (tokens != null) ? tokens : kpTokens;
- }
- }
- return tokens;
- }
-
- public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
- return dfs.getInotifyEventStream();
- }
-
- public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
- throws IOException {
- return dfs.getInotifyEventStream(lastReadTxid);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/504d6fd9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
deleted file mode 100644
index 77bed1a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.client.impl;
-
-import java.io.IOException;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-
-/**
- * Provides an iterator interface for listCorruptFileBlocks.
- * This class is used by DistributedFileSystem and Hdfs.
- */
-@InterfaceAudience.Private
-public class CorruptFileBlockIterator implements RemoteIterator<Path> {
- private final DFSClient dfs;
- private final String path;
-
- private String[] files = null;
- private int fileIdx = 0;
- private String cookie = null;
- private Path nextPath = null;
-
- private int callsMade = 0;
-
- public CorruptFileBlockIterator(DFSClient dfs, Path path) throws IOException {
- this.dfs = dfs;
- this.path = path2String(path);
- loadNext();
- }
-
- /**
- * @return the number of calls made to the DFSClient.
- * This is for debugging and testing purposes.
- */
- public int getCallsMade() {
- return callsMade;
- }
-
- private String path2String(Path path) {
- return path.toUri().getPath();
- }
-
- private Path string2Path(String string) {
- return new Path(string);
- }
-
- private void loadNext() throws IOException {
- if (files == null || fileIdx >= files.length) {
- CorruptFileBlocks cfb = dfs.listCorruptFileBlocks(path, cookie);
- files = cfb.getFiles();
- cookie = cfb.getCookie();
- fileIdx = 0;
- callsMade++;
- }
-
- if (fileIdx >= files.length) {
- // received an empty response
- // there are no more corrupt file blocks
- nextPath = null;
- } else {
- nextPath = string2Path(files[fileIdx]);
- fileIdx++;
- }
- }
-
-
- @Override
- public boolean hasNext() {
- return nextPath != null;
- }
-
-
- @Override
- public Path next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException("No more corrupt file blocks");
- }
-
- Path result = nextPath;
- loadNext();
-
- return result;
- }
-}
\ No newline at end of file
[2/2] hadoop git commit: HDFS-8740. Move DistributedFileSystem to
hadoop-hdfs-client. Contributed by Mingliang Liu.
Posted by wh...@apache.org.
HDFS-8740. Move DistributedFileSystem to hadoop-hdfs-client. Contributed by Mingliang Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/504d6fd9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/504d6fd9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/504d6fd9
Branch: refs/heads/branch-2
Commit: 504d6fd950f6c12a661dc52d5f4031258ba4d0ae
Parents: 1c7b3b9
Author: Haohui Mai <wh...@apache.org>
Authored: Sun Sep 27 10:54:44 2015 -0700
Committer: Haohui Mai <wh...@apache.org>
Committed: Sun Sep 27 10:54:53 2015 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/DistributedFileSystem.java | 2331 +++++++++++++++++
.../client/impl/CorruptFileBlockIterator.java | 105 +
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/DistributedFileSystem.java | 2333 ------------------
.../client/impl/CorruptFileBlockIterator.java | 105 -
5 files changed, 2439 insertions(+), 2438 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/504d6fd9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
new file mode 100644
index 0000000..182fc01
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -0,0 +1,2331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.BlockStorageLocation;
+import org.apache.hadoop.fs.BlockStoragePolicySpi;
+import org.apache.hadoop.fs.CacheFlag;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSLinkResolver;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemLinkResolver;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.FsStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.VolumeId;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+
+/****************************************************************
+ * Implementation of the abstract FileSystem for the DFS system.
+ * This object is the way end-user code interacts with a Hadoop
+ * DistributedFileSystem.
+ *
+ *****************************************************************/
+@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
+@InterfaceStability.Unstable
+public class DistributedFileSystem extends FileSystem {
+ private Path workingDir;
+ private URI uri;
+ private String homeDirPrefix =
+ HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT;
+
+ DFSClient dfs;
+ private boolean verifyChecksum = true;
+
+ static{
+ HdfsConfigurationLoader.init();
+ }
+
+ public DistributedFileSystem() {
+ }
+
+ /**
+ * Return the protocol scheme for the FileSystem.
+ * <p/>
+ *
+ * @return <code>hdfs</code>
+ */
+ @Override
+ public String getScheme() {
+ return HdfsConstants.HDFS_URI_SCHEME;
+ }
+
+ @Override
+ public URI getUri() { return uri; }
+
+ @Override
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ super.initialize(uri, conf);
+ setConf(conf);
+
+ String host = uri.getHost();
+ if (host == null) {
+ throw new IOException("Incomplete HDFS URI, no host: "+ uri);
+ }
+ homeDirPrefix = conf.get(
+ HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
+ HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);
+
+ this.dfs = new DFSClient(uri, conf, statistics);
+ this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
+ this.workingDir = getHomeDirectory();
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return workingDir;
+ }
+
+ @Override
+ public long getDefaultBlockSize() {
+ return dfs.getConf().getDefaultBlockSize();
+ }
+
+ @Override
+ public short getDefaultReplication() {
+ return dfs.getConf().getDefaultReplication();
+ }
+
+ @Override
+ public void setWorkingDirectory(Path dir) {
+ String result = fixRelativePart(dir).toUri().getPath();
+ if (!DFSUtilClient.isValidName(result)) {
+ throw new IllegalArgumentException("Invalid DFS directory name " +
+ result);
+ }
+ workingDir = fixRelativePart(dir);
+ }
+
+ @Override
+ public Path getHomeDirectory() {
+ return makeQualified(new Path(homeDirPrefix + "/"
+ + dfs.ugi.getShortUserName()));
+ }
+
+ /**
+ * Checks that the passed URI belongs to this filesystem and returns
+ * just the path component. Expects a URI with an absolute path.
+ *
+ * @param file URI with absolute path
+ * @return path component of {file}
+ * @throws IllegalArgumentException if URI does not belong to this DFS
+ */
+ private String getPathName(Path file) {
+ checkPath(file);
+ String result = file.toUri().getPath();
+ if (!DFSUtilClient.isValidName(result)) {
+ throw new IllegalArgumentException("Pathname " + result + " from " +
+ file+" is not a valid DFS filename.");
+ }
+ return result;
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
+ long len) throws IOException {
+ if (file == null) {
+ return null;
+ }
+ return getFileBlockLocations(file.getPath(), start, len);
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(Path p,
+ final long start, final long len) throws IOException {
+ statistics.incrementReadOps(1);
+ final Path absF = fixRelativePart(p);
+ return new FileSystemLinkResolver<BlockLocation[]>() {
+ @Override
+ public BlockLocation[] doCall(final Path p) throws IOException {
+ return dfs.getBlockLocations(getPathName(p), start, len);
+ }
+ @Override
+ public BlockLocation[] next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.getFileBlockLocations(p, start, len);
+ }
+ }.resolve(this, absF);
+ }
+
+ /**
+ * This API has been deprecated since the NameNode now tracks datanode
+ * storages separately. Storage IDs can be gotten from {@link
+ * BlockLocation#getStorageIds()}, which are functionally equivalent to
+ * the volume IDs returned here (although a String rather than a byte[]).
+ *
+ * Used to query storage location information for a list of blocks. This list
+ * of blocks is normally constructed via a series of calls to
+ * {@link DistributedFileSystem#getFileBlockLocations(Path, long, long)} to
+ * get the blocks for ranges of a file.
+ *
+ * The returned array of {@link BlockStorageLocation} augments
+ * {@link BlockLocation} with a {@link VolumeId} per block replica. The
+ * VolumeId specifies the volume on the datanode on which the replica resides.
+ * The VolumeId associated with a replica may be null because volume
+ * information can be unavailable if the corresponding datanode is down or
+ * if the requested block is not found.
+ *
+ * This API is unstable, and datanode-side support is disabled by default. It
+ * can be enabled by setting "dfs.datanode.hdfs-blocks-metadata.enabled" to
+ * true.
+ *
+ * @param blocks
+ * List of target BlockLocations to query volume location information
+ * @return volumeBlockLocations Augmented array of
+ * {@link BlockStorageLocation}s containing additional volume location
+ * information for each replica of each block.
+ */
+ @InterfaceStability.Unstable
+ @Deprecated
+ public BlockStorageLocation[] getFileBlockStorageLocations(
+ List<BlockLocation> blocks) throws IOException,
+ UnsupportedOperationException, InvalidBlockTokenException {
+ return dfs.getBlockStorageLocations(blocks);
+ }
+
+ @Override
+ public void setVerifyChecksum(boolean verifyChecksum) {
+ this.verifyChecksum = verifyChecksum;
+ }
+
+ /**
+ * Start the lease recovery of a file
+ *
+ * @param f a file
+ * @return true if the file is already closed
+ * @throws IOException if an error occurs
+ */
+ public boolean recoverLease(final Path f) throws IOException {
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<Boolean>() {
+ @Override
+ public Boolean doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return dfs.recoverLease(getPathName(p));
+ }
+ @Override
+ public Boolean next(final FileSystem fs, final Path p)
+ throws IOException {
+ if (fs instanceof DistributedFileSystem) {
+ DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+ return myDfs.recoverLease(p);
+ }
+ throw new UnsupportedOperationException("Cannot recoverLease through" +
+ " a symlink to a non-DistributedFileSystem: " + f + " -> " + p);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public FSDataInputStream open(Path f, final int bufferSize)
+ throws IOException {
+ statistics.incrementReadOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<FSDataInputStream>() {
+ @Override
+ public FSDataInputStream doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ final DFSInputStream dfsis =
+ dfs.open(getPathName(p), bufferSize, verifyChecksum);
+ return dfs.createWrappedInputStream(dfsis);
+ }
+ @Override
+ public FSDataInputStream next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.open(p, bufferSize);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, final int bufferSize,
+ final Progressable progress) throws IOException {
+ return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress);
+ }
+
+ /**
+ * Append to an existing file (optional operation).
+ *
+ * @param f the existing file to be appended.
+ * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory
+ * to be present.
+ * @param bufferSize the size of the buffer to be used.
+ * @param progress for reporting progress if it is not null.
+ * @return Returns instance of {@link FSDataOutputStream}
+ * @throws IOException
+ */
+ public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
+ final int bufferSize, final Progressable progress) throws IOException {
+ statistics.incrementWriteOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<FSDataOutputStream>() {
+ @Override
+ public FSDataOutputStream doCall(final Path p)
+ throws IOException {
+ return dfs.append(getPathName(p), bufferSize, flag, progress,
+ statistics);
+ }
+ @Override
+ public FSDataOutputStream next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.append(p, bufferSize);
+ }
+ }.resolve(this, absF);
+ }
+
+ /**
+ * Append to an existing file (optional operation).
+ *
+ * @param f the existing file to be appended.
+ * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory
+ * to be present.
+ * @param bufferSize the size of the buffer to be used.
+ * @param progress for reporting progress if it is not null.
+ * @param favoredNodes Favored nodes for new blocks
+ * @return Returns instance of {@link FSDataOutputStream}
+ * @throws IOException
+ */
+ public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
+ final int bufferSize, final Progressable progress,
+ final InetSocketAddress[] favoredNodes) throws IOException {
+ statistics.incrementWriteOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<FSDataOutputStream>() {
+ @Override
+ public FSDataOutputStream doCall(final Path p)
+ throws IOException {
+ return dfs.append(getPathName(p), bufferSize, flag, progress,
+ statistics, favoredNodes);
+ }
+ @Override
+ public FSDataOutputStream next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.append(p, bufferSize);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission,
+ boolean overwrite, int bufferSize, short replication, long blockSize,
+ Progressable progress) throws IOException {
+ return this.create(f, permission,
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
+ blockSize, progress, null);
+ }
+
+ /**
+ * Same as
+ * {@link #create(Path, FsPermission, boolean, int, short, long,
+ * Progressable)} with the addition of favoredNodes that is a hint to
+ * where the namenode should place the file blocks.
+ * The favored nodes hint is not persisted in HDFS. Hence it may be honored
+ * at the creation time only. And with favored nodes, blocks will be pinned
+ * on the datanodes to prevent balancing move the block. HDFS could move the
+ * blocks during replication, to move the blocks from favored nodes. A value
+ * of null means no favored nodes for this create
+ */
+ public HdfsDataOutputStream create(final Path f,
+ final FsPermission permission, final boolean overwrite,
+ final int bufferSize, final short replication, final long blockSize,
+ final Progressable progress, final InetSocketAddress[] favoredNodes)
+ throws IOException {
+ statistics.incrementWriteOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<HdfsDataOutputStream>() {
+ @Override
+ public HdfsDataOutputStream doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ final DFSOutputStream out = dfs.create(getPathName(f), permission,
+ overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+ : EnumSet.of(CreateFlag.CREATE),
+ true, replication, blockSize, progress, bufferSize, null,
+ favoredNodes);
+ return dfs.createWrappedOutputStream(out, statistics);
+ }
+ @Override
+ public HdfsDataOutputStream next(final FileSystem fs, final Path p)
+ throws IOException {
+ if (fs instanceof DistributedFileSystem) {
+ DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+ return myDfs.create(p, permission, overwrite, bufferSize, replication,
+ blockSize, progress, favoredNodes);
+ }
+ throw new UnsupportedOperationException("Cannot create with" +
+ " favoredNodes through a symlink to a non-DistributedFileSystem: "
+ + f + " -> " + p);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public FSDataOutputStream create(final Path f, final FsPermission permission,
+ final EnumSet<CreateFlag> cflags, final int bufferSize,
+ final short replication, final long blockSize, final Progressable progress,
+ final ChecksumOpt checksumOpt) throws IOException {
+ statistics.incrementWriteOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<FSDataOutputStream>() {
+ @Override
+ public FSDataOutputStream doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
+ cflags, replication, blockSize, progress, bufferSize,
+ checksumOpt);
+ return dfs.createWrappedOutputStream(dfsos, statistics);
+ }
+ @Override
+ public FSDataOutputStream next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.create(p, permission, cflags, bufferSize,
+ replication, blockSize, progress, checksumOpt);
+ }
+ }.resolve(this, absF);
+ }
+
  /**
   * Low-level create: takes an already-absolute permission (no umask
   * application here) and performs no symlink resolution.
   */
  @Override
  protected HdfsDataOutputStream primitiveCreate(Path f,
    FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
    short replication, long blockSize, Progressable progress,
    ChecksumOpt checksumOpt) throws IOException {
    statistics.incrementWriteOps(1);
    final DFSOutputStream dfsos = dfs.primitiveCreate(
      getPathName(fixRelativePart(f)),
      absolutePermission, flag, true, replication, blockSize,
      progress, bufferSize, checksumOpt);
    return dfs.createWrappedOutputStream(dfsos, statistics);
  }
+
+ /**
+ * Same as create(), except fails if parent directory doesn't already exist.
+ */
+ @Override
+ @SuppressWarnings("deprecation")
+ public FSDataOutputStream createNonRecursive(final Path f,
+ final FsPermission permission, final EnumSet<CreateFlag> flag,
+ final int bufferSize, final short replication, final long blockSize,
+ final Progressable progress) throws IOException {
+ statistics.incrementWriteOps(1);
+ if (flag.contains(CreateFlag.OVERWRITE)) {
+ flag.add(CreateFlag.CREATE);
+ }
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<FSDataOutputStream>() {
+ @Override
+ public FSDataOutputStream doCall(final Path p) throws IOException,
+ UnresolvedLinkException {
+ final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
+ flag, false, replication, blockSize, progress, bufferSize, null);
+ return dfs.createWrappedOutputStream(dfsos, statistics);
+ }
+
+ @Override
+ public FSDataOutputStream next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.createNonRecursive(p, permission, flag, bufferSize,
+ replication, blockSize, progress);
+ }
+ }.resolve(this, absF);
+ }
+
  /**
   * Set the replication factor for an existing file.
   * @return true if the replication was successfully changed.
   */
  @Override
  public boolean setReplication(Path src,
    final short replication
  ) throws IOException {
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(src);
    return new FileSystemLinkResolver<Boolean>() {
      @Override
      public Boolean doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        return dfs.setReplication(getPathName(p), replication);
      }
      @Override
      public Boolean next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.setReplication(p, replication);
      }
    }.resolve(this, absF);
  }
+
  /**
   * Set the source path to the specified storage policy.
   *
   * @param src The source path referring to either a directory or a file.
   * @param policyName The name of the storage policy.
   * @throws IOException on failure to communicate with the NameNode.
   */
  @Override
  public void setStoragePolicy(final Path src, final String policyName)
      throws IOException {
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(src);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.setStoragePolicy(getPathName(p), policyName);
        return null;
      }
      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        fs.setStoragePolicy(p, policyName);
        return null;
      }
    }.resolve(this, absF);
  }
+
  /** Get the storage policy in effect for the given path. */
  @Override
  public BlockStoragePolicySpi getStoragePolicy(Path path) throws IOException {
    statistics.incrementReadOps(1);
    Path absF = fixRelativePart(path);

    return new FileSystemLinkResolver<BlockStoragePolicySpi>() {
      @Override
      public BlockStoragePolicySpi doCall(final Path p) throws IOException {
        return getClient().getStoragePolicy(getPathName(p));
      }

      @Override
      public BlockStoragePolicySpi next(final FileSystem fs, final Path p)
          throws IOException, UnresolvedLinkException {
        return fs.getStoragePolicy(p);
      }
    }.resolve(this, absF);
  }
+
+ @Override
+ public Collection<BlockStoragePolicy> getAllStoragePolicies()
+ throws IOException {
+ return Arrays.asList(dfs.getStoragePolicies());
+ }
+
  /**
   * Deprecated. Prefer {@link FileSystem#getAllStoragePolicies()}
   * @return all storage policies known to the NameNode, as an array.
   * @throws IOException on failure to communicate with the NameNode.
   */
  @Deprecated
  public BlockStoragePolicy[] getStoragePolicies() throws IOException {
    statistics.incrementReadOps(1);
    return dfs.getStoragePolicies();
  }
+
  /**
   * Move blocks from srcs to trg and delete srcs afterwards.
   * The file block sizes must be the same.
   *
   * Concat is attempted once without resolving symlinks; if that fails with
   * UnresolvedLinkException, every path is fully resolved and the call is
   * retried (which can still race with a concurrently created symlink).
   *
   * @param trg existing file to append to
   * @param psrcs list of files (same block size, same replication)
   * @throws IOException if any path is a symlink or the concat fails
   */
  @Override
  public void concat(Path trg, Path [] psrcs) throws IOException {
    statistics.incrementWriteOps(1);
    // Make target absolute
    Path absF = fixRelativePart(trg);
    // Make all srcs absolute
    Path[] srcs = new Path[psrcs.length];
    for (int i=0; i<psrcs.length; i++) {
      srcs[i] = fixRelativePart(psrcs[i]);
    }
    // Try the concat without resolving any links
    String[] srcsStr = new String[psrcs.length];
    try {
      for (int i=0; i<psrcs.length; i++) {
        srcsStr[i] = getPathName(srcs[i]);
      }
      dfs.concat(getPathName(trg), srcsStr);
    } catch (UnresolvedLinkException e) {
      // Exception could be from trg or any src.
      // Fully resolve trg and srcs. Fail if any of them are a symlink.
      FileStatus stat = getFileLinkStatus(absF);
      if (stat.isSymlink()) {
        throw new IOException("Cannot concat with a symlink target: "
            + trg + " -> " + stat.getPath());
      }
      absF = fixRelativePart(stat.getPath());
      for (int i=0; i<psrcs.length; i++) {
        stat = getFileLinkStatus(srcs[i]);
        if (stat.isSymlink()) {
          throw new IOException("Cannot concat with a symlink src: "
              + psrcs[i] + " -> " + stat.getPath());
        }
        srcs[i] = fixRelativePart(stat.getPath());
      }
      // Try concat again. Can still race with another symlink.
      for (int i=0; i<psrcs.length; i++) {
        srcsStr[i] = getPathName(srcs[i]);
      }
      dfs.concat(getPathName(absF), srcsStr);
    }
  }
+
+
  /**
   * Rename src to dst.  Tries the fast path first (no symlink resolution);
   * on UnresolvedLinkException the source is resolved once and the
   * destination is resolved iteratively.
   */
  @SuppressWarnings("deprecation")
  @Override
  public boolean rename(Path src, Path dst) throws IOException {
    statistics.incrementWriteOps(1);

    final Path absSrc = fixRelativePart(src);
    final Path absDst = fixRelativePart(dst);

    // Try the rename without resolving first
    try {
      return dfs.rename(getPathName(absSrc), getPathName(absDst));
    } catch (UnresolvedLinkException e) {
      // Fully resolve the source
      final Path source = getFileLinkStatus(absSrc).getPath();
      // Keep trying to resolve the destination
      return new FileSystemLinkResolver<Boolean>() {
        @Override
        public Boolean doCall(final Path p)
            throws IOException, UnresolvedLinkException {
          return dfs.rename(getPathName(source), getPathName(p));
        }
        @Override
        public Boolean next(final FileSystem fs, final Path p)
            throws IOException {
          // Should just throw an error in FileSystem#checkPath
          return doCall(p);
        }
      }.resolve(this, absDst);
    }
  }
+
  /**
   * This rename operation is guaranteed to be atomic.
   * Behaves like {@link #rename(Path, Path)} but honors the supplied
   * {@link Options.Rename} flags and reports failure by exception
   * rather than a boolean return.
   */
  @SuppressWarnings("deprecation")
  @Override
  public void rename(Path src, Path dst, final Options.Rename... options)
      throws IOException {
    statistics.incrementWriteOps(1);
    final Path absSrc = fixRelativePart(src);
    final Path absDst = fixRelativePart(dst);
    // Try the rename without resolving first
    try {
      dfs.rename(getPathName(absSrc), getPathName(absDst), options);
    } catch (UnresolvedLinkException e) {
      // Fully resolve the source
      final Path source = getFileLinkStatus(absSrc).getPath();
      // Keep trying to resolve the destination
      new FileSystemLinkResolver<Void>() {
        @Override
        public Void doCall(final Path p)
            throws IOException, UnresolvedLinkException {
          dfs.rename(getPathName(source), getPathName(p), options);
          return null;
        }
        @Override
        public Void next(final FileSystem fs, final Path p)
            throws IOException {
          // Should just throw an error in FileSystem#checkPath
          return doCall(p);
        }
      }.resolve(this, absDst);
    }
  }
+
  /**
   * Truncate the file at the given path to newLength bytes.
   * @return true if the truncate completed synchronously.
   */
  @Override
  public boolean truncate(Path f, final long newLength) throws IOException {
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<Boolean>() {
      @Override
      public Boolean doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        return dfs.truncate(getPathName(p), newLength);
      }
      @Override
      public Boolean next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.truncate(p, newLength);
      }
    }.resolve(this, absF);
  }
+
+ @Override
+ public boolean delete(Path f, final boolean recursive) throws IOException {
+ statistics.incrementWriteOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<Boolean>() {
+ @Override
+ public Boolean doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return dfs.delete(getPathName(p), recursive);
+ }
+ @Override
+ public Boolean next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.delete(p, recursive);
+ }
+ }.resolve(this, absF);
+ }
+
+ @Override
+ public ContentSummary getContentSummary(Path f) throws IOException {
+ statistics.incrementReadOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<ContentSummary>() {
+ @Override
+ public ContentSummary doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return dfs.getContentSummary(getPathName(p));
+ }
+ @Override
+ public ContentSummary next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.getContentSummary(p);
+ }
+ }.resolve(this, absF);
+ }
+
  /** Set a directory's quotas
   * NOTE(review): unlike most mutating operations in this class, this does
   * not call statistics.incrementWriteOps(1) — confirm whether intentional.
   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long, StorageType)
   */
  public void setQuota(Path src, final long namespaceQuota,
      final long storagespaceQuota) throws IOException {
    Path absF = fixRelativePart(src);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.setQuota(getPathName(p), namespaceQuota, storagespaceQuota);
        return null;
      }
      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        // setQuota is not defined in FileSystem, so we only can resolve
        // within this DFS
        return doCall(p);
      }
    }.resolve(this, absF);
  }
+
  /**
   * Set the per type storage quota of a directory.
   *
   * @param src target directory whose quota is to be modified.
   * @param type storage type of the specific storage type quota to be modified.
   * @param quota value of the specific storage type quota to be modified.
   * Maybe {@link HdfsConstants#QUOTA_RESET} to clear quota by storage type.
   */
  public void setQuotaByStorageType(
    Path src, final StorageType type, final long quota)
    throws IOException {
    Path absF = fixRelativePart(src);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.setQuotaByStorageType(getPathName(p), type, quota);
        return null;
      }
      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        // setQuotaByStorageType is not defined in FileSystem, so we only can resolve
        // within this DFS
        return doCall(p);
      }
    }.resolve(this, absF);
  }
+
  /**
   * Fetch all entries of a directory from the NameNode, paging through
   * partial listings when the directory is large.  Not atomic: entries may
   * be fetched in several RPCs, so concurrent modification can be observed.
   *
   * @throws FileNotFoundException if the directory disappears at any point.
   */
  private FileStatus[] listStatusInternal(Path p) throws IOException {
    String src = getPathName(p);

    // fetch the first batch of entries in the directory
    DirectoryListing thisListing = dfs.listPaths(
        src, HdfsFileStatus.EMPTY_NAME);

    if (thisListing == null) { // the directory does not exist
      throw new FileNotFoundException("File " + p + " does not exist.");
    }

    HdfsFileStatus[] partialListing = thisListing.getPartialListing();
    if (!thisListing.hasMore()) { // got all entries of the directory
      FileStatus[] stats = new FileStatus[partialListing.length];
      for (int i = 0; i < partialListing.length; i++) {
        stats[i] = partialListing[i].makeQualified(getUri(), p);
      }
      statistics.incrementReadOps(1);
      return stats;
    }

    // The directory size is too big that it needs to fetch more
    // estimate the total number of entries in the directory
    int totalNumEntries =
        partialListing.length + thisListing.getRemainingEntries();
    ArrayList<FileStatus> listing =
        new ArrayList<FileStatus>(totalNumEntries);
    // add the first batch of entries to the array list
    for (HdfsFileStatus fileStatus : partialListing) {
      listing.add(fileStatus.makeQualified(getUri(), p));
    }
    statistics.incrementLargeReadOps(1);

    // now fetch more entries
    do {
      // resume listing from the last name returned in the previous batch
      thisListing = dfs.listPaths(src, thisListing.getLastName());

      if (thisListing == null) { // the directory is deleted
        throw new FileNotFoundException("File " + p + " does not exist.");
      }

      partialListing = thisListing.getPartialListing();
      for (HdfsFileStatus fileStatus : partialListing) {
        listing.add(fileStatus.makeQualified(getUri(), p));
      }
      statistics.incrementLargeReadOps(1);
    } while (thisListing.hasMore());

    return listing.toArray(new FileStatus[listing.size()]);
  }
+
  /**
   * List all the entries of a directory
   *
   * Note that this operation is not atomic for a large directory.
   * The entries of a directory may be fetched from NameNode multiple times.
   * It only guarantees that each name occurs once if a directory
   * undergoes changes between the calls.
   */
  @Override
  public FileStatus[] listStatus(Path p) throws IOException {
    Path absF = fixRelativePart(p);
    return new FileSystemLinkResolver<FileStatus[]>() {
      @Override
      public FileStatus[] doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        return listStatusInternal(p);
      }
      @Override
      public FileStatus[] next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.listStatus(p);
      }
    }.resolve(this, absF);
  }
+
  /**
   * List entries (with block locations) of a directory matching the filter,
   * lazily via {@link DirListingIterator}.
   */
  @Override
  protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p,
      final PathFilter filter)
      throws IOException {
    Path absF = fixRelativePart(p);
    return new FileSystemLinkResolver<RemoteIterator<LocatedFileStatus>>() {
      @Override
      public RemoteIterator<LocatedFileStatus> doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        return new DirListingIterator<LocatedFileStatus>(p, filter, true);
      }

      @Override
      public RemoteIterator<LocatedFileStatus> next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          return ((DistributedFileSystem)fs).listLocatedStatus(p, filter);
        }
        // symlink resolution for this method does not work cross file systems
        // because it is a protected method.
        throw new IOException("Link resolution does not work with multiple " +
            "file systems for listLocatedStatus(): " + p);
      }
    }.resolve(this, absF);
  }
+
+
+ /**
+ * Returns a remote iterator so that followup calls are made on demand
+ * while consuming the entries. This reduces memory consumption during
+ * listing of a large directory.
+ *
+ * @param p target path
+ * @return remote iterator
+ */
+ @Override
+ public RemoteIterator<FileStatus> listStatusIterator(final Path p)
+ throws IOException {
+ Path absF = fixRelativePart(p);
+ return new FileSystemLinkResolver<RemoteIterator<FileStatus>>() {
+ @Override
+ public RemoteIterator<FileStatus> doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return new DirListingIterator<FileStatus>(p, false);
+ }
+
+ @Override
+ public RemoteIterator<FileStatus> next(final FileSystem fs, final Path p)
+ throws IOException {
+ return ((DistributedFileSystem)fs).listStatusIterator(p);
+ }
+ }.resolve(this, absF);
+
+ }
+
  /**
   * This class defines an iterator that returns
   * the file status of each file/subdirectory of a directory
   *
   * if needLocation, status contains block location if it is a file
   * throws a RuntimeException with the error as its cause.
   *
   * Listing batches are fetched from the NameNode on demand; {@code i}
   * indexes into the current batch and {@code curStat} buffers the next
   * filter-accepted element between hasNext() and next().
   *
   * @param <T> the type of the file status
   */
  private class DirListingIterator<T extends FileStatus>
  implements RemoteIterator<T> {
    private DirectoryListing thisListing;  // current batch from the NameNode
    private int i;                          // cursor within the current batch
    private Path p;
    private String src;
    private T curStat = null;               // look-ahead element, if any
    private PathFilter filter;
    private boolean needLocation;

    private DirListingIterator(Path p, PathFilter filter,
        boolean needLocation) throws IOException {
      this.p = p;
      this.src = getPathName(p);
      this.filter = filter;
      this.needLocation = needLocation;
      // fetch the first batch of entries in the directory
      thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
          needLocation);
      statistics.incrementReadOps(1);
      if (thisListing == null) { // the directory does not exist
        throw new FileNotFoundException("File " + p + " does not exist.");
      }
      i = 0;
    }

    private DirListingIterator(Path p, boolean needLocation)
        throws IOException {
      this(p, null, needLocation);
    }

    @Override
    @SuppressWarnings("unchecked")
    public boolean hasNext() throws IOException {
      // Advance past filtered-out entries until one is accepted or the
      // listing is exhausted.
      while (curStat == null && hasNextNoFilter()) {
        T next;
        HdfsFileStatus fileStat = thisListing.getPartialListing()[i++];
        if (needLocation) {
          next = (T)((HdfsLocatedFileStatus)fileStat)
              .makeQualifiedLocated(getUri(), p);
        } else {
          next = (T)fileStat.makeQualified(getUri(), p);
        }
        // apply filter if not null
        if (filter == null || filter.accept(next.getPath())) {
          curStat = next;
        }
      }
      return curStat != null;
    }

    /** Check if there is a next item before applying the given filter */
    private boolean hasNextNoFilter() throws IOException {
      if (thisListing == null) {
        return false;
      }
      if (i >= thisListing.getPartialListing().length
          && thisListing.hasMore()) {
        // current listing is exhausted & fetch a new listing
        thisListing = dfs.listPaths(src, thisListing.getLastName(),
            needLocation);
        statistics.incrementReadOps(1);
        if (thisListing == null) {
          return false;
        }
        i = 0;
      }
      return (i < thisListing.getPartialListing().length);
    }

    @Override
    public T next() throws IOException {
      if (hasNext()) {
        T tmp = curStat;
        curStat = null;
        return tmp;
      }
      throw new java.util.NoSuchElementException("No more entry in " + p);
    }
  }
+
  /**
   * Create a directory, only when the parent directories exist.
   *
   * See {@link FsPermission#applyUMask(FsPermission)} for details of how
   * the permission is applied.
   *
   * @param f The path to create
   * @param permission The permission. See FsPermission#applyUMask for
   * details about how this is used to calculate the
   * effective permission.
   * @return true if the directory was created.
   */
  public boolean mkdir(Path f, FsPermission permission) throws IOException {
    return mkdirsInternal(f, permission, false);
  }
+
  /**
   * Create a directory and its parent directories.
   *
   * See {@link FsPermission#applyUMask(FsPermission)} for details of how
   * the permission is applied.
   *
   * @param f The path to create
   * @param permission The permission. See FsPermission#applyUMask for
   * details about how this is used to calculate the
   * effective permission.
   * @return true if the directory (and any missing parents) was created.
   */
  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return mkdirsInternal(f, permission, true);
  }
+
+ private boolean mkdirsInternal(Path f, final FsPermission permission,
+ final boolean createParent) throws IOException {
+ statistics.incrementWriteOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<Boolean>() {
+ @Override
+ public Boolean doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return dfs.mkdirs(getPathName(p), permission, createParent);
+ }
+
+ @Override
+ public Boolean next(final FileSystem fs, final Path p)
+ throws IOException {
+ // FileSystem doesn't have a non-recursive mkdir() method
+ // Best we can do is error out
+ if (!createParent) {
+ throw new IOException("FileSystem does not support non-recursive"
+ + "mkdir");
+ }
+ return fs.mkdirs(p, permission);
+ }
+ }.resolve(this, absF);
+ }
+
  /**
   * Low-level mkdir: the permission is applied as-is (no umask) and
   * symlinks are not resolved.
   */
  @SuppressWarnings("deprecation")
  @Override
  protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
    throws IOException {
    statistics.incrementWriteOps(1);
    return dfs.primitiveMkdir(getPathName(f), absolutePermission);
  }
+
+
  /**
   * Close this file system.  Output streams are closed first (non-abortive),
   * then the FileSystem cache entry is released; the DFSClient is closed in
   * a finally block so it is shut down even if the earlier steps throw.
   */
  @Override
  public void close() throws IOException {
    try {
      dfs.closeOutputStreams(false);
      super.close();
    } finally {
      dfs.close();
    }
  }
+
+ @Override
+ public String toString() {
+ return "DFS[" + dfs + "]";
+ }
+
  /** @return the underlying {@link DFSClient}; exposed for tests. */
  @InterfaceAudience.Private
  @VisibleForTesting
  public DFSClient getClient() {
    return dfs;
  }
+
  /** @deprecated Use {@link org.apache.hadoop.fs.FsStatus} instead */
  @InterfaceAudience.Private
  @Deprecated
  public static class DiskStatus extends FsStatus {
    /** Copy-construct from a generic FsStatus. */
    public DiskStatus(FsStatus stats) {
      super(stats.getCapacity(), stats.getUsed(), stats.getRemaining());
    }

    public DiskStatus(long capacity, long dfsUsed, long remaining) {
      super(capacity, dfsUsed, remaining);
    }

    /** Legacy alias for {@link FsStatus#getUsed()}. */
    public long getDfsUsed() {
      return super.getUsed();
    }
  }
+
  /**
   * Returns cluster-wide disk usage; the path argument is ignored because
   * HDFS reports a single FsStatus for the whole file system.
   */
  @Override
  public FsStatus getStatus(Path p) throws IOException {
    statistics.incrementReadOps(1);
    return dfs.getDiskStatus();
  }
+
  /** Return the disk usage of the filesystem, including total capacity,
   * used space, and remaining space
   * @deprecated Use {@link org.apache.hadoop.fs.FileSystem#getStatus()}
   * instead */
  @Deprecated
  public DiskStatus getDiskStatus() throws IOException {
    return new DiskStatus(dfs.getDiskStatus());
  }
+
  /** Return the total raw capacity of the filesystem, disregarding
   * replication.
   * @deprecated Use {@link org.apache.hadoop.fs.FileSystem#getStatus()}
   * instead */
  @Deprecated
  public long getRawCapacity() throws IOException{
    return dfs.getDiskStatus().getCapacity();
  }
+
  /** Return the total raw used space in the filesystem, disregarding
   * replication.
   * @deprecated Use {@link org.apache.hadoop.fs.FileSystem#getStatus()}
   * instead */
  @Deprecated
  public long getRawUsed() throws IOException{
    return dfs.getDiskStatus().getUsed();
  }
+
  /**
   * Returns count of blocks with no good replicas left. Normally should be
   * zero.
   *
   * @throws IOException on failure to communicate with the NameNode.
   */
  public long getMissingBlocksCount() throws IOException {
    return dfs.getMissingBlocksCount();
  }
+
  /**
   * Returns count of blocks with replication factor 1 and have
   * lost the only replica.
   *
   * @throws IOException on failure to communicate with the NameNode.
   */
  public long getMissingReplOneBlocksCount() throws IOException {
    return dfs.getMissingReplOneBlocksCount();
  }
+
  /**
   * Returns count of blocks with one of more replica missing.
   *
   * @throws IOException on failure to communicate with the NameNode.
   */
  public long getUnderReplicatedBlocksCount() throws IOException {
    return dfs.getUnderReplicatedBlocksCount();
  }
+
  /**
   * Returns count of blocks with at least one replica marked corrupt.
   *
   * @throws IOException on failure to communicate with the NameNode.
   */
  public long getCorruptBlocksCount() throws IOException {
    return dfs.getCorruptBlocksCount();
  }
+
  /** Lazily iterate over paths of files that have corrupt blocks. */
  @Override
  public RemoteIterator<Path> listCorruptFileBlocks(Path path)
      throws IOException {
    return new CorruptFileBlockIterator(dfs, path);
  }
+
  /** @return datanode statistics for all datanodes (report type ALL). */
  public DatanodeInfo[] getDataNodeStats() throws IOException {
    return getDataNodeStats(DatanodeReportType.ALL);
  }
+
  /** @return datanode statistics for the given type (e.g. LIVE, DEAD). */
  public DatanodeInfo[] getDataNodeStats(final DatanodeReportType type
  ) throws IOException {
    return dfs.datanodeReport(type);
  }
+
  /**
   * Enter, leave or get safe mode.
   * Equivalent to {@code setSafeMode(action, false)}.
   *
   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(
   * HdfsConstants.SafeModeAction,boolean)
   */
  public boolean setSafeMode(HdfsConstants.SafeModeAction action)
      throws IOException {
    return setSafeMode(action, false);
  }
+
  /**
   * Enter, leave or get safe mode.
   *
   * @param action
   * One of SafeModeAction.ENTER, SafeModeAction.LEAVE and
   * SafeModeAction.GET
   * @param isChecked
   * If true check only for Active NNs status, else check first NN's
   * status
   * @return the safe mode state after the action is applied.
   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(SafeModeAction, boolean)
   */
  public boolean setSafeMode(HdfsConstants.SafeModeAction action,
      boolean isChecked) throws IOException {
    return dfs.setSafeMode(action, isChecked);
  }
+
  /**
   * Save namespace image.
   *
   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#saveNamespace()
   */
  public void saveNamespace() throws AccessControlException, IOException {
    dfs.saveNamespace();
  }
+
  /**
   * Rolls the edit log on the active NameNode.
   * Requires super-user privileges.
   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#rollEdits()
   * @return the transaction ID of the newly created segment
   */
  public long rollEdits() throws AccessControlException, IOException {
    return dfs.rollEdits();
  }
+
  /**
   * enable/disable/check restoreFailedStorage
   *
   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#restoreFailedStorage(String arg)
   */
  public boolean restoreFailedStorage(String arg)
      throws AccessControlException, IOException {
    return dfs.restoreFailedStorage(arg);
  }
+
+
  /**
   * Refreshes the list of hosts and excluded hosts from the configured
   * files.
   */
  public void refreshNodes() throws IOException {
    dfs.refreshNodes();
  }
+
  /**
   * Finalize previously upgraded files system state.
   * @throws IOException on failure to communicate with the NameNode.
   */
  public void finalizeUpgrade() throws IOException {
    dfs.finalizeUpgrade();
  }
+
  /**
   * Rolling upgrade: prepare/finalize/query.
   * @return the rolling upgrade information reported by the NameNode.
   */
  public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action)
      throws IOException {
    return dfs.rollingUpgrade(action);
  }
+
  /**
   * Requests the namenode to dump data structures into specified
   * file.
   * @param pathname file (on the NameNode) to dump into.
   */
  public void metaSave(String pathname) throws IOException {
    dfs.metaSave(pathname);
  }
+
  /** @return server default configuration values (block size, etc.). */
  @Override
  public FsServerDefaults getServerDefaults() throws IOException {
    return dfs.getServerDefaults();
  }
+
+ /**
+ * Returns the stat information about the file.
+ * @throws FileNotFoundException if the file does not exist.
+ */
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ statistics.incrementReadOps(1);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<FileStatus>() {
+ @Override
+ public FileStatus doCall(final Path p) throws IOException,
+ UnresolvedLinkException {
+ HdfsFileStatus fi = dfs.getFileInfo(getPathName(p));
+ if (fi != null) {
+ return fi.makeQualified(getUri(), p);
+ } else {
+ throw new FileNotFoundException("File does not exist: " + p);
+ }
+ }
+ @Override
+ public FileStatus next(final FileSystem fs, final Path p)
+ throws IOException {
+ return fs.getFileStatus(p);
+ }
+ }.resolve(this, absF);
+ }
+
  /**
   * Create a symlink pointing at {@code target}.  Fails up front when
   * symlink support is globally disabled.
   */
  @SuppressWarnings("deprecation")
  @Override
  public void createSymlink(final Path target, final Path link,
      final boolean createParent) throws AccessControlException,
      FileAlreadyExistsException, FileNotFoundException,
      ParentNotDirectoryException, UnsupportedFileSystemException,
      IOException {
    if (!FileSystem.areSymlinksEnabled()) {
      throw new UnsupportedOperationException("Symlinks not supported");
    }
    statistics.incrementWriteOps(1);
    final Path absF = fixRelativePart(link);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p) throws IOException,
          UnresolvedLinkException {
        dfs.createSymlink(target.toString(), getPathName(p), createParent);
        return null;
      }
      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException, UnresolvedLinkException {
        fs.createSymlink(target, p, createParent);
        return null;
      }
    }.resolve(this, absF);
  }
+
  /** HDFS supports symlinks (subject to FileSystem.areSymlinksEnabled()). */
  @Override
  public boolean supportsSymlinks() {
    return true;
  }
+
  /**
   * Like getFileStatus, but does not follow the final path component if it
   * is a symlink; the returned status describes the link itself, with its
   * target fully qualified against this file system's URI.
   */
  @Override
  public FileStatus getFileLinkStatus(final Path f)
      throws AccessControlException, FileNotFoundException,
      UnsupportedFileSystemException, IOException {
    statistics.incrementReadOps(1);
    final Path absF = fixRelativePart(f);
    FileStatus status = new FileSystemLinkResolver<FileStatus>() {
      @Override
      public FileStatus doCall(final Path p) throws IOException,
          UnresolvedLinkException {
        HdfsFileStatus fi = dfs.getFileLinkInfo(getPathName(p));
        if (fi != null) {
          return fi.makeQualified(getUri(), p);
        } else {
          throw new FileNotFoundException("File does not exist: " + p);
        }
      }
      @Override
      public FileStatus next(final FileSystem fs, final Path p)
          throws IOException, UnresolvedLinkException {
        return fs.getFileLinkStatus(p);
      }
    }.resolve(this, absF);
    // Fully-qualify the symlink
    if (status.isSymlink()) {
      Path targetQual = FSLinkResolver.qualifySymlinkTarget(this.getUri(),
          status.getPath(), status.getSymlink());
      status.setSymlink(targetQual);
    }
    return status;
  }
+
  /**
   * Return the (qualified) target of the symlink at {@code f}.
   * @throws FileNotFoundException if the link does not exist.
   */
  @Override
  public Path getLinkTarget(final Path f) throws AccessControlException,
      FileNotFoundException, UnsupportedFileSystemException, IOException {
    statistics.incrementReadOps(1);
    final Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<Path>() {
      @Override
      public Path doCall(final Path p) throws IOException,
          UnresolvedLinkException {
        HdfsFileStatus fi = dfs.getFileLinkInfo(getPathName(p));
        if (fi != null) {
          return fi.makeQualified(getUri(), p).getSymlink();
        } else {
          throw new FileNotFoundException("File does not exist: " + p);
        }
      }
      @Override
      public Path next(final FileSystem fs, final Path p)
          throws IOException, UnresolvedLinkException {
        return fs.getLinkTarget(p);
      }
    }.resolve(this, absF);
  }
+
+ @Override
+ protected Path resolveLink(Path f) throws IOException {
+ statistics.incrementReadOps(1);
+ String target = dfs.getLinkTarget(getPathName(fixRelativePart(f)));
+ if (target == null) {
+ throw new FileNotFoundException("File does not exist: " + f.toString());
+ }
+ return new Path(target);
+ }
+
  /**
   * Get the checksum of the whole file (Long.MAX_VALUE covers the entire
   * length).
   */
  @Override
  public FileChecksum getFileChecksum(Path f) throws IOException {
    statistics.incrementReadOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FileChecksum>() {
      @Override
      public FileChecksum doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        return dfs.getFileChecksum(getPathName(p), Long.MAX_VALUE);
      }

      @Override
      public FileChecksum next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.getFileChecksum(p);
      }
    }.resolve(this, absF);
  }
+
  /**
   * Get the checksum of the first {@code length} bytes of the file.
   * Only supported when the resolved target is a DistributedFileSystem,
   * since the length-limited variant is DFS-specific.
   */
  @Override
  public FileChecksum getFileChecksum(Path f, final long length)
      throws IOException {
    statistics.incrementReadOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FileChecksum>() {
      @Override
      public FileChecksum doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        return dfs.getFileChecksum(getPathName(p), length);
      }

      @Override
      public FileChecksum next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          return ((DistributedFileSystem) fs).getFileChecksum(p, length);
        } else {
          throw new UnsupportedFileSystemException(
              "getFileChecksum(Path, long) is not supported by "
                  + fs.getClass().getSimpleName());
        }
      }
    }.resolve(this, absF);
  }
+
+ @Override
+ public void setPermission(Path p, final FsPermission permission
+ ) throws IOException {
+ statistics.incrementWriteOps(1);
+ Path absF = fixRelativePart(p);
+ new FileSystemLinkResolver<Void>() {
+ @Override
+ public Void doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ dfs.setPermission(getPathName(p), permission);
+ return null;
+ }
+
+ @Override
+ public Void next(final FileSystem fs, final Path p)
+ throws IOException {
+ fs.setPermission(p, permission);
+ return null;
+ }
+ }.resolve(this, absF);
+ }
+
  /**
   * Set owner and/or group of a path.  At least one of the two must be
   * non-null; a null value leaves that attribute unchanged.
   */
  @Override
  public void setOwner(Path p, final String username, final String groupname
  ) throws IOException {
    if (username == null && groupname == null) {
      throw new IOException("username == null && groupname == null");
    }
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(p);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.setOwner(getPathName(p), username, groupname);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        fs.setOwner(p, username, groupname);
        return null;
      }
    }.resolve(this, absF);
  }
+
  /** Set the modification and access times of a path (millis since epoch). */
  @Override
  public void setTimes(Path p, final long mtime, final long atime
  ) throws IOException {
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(p);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.setTimes(getPathName(p), mtime, atime);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        fs.setTimes(p, mtime, atime);
        return null;
      }
    }.resolve(this, absF);
  }
+
+
  /**
   * @return the default NameNode RPC port, used when a URI omits the port.
   */
  @Override
  protected int getDefaultPort() {
    return HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT;
  }
+
+ @Override
+ public Token<DelegationTokenIdentifier> getDelegationToken(String renewer)
+ throws IOException {
+ Token<DelegationTokenIdentifier> result =
+ dfs.getDelegationToken(renewer == null ? null : new Text(renewer));
+ return result;
+ }
+
  /**
   * Requests the namenode to tell all datanodes to use a new, non-persistent
   * bandwidth value for dfs.balance.bandwidthPerSec.
   * The bandwidth parameter is the max number of bytes per second of network
   * bandwidth to be used by a datanode during balancing.
   * The setting is non-persistent: it is not preserved across restarts.
   *
   * @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
   * @throws IOException if the NameNode RPC fails
   */
  public void setBalancerBandwidth(long bandwidth) throws IOException {
    dfs.setBalancerBandwidth(bandwidth);
  }
+
  /**
   * Get a canonical service name for this file system. If the URI is logical,
   * the hostname part of the URI will be returned.
   *
   * @return a service string that uniquely identifies this file system.
   */
  @Override
  public String getCanonicalServiceName() {
    // Delegated to the DFSClient, which understands logical (HA) URIs.
    return dfs.getCanonicalServiceName();
  }
+
+ @Override
+ protected URI canonicalizeUri(URI uri) {
+ if (HAUtilClient.isLogicalUri(getConf(), uri)) {
+ // Don't try to DNS-resolve logical URIs, since the 'authority'
+ // portion isn't a proper hostname
+ return uri;
+ } else {
+ return NetUtils.getCanonicalUri(uri, getDefaultPort());
+ }
+ }
+
  /**
   * Utility function that returns if the NameNode is in safemode or not. In HA
   * mode, this API will return only ActiveNN's safemode status.
   *
   * @return true if NameNode is in safemode, false otherwise.
   * @throws IOException
   *           when there is an issue communicating with the NameNode
   */
  public boolean isInSafeMode() throws IOException {
    // SAFEMODE_GET with isChecked=true queries status without changing it.
    return setSafeMode(SafeModeAction.SAFEMODE_GET, true);
  }
+
  /**
   * Mark a directory as snapshottable.
   * Snapshot operations are HDFS-specific, so a symlink that resolves to a
   * non-DistributedFileSystem is rejected.
   *
   * @param path the directory to allow snapshots on
   * @throws IOException if the RPC fails
   * @see HdfsAdmin#allowSnapshot(Path)
   */
  public void allowSnapshot(final Path path) throws IOException {
    Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.allowSnapshot(getPathName(p));
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
          myDfs.allowSnapshot(p);
        } else {
          throw new UnsupportedOperationException("Cannot perform snapshot"
              + " operations on a symlink to a non-DistributedFileSystem: "
              + path + " -> " + p);
        }
        return null;
      }
    }.resolve(this, absF);
  }
+
  /**
   * Stop allowing snapshots on a directory.
   * Snapshot operations are HDFS-specific, so a symlink that resolves to a
   * non-DistributedFileSystem is rejected.
   *
   * @param path the directory to disallow snapshots on
   * @throws IOException if the RPC fails
   * @see HdfsAdmin#disallowSnapshot(Path)
   */
  public void disallowSnapshot(final Path path) throws IOException {
    Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.disallowSnapshot(getPathName(p));
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
          myDfs.disallowSnapshot(p);
        } else {
          throw new UnsupportedOperationException("Cannot perform snapshot"
              + " operations on a symlink to a non-DistributedFileSystem: "
              + path + " -> " + p);
        }
        return null;
      }
    }.resolve(this, absF);
  }
+
+ @Override
+ public Path createSnapshot(final Path path, final String snapshotName)
+ throws IOException {
+ Path absF = fixRelativePart(path);
+ return new FileSystemLinkResolver<Path>() {
+ @Override
+ public Path doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return new Path(dfs.createSnapshot(getPathName(p), snapshotName));
+ }
+
+ @Override
+ public Path next(final FileSystem fs, final Path p)
+ throws IOException {
+ if (fs instanceof DistributedFileSystem) {
+ DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+ return myDfs.createSnapshot(p);
+ } else {
+ throw new UnsupportedOperationException("Cannot perform snapshot"
+ + " operations on a symlink to a non-DistributedFileSystem: "
+ + path + " -> " + p);
+ }
+ }
+ }.resolve(this, absF);
+ }
+
  /**
   * Rename an existing snapshot of a directory.
   *
   * @param path the snapshottable directory
   * @param snapshotOldName current snapshot name
   * @param snapshotNewName new snapshot name
   * @throws IOException if the rename fails
   */
  @Override
  public void renameSnapshot(final Path path, final String snapshotOldName,
      final String snapshotNewName) throws IOException {
    Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.renameSnapshot(getPathName(p), snapshotOldName, snapshotNewName);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
          myDfs.renameSnapshot(p, snapshotOldName, snapshotNewName);
        } else {
          // Snapshots only exist on HDFS.
          throw new UnsupportedOperationException("Cannot perform snapshot"
              + " operations on a symlink to a non-DistributedFileSystem: "
              + path + " -> " + p);
        }
        return null;
      }
    }.resolve(this, absF);
  }
+
  /**
   * @return All the snapshottable directories
   * @throws IOException if the NameNode RPC fails
   */
  public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
      throws IOException {
    return dfs.getSnapshottableDirListing();
  }
+
  /**
   * Delete a snapshot of a directory.
   *
   * @param snapshotDir the snapshottable directory
   * @param snapshotName the snapshot to delete
   * @throws IOException if the deletion fails
   */
  @Override
  public void deleteSnapshot(final Path snapshotDir, final String snapshotName)
      throws IOException {
    Path absF = fixRelativePart(snapshotDir);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        dfs.deleteSnapshot(getPathName(p), snapshotName);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
          myDfs.deleteSnapshot(p, snapshotName);
        } else {
          // Snapshots only exist on HDFS.
          throw new UnsupportedOperationException("Cannot perform snapshot"
              + " operations on a symlink to a non-DistributedFileSystem: "
              + snapshotDir + " -> " + p);
        }
        return null;
      }
    }.resolve(this, absF);
  }
+
+ /**
+ * Get the difference between two snapshots, or between a snapshot and the
+ * current tree of a directory.
+ *
+ * @see DFSClient#getSnapshotDiffReport(String, String, String)
+ */
+ public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir,
+ final String fromSnapshot, final String toSnapshot) throws IOException {
+ Path absF = fixRelativePart(snapshotDir);
+ return new FileSystemLinkResolver<SnapshotDiffReport>() {
+ @Override
+ public SnapshotDiffReport doCall(final Path p)
+ throws IOException, UnresolvedLinkException {
+ return dfs.getSnapshotDiffReport(getPathName(p), fromSnapshot,
+ toSnapshot);
+ }
+
+ @Override
+ public SnapshotDiffReport next(final FileSystem fs, final Path p)
+ throws IOException {
+ if (fs instanceof DistributedFileSystem) {
+ DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+ myDfs.getSnapshotDiffReport(p, fromSnapshot, toSnapshot);
+ } else {
+ throw new UnsupportedOperationException("Cannot perform snapshot"
+ + " operations on a symlink to a non-DistributedFileSystem: "
+ + snapshotDir + " -> " + p);
+ }
+ return null;
+ }
+ }.resolve(this, absF);
+ }
+
  /**
   * Get the close status of a file
   * @param src The path to the file
   *
   * @return return true if file is closed
   * @throws FileNotFoundException if the file does not exist.
   * @throws IOException If an I/O error occurred
   */
  public boolean isFileClosed(final Path src) throws IOException {
    Path absF = fixRelativePart(src);
    return new FileSystemLinkResolver<Boolean>() {
      @Override
      public Boolean doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        return dfs.isFileClosed(getPathName(p));
      }

      @Override
      public Boolean next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
          return myDfs.isFileClosed(p);
        } else {
          // isFileClosed is an HDFS-only operation.
          throw new UnsupportedOperationException("Cannot call isFileClosed"
              + " on a symlink to a non-DistributedFileSystem: "
              + src + " -> " + p);
        }
      }
    }.resolve(this, absF);
  }
+
  /**
   * Add a new CacheDirective with no flags.
   *
   * @param info Information about a directive to add.
   * @return the ID of the directive that was created.
   * @throws IOException if the directive could not be added
   * @see #addCacheDirective(CacheDirectiveInfo, EnumSet)
   */
  public long addCacheDirective(CacheDirectiveInfo info) throws IOException {
    return addCacheDirective(info, EnumSet.noneOf(CacheFlag.class));
  }
+
+ /**
+ * Add a new CacheDirective.
+ *
+ * @param info Information about a directive to add.
+ * @param flags {@link CacheFlag}s to use for this operation.
+ * @return the ID of the directive that was created.
+ * @throws IOException if the directive could not be added
+ */
+ public long addCacheDirective(
+ CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
+ Preconditions.checkNotNull(info.getPath());
+ Path path = new Path(getPathName(fixRelativePart(info.getPath()))).
+ makeQualified(getUri(), getWorkingDirectory());
+ return dfs.addCacheDirective(
+ new CacheDirectiveInfo.Builder(info).
+ setPath(path).
+ build(),
+ flags);
+ }
+
  /**
   * Modify a CacheDirective with no flags.
   *
   * @param info Information about the directive to modify.
   * @throws IOException if the directive could not be modified
   * @see #modifyCacheDirective(CacheDirectiveInfo, EnumSet)
   */
  public void modifyCacheDirective(CacheDirectiveInfo info) throws IOException {
    modifyCacheDirective(info, EnumSet.noneOf(CacheFlag.class));
  }
+
+ /**
+ * Modify a CacheDirective.
+ *
+ * @param info Information about the directive to modify. You must set the ID
+ * to indicate which CacheDirective you want to modify.
+ * @param flags {@link CacheFlag}s to use for this operation.
+ * @throws IOException if the directive could not be modified
+ */
+ public void modifyCacheDirective(
+ CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
+ if (info.getPath() != null) {
+ info = new CacheDirectiveInfo.Builder(info).
+ setPath(new Path(getPathName(fixRelativePart(info.getPath()))).
+ makeQualified(getUri(), getWorkingDirectory())).build();
+ }
+ dfs.modifyCacheDirective(info, flags);
+ }
+
  /**
   * Remove a CacheDirectiveInfo.
   *
   * @param id identifier of the CacheDirectiveInfo to remove
   * @throws IOException if the directive could not be removed
   */
  public void removeCacheDirective(long id)
      throws IOException {
    dfs.removeCacheDirective(id);
  }
+
  /**
   * List cache directives. Incrementally fetches results from the server.
   *
   * @param filter Filter parameters to use when listing the directives, null to
   *               list all directives visible to us.
   * @return A RemoteIterator which returns CacheDirectiveInfo objects.
   * @throws IOException if the listing RPC fails
   */
  public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
      CacheDirectiveInfo filter) throws IOException {
    if (filter == null) {
      // Null means "no filter": list everything visible to the caller.
      filter = new CacheDirectiveInfo.Builder().build();
    }
    if (filter.getPath() != null) {
      // Strip scheme/authority so the filter path matches server-side paths.
      filter = new CacheDirectiveInfo.Builder(filter).
          setPath(new Path(getPathName(fixRelativePart(filter.getPath())))).
          build();
    }
    final RemoteIterator<CacheDirectiveEntry> iter =
        dfs.listCacheDirectives(filter);
    // Wrap the client iterator so returned paths are re-qualified with this
    // filesystem's URI before reaching the caller.
    return new RemoteIterator<CacheDirectiveEntry>() {
      @Override
      public boolean hasNext() throws IOException {
        return iter.hasNext();
      }

      @Override
      public CacheDirectiveEntry next() throws IOException {
        // Although the paths we get back from the NameNode should always be
        // absolute, we call makeQualified to add the scheme and authority of
        // this DistributedFilesystem.
        CacheDirectiveEntry desc = iter.next();
        CacheDirectiveInfo info = desc.getInfo();
        Path p = info.getPath().makeQualified(getUri(), getWorkingDirectory());
        return new CacheDirectiveEntry(
            new CacheDirectiveInfo.Builder(info).setPath(p).build(),
            desc.getStats());
      }
    };
  }
+
  /**
   * Add a cache pool.
   *
   * @param info
   *          The request to add a cache pool.
   * @throws IOException
   *           If the request could not be completed.
   */
  public void addCachePool(CachePoolInfo info) throws IOException {
    // Validate locally before making the RPC.
    CachePoolInfo.validate(info);
    dfs.addCachePool(info);
  }
+
  /**
   * Modify an existing cache pool.
   *
   * @param info
   *          The request to modify a cache pool.
   * @throws IOException
   *           If the request could not be completed.
   */
  public void modifyCachePool(CachePoolInfo info) throws IOException {
    // Validate locally before making the RPC.
    CachePoolInfo.validate(info);
    dfs.modifyCachePool(info);
  }
+
  /**
   * Remove a cache pool.
   *
   * @param poolName
   *          Name of the cache pool to remove.
   * @throws IOException
   *           if the cache pool did not exist, or could not be removed.
   */
  public void removeCachePool(String poolName) throws IOException {
    // Validate the name locally before making the RPC.
    CachePoolInfo.validateName(poolName);
    dfs.removeCachePool(poolName);
  }
+
  /**
   * List all cache pools.
   *
   * @return A remote iterator from which you can get CachePoolEntry objects.
   *         Requests will be made as needed.
   * @throws IOException
   *           If there was an error listing cache pools.
   */
  public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
    return dfs.listCachePools();
  }
+
  /**
   * Modifies ACL entries of files and directories.
   *
   * @param path path to modify
   * @param aclSpec ACL entries to add or update
   * @throws IOException if the operation fails
   */
  @Override
  public void modifyAclEntries(Path path, final List<AclEntry> aclSpec)
      throws IOException {
    Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p) throws IOException {
        dfs.modifyAclEntries(getPathName(p), aclSpec);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p) throws IOException {
        // Retry against the filesystem the symlink resolved to.
        fs.modifyAclEntries(p, aclSpec);
        return null;
      }
    }.resolve(this, absF);
  }
+
  /**
   * Removes the given ACL entries from files and directories.
   *
   * @param path path to modify
   * @param aclSpec ACL entries to remove
   * @throws IOException if the operation fails
   */
  @Override
  public void removeAclEntries(Path path, final List<AclEntry> aclSpec)
      throws IOException {
    Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p) throws IOException {
        dfs.removeAclEntries(getPathName(p), aclSpec);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p) throws IOException {
        // Retry against the filesystem the symlink resolved to.
        fs.removeAclEntries(p, aclSpec);
        return null;
      }
    }.resolve(this, absF);
  }
+
  /**
   * Removes all default ACL entries from the given path.
   *
   * @param path path to modify
   * @throws IOException if the operation fails
   */
  @Override
  public void removeDefaultAcl(Path path) throws IOException {
    final Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p) throws IOException {
        dfs.removeDefaultAcl(getPathName(p));
        return null;
      }
      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException, UnresolvedLinkException {
        // Retry against the filesystem the symlink resolved to.
        fs.removeDefaultAcl(p);
        return null;
      }
    }.resolve(this, absF);
  }
+
  /**
   * Removes the entire ACL from the given path.
   *
   * @param path path to modify
   * @throws IOException if the operation fails
   */
  @Override
  public void removeAcl(Path path) throws IOException {
    final Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p) throws IOException {
        dfs.removeAcl(getPathName(p));
        return null;
      }
      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException, UnresolvedLinkException {
        // Retry against the filesystem the symlink resolved to.
        fs.removeAcl(p);
        return null;
      }
    }.resolve(this, absF);
  }
+
  /**
   * Fully replaces the ACL of the given path with the given entries.
   *
   * @param path path to modify
   * @param aclSpec the new ACL entries
   * @throws IOException if the operation fails
   */
  @Override
  public void setAcl(Path path, final List<AclEntry> aclSpec) throws IOException {
    Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p) throws IOException {
        dfs.setAcl(getPathName(p), aclSpec);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p) throws IOException {
        // Retry against the filesystem the symlink resolved to.
        fs.setAcl(p, aclSpec);
        return null;
      }
    }.resolve(this, absF);
  }
+
  /**
   * Gets the ACL of the given path.
   *
   * @param path path to query
   * @return the ACL status of the path
   * @throws IOException if the operation fails
   */
  @Override
  public AclStatus getAclStatus(Path path) throws IOException {
    final Path absF = fixRelativePart(path);
    return new FileSystemLinkResolver<AclStatus>() {
      @Override
      public AclStatus doCall(final Path p) throws IOException {
        return dfs.getAclStatus(getPathName(p));
      }
      @Override
      public AclStatus next(final FileSystem fs, final Path p)
          throws IOException, UnresolvedLinkException {
        // Retry against the filesystem the symlink resolved to.
        return fs.getAclStatus(p);
      }
    }.resolve(this, absF);
  }
+
  /**
   * HDFS only. Create an encryption zone rooted at the given directory,
   * keyed by the named encryption key.
   *
   * @param path root of the new encryption zone
   * @param keyName name of the key to use for the zone
   * @throws IOException if the zone cannot be created
   */
  public void createEncryptionZone(final Path path, final String keyName)
      throws IOException {
    Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p) throws IOException,
          UnresolvedLinkException {
        dfs.createEncryptionZone(getPathName(p), keyName);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p) throws IOException {
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem myDfs = (DistributedFileSystem) fs;
          myDfs.createEncryptionZone(p, keyName);
          return null;
        } else {
          // Encryption zones are HDFS-only.
          throw new UnsupportedOperationException(
              "Cannot call createEncryptionZone"
                  + " on a symlink to a non-DistributedFileSystem: " + path
                  + " -> " + p);
        }
      }
    }.resolve(this, absF);
  }
+
  /**
   * HDFS only. Get the encryption zone containing the given path, if any.
   *
   * @param path the path to query; must not be null
   * @return the enclosing encryption zone — behavior for paths outside any
   *         zone is determined by the DFSClient (presumably null; verify)
   * @throws IOException if the RPC fails
   */
  public EncryptionZone getEZForPath(final Path path)
      throws IOException {
    Preconditions.checkNotNull(path);
    Path absF = fixRelativePart(path);
    return new FileSystemLinkResolver<EncryptionZone>() {
      @Override
      public EncryptionZone doCall(final Path p) throws IOException,
          UnresolvedLinkException {
        return dfs.getEZForPath(getPathName(p));
      }

      @Override
      public EncryptionZone next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem myDfs = (DistributedFileSystem) fs;
          return myDfs.getEZForPath(p);
        } else {
          // Encryption zones are HDFS-only.
          throw new UnsupportedOperationException(
              "Cannot call getEZForPath"
                  + " on a symlink to a non-DistributedFileSystem: " + path
                  + " -> " + p);
        }
      }
    }.resolve(this, absF);
  }
+
  /**
   * HDFS only. List all encryption zones.
   *
   * @return a remote iterator over the encryption zones
   * @throws IOException if the RPC fails
   */
  public RemoteIterator<EncryptionZone> listEncryptionZones()
      throws IOException {
    return dfs.listEncryptionZones();
  }
+
  /**
   * Set an extended attribute on the given path.
   *
   * @param path path to modify
   * @param name xattr name
   * @param value xattr value
   * @param flag create/replace flags for the operation
   * @throws IOException if the operation fails
   */
  @Override
  public void setXAttr(Path path, final String name, final byte[] value,
      final EnumSet<XAttrSetFlag> flag) throws IOException {
    Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {

      @Override
      public Void doCall(final Path p) throws IOException {
        dfs.setXAttr(getPathName(p), name, value, flag);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p) throws IOException {
        // Retry against the filesystem the symlink resolved to.
        fs.setXAttr(p, name, value, flag);
        return null;
      }
    }.resolve(this, absF);
  }
+
  /**
   * Get the value of an extended attribute on the given path.
   *
   * @param path path to query
   * @param name xattr name
   * @return the xattr value bytes
   * @throws IOException if the operation fails
   */
  @Override
  public byte[] getXAttr(Path path, final String name) throws IOException {
    final Path absF = fixRelativePart(path);
    return new FileSystemLinkResolver<byte[]>() {
      @Override
      public byte[] doCall(final Path p) throws IOException {
        return dfs.getXAttr(getPathName(p), name);
      }
      @Override
      public byte[] next(final FileSystem fs, final Path p)
          throws IOException, UnresolvedLinkException {
        // Retry against the filesystem the symlink resolved to.
        return fs.getXAttr(p, name);
      }
    }.resolve(this, absF);
  }
+
  /**
   * Get all extended attributes of the given path.
   *
   * @param path path to query
   * @return map of xattr name to value bytes
   * @throws IOException if the operation fails
   */
  @Override
  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
    final Path absF = fixRelativePart(path);
    return new FileSystemLinkResolver<Map<String, byte[]>>() {
      @Override
      public Map<String, byte[]> doCall(final Path p) throws IOException {
        return dfs.getXAttrs(getPathName(p));
      }
      @Override
      public Map<String, byte[]> next(final FileSystem fs, final Path p)
          throws IOException, UnresolvedLinkException {
        // Retry against the filesystem the symlink resolved to.
        return fs.getXAttrs(p);
      }
    }.resolve(this, absF);
  }
+
  /**
   * Get the named extended attributes of the given path.
   *
   * @param path path to query
   * @param names xattr names to fetch
   * @return map of xattr name to value bytes
   * @throws IOException if the operation fails
   */
  @Override
  public Map<String, byte[]> getXAttrs(Path path, final List<String> names)
      throws IOException {
    final Path absF = fixRelativePart(path);
    return new FileSystemLinkResolver<Map<String, byte[]>>() {
      @Override
      public Map<String, byte[]> doCall(final Path p) throws IOException {
        return dfs.getXAttrs(getPathName(p), names);
      }
      @Override
      public Map<String, byte[]> next(final FileSystem fs, final Path p)
          throws IOException, UnresolvedLinkException {
        // Retry against the filesystem the symlink resolved to.
        return fs.getXAttrs(p, names);
      }
    }.resolve(this, absF);
  }
+
  /**
   * List the names of all extended attributes on the given path.
   *
   * @param path path to query
   * @return list of xattr names
   * @throws IOException if the operation fails
   */
  @Override
  public List<String> listXAttrs(Path path)
      throws IOException {
    final Path absF = fixRelativePart(path);
    return new FileSystemLinkResolver<List<String>>() {
      @Override
      public List<String> doCall(final Path p) throws IOException {
        return dfs.listXAttrs(getPathName(p));
      }
      @Override
      public List<String> next(final FileSystem fs, final Path p)
          throws IOException, UnresolvedLinkException {
        // Retry against the filesystem the symlink resolved to.
        return fs.listXAttrs(p);
      }
    }.resolve(this, absF);
  }
+
  /**
   * Remove an extended attribute from the given path.
   *
   * @param path path to modify
   * @param name xattr name to remove
   * @throws IOException if the operation fails
   */
  @Override
  public void removeXAttr(Path path, final String name) throws IOException {
    Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p) throws IOException {
        dfs.removeXAttr(getPathName(p), name);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p) throws IOException {
        // Retry against the filesystem the symlink resolved to.
        fs.removeXAttr(p, name);
        return null;
      }
    }.resolve(this, absF);
  }
+
  /**
   * Check whether the current user can access the path with the given mode.
   *
   * @param path path to check
   * @param mode the access mode (read/write/execute) to verify
   * @throws IOException if access is denied or the RPC fails
   */
  @Override
  public void access(Path path, final FsAction mode) throws IOException {
    final Path absF = fixRelativePart(path);
    new FileSystemLinkResolver<Void>() {
      @Override
      public Void doCall(final Path p) throws IOException {
        dfs.checkAccess(getPathName(p), mode);
        return null;
      }

      @Override
      public Void next(final FileSystem fs, final Path p)
          throws IOException {
        // Retry against the filesystem the symlink resolved to.
        fs.access(p, mode);
        return null;
      }
    }.resolve(this, absF);
  }
+
+ @Override
+ public Token<?>[] addDelegationTokens(
+ final String renewer, Credentials credentials) throws IOException {
+ Token<?>[] tokens = super.addDelegationTokens(renewer, credentials);
+ if (dfs.isHDFSEncryptionEnabled()) {
+ KeyProviderDelegationTokenExtension keyProviderDelegationTokenExtension =
+ KeyProviderDelegationTokenExtension.
+ createKeyProviderDelegationTokenExtension(dfs.getKeyProvider());
+ Token<?>[] kpTokens = keyProviderDelegationTokenExtension.
+ addDelegationTokens(renewer, credentials);
+ if (tokens != null && kpTokens != null) {
+ Token<?>[] all = new Token<?>[tokens.length + kpTokens.length];
+ System.arraycopy(tokens, 0, all, 0, tokens.length);
+ System.arraycopy(kpTokens, 0, all, tokens.length, kpTokens.length);
+ tokens = all;
+ } else {
+ tokens = (tokens != null) ? tokens : kpTokens;
+ }
+ }
+ return tokens;
+ }
+
  /**
   * Open a stream of HDFS namespace events (inotify), starting from the
   * current transaction position.
   *
   * @return the event input stream
   * @throws IOException if the stream cannot be opened
   */
  public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
    return dfs.getInotifyEventStream();
  }
+
  /**
   * Open a stream of HDFS namespace events (inotify), starting after the
   * given transaction id.
   *
   * @param lastReadTxid the last transaction id already consumed
   * @return the event input stream
   * @throws IOException if the stream cannot be opened
   */
  public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
      throws IOException {
    return dfs.getInotifyEventStream(lastReadTxid);
  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/504d6fd9/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
new file mode 100644
index 0000000..77bed1a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.client.impl;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+/**
+ * Provides an iterator interface for listCorruptFileBlocks.
+ * This class is used by DistributedFileSystem and Hdfs.
+ */
+@InterfaceAudience.Private
+public class CorruptFileBlockIterator implements RemoteIterator<Path> {
+ private final DFSClient dfs;
+ private final String path;
+
+ private String[] files = null;
+ private int fileIdx = 0;
+ private String cookie = null;
+ private Path nextPath = null;
+
+ private int callsMade = 0;
+
+ public CorruptFileBlockIterator(DFSClient dfs, Path path) throws IOException {
+ this.dfs = dfs;
+ this.path = path2String(path);
+ loadNext();
+ }
+
+ /**
+ * @return the number of calls made to the DFSClient.
+ * This is for debugging and testing purposes.
+ */
+ public int getCallsMade() {
+ return callsMade;
+ }
+
+ private String path2String(Path path) {
+ return path.toUri().getPath();
+ }
+
+ private Path string2Path(String string) {
+ return new Path(string);
+ }
+
+ private void loadNext() throws IOException {
+ if (files == null || fileIdx >= files.length) {
+ CorruptFileBlocks cfb = dfs.listCorruptFileBlocks(path, cookie);
+ files = cfb.getFiles();
+ cookie = cfb.getCookie();
+ fileIdx = 0;
+ callsMade++;
+ }
+
+ if (fileIdx >= files.length) {
+ // received an empty response
+ // there are no more corrupt file blocks
+ nextPath = null;
+ } else {
+ nextPath = string2Path(files[fileIdx]);
+ fileIdx++;
+ }
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ return nextPath != null;
+ }
+
+
+ @Override
+ public Path next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException("No more corrupt file blocks");
+ }
+
+ Path result = nextPath;
+ loadNext();
+
+ return result;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/504d6fd9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2fe25bb..9f44e77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -635,6 +635,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9087. Add some jitter to DataNode.checkDiskErrorThread (Elliott Clark
via Colin P. McCabe)
+ HDFS-8740. Move DistributedFileSystem to hadoop-hdfs-client. (Mingliang Liu
+ via wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than