You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2015/09/29 22:30:29 UTC
[27/50] [abbrv] hadoop git commit: HDFS-8740. Move
DistributedFileSystem to hadoop-hdfs-client. Contributed by Mingliang Liu.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c030c6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
deleted file mode 100644
index 1d20f82..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ /dev/null
@@ -1,2262 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.BlockStoragePolicySpi;
-import org.apache.hadoop.fs.CacheFlag;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSLinkResolver;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileChecksum;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystemLinkResolver;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.FsStatus;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.XAttrSetFlag;
-import org.apache.hadoop.fs.Options.ChecksumOpt;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.client.HdfsAdmin;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
-import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
-import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
-import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
-import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.DirectoryListing;
-import org.apache.hadoop.hdfs.protocol.EncryptionZone;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
-import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
-import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
-import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
-
-/****************************************************************
- * Implementation of the abstract FileSystem for the DFS system.
- * This object is the way end-user code interacts with a Hadoop
- * DistributedFileSystem.
- *
- *****************************************************************/
-@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
-@InterfaceStability.Unstable
-public class DistributedFileSystem extends FileSystem {
-  /** Value returned by {@link #getWorkingDirectory()}. */
-  private Path workingDir;
-  /** Value returned by {@link #getUri()}; assigned in initialize(). */
-  private URI uri;
-  /** Prefix prepended to the short user name to build the home directory. */
-  private String homeDirPrefix =
-      HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT;
-
-  /** Client that the operations of this class are issued through. */
-  DFSClient dfs;
-  /** Passed to DFSClient#open as its verifyChecksum argument. */
-  private boolean verifyChecksum = true;
-
-  static {
-    HdfsConfiguration.init();
-  }
-
-  /** Creates an instance whose client is only set up by initialize(). */
-  public DistributedFileSystem() {
-  }
-
-  /**
-   * Reports the protocol scheme of this FileSystem.
-   *
-   * @return {@link HdfsConstants#HDFS_URI_SCHEME}
-   */
-  @Override
-  public String getScheme() {
-    final String scheme = HdfsConstants.HDFS_URI_SCHEME;
-    return scheme;
-  }
-
- @Override
- public URI getUri() { return uri; }
-
-  /**
-   * Sets up this file system: stores the configuration, reads the home
-   * directory prefix, creates the DFSClient and derives the file system URI
-   * and the initial working directory.
-   *
-   * @param uri URI of the file system; must contain a host
-   * @param conf configuration to use
-   * @throws IOException if the URI has no host
-   */
-  @Override
-  public void initialize(URI uri, Configuration conf) throws IOException {
-    super.initialize(uri, conf);
-    setConf(conf);
-
-    if (uri.getHost() == null) {
-      throw new IOException("Incomplete HDFS URI, no host: " + uri);
-    }
-
-    final String prefixKey = HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY;
-    final String prefixDefault =
-        HdfsClientConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT;
-    homeDirPrefix = conf.get(prefixKey, prefixDefault);
-
-    dfs = new DFSClient(uri, conf, statistics);
-    // Only scheme and authority of the supplied URI are kept.
-    final String schemeAndAuthority =
-        uri.getScheme() + "://" + uri.getAuthority();
-    this.uri = URI.create(schemeAndAuthority);
-    // getHomeDirectory() dereferences dfs, so this has to follow its creation.
-    workingDir = getHomeDirectory();
-  }
-
-  /** @return the current working directory of this file system. */
-  @Override
-  public Path getWorkingDirectory() {
-    return this.workingDir;
-  }
-
-  /** @return the default block size reported by the client configuration. */
-  @Override
-  public long getDefaultBlockSize() {
-    final long blockSize = dfs.getConf().getDefaultBlockSize();
-    return blockSize;
-  }
-
-  /** @return the default replication reported by the client configuration. */
-  @Override
-  public short getDefaultReplication() {
-    final short replication = dfs.getConf().getDefaultReplication();
-    return replication;
-  }
-
-  /**
-   * Changes the working directory after validating its path component.
-   *
-   * @param dir new working directory; passed through fixRelativePart
-   * @throws IllegalArgumentException if DFSUtil.isValidName rejects the path
-   */
-  @Override
-  public void setWorkingDirectory(Path dir) {
-    final String candidate = fixRelativePart(dir).toUri().getPath();
-    if (DFSUtil.isValidName(candidate)) {
-      workingDir = fixRelativePart(dir);
-      return;
-    }
-    throw new IllegalArgumentException("Invalid DFS directory name " +
-        candidate);
-  }
-
-  /**
-   * Builds the home directory as {@code homeDirPrefix + "/" + shortUserName}
-   * and qualifies it with makeQualified.
-   *
-   * @return the qualified home directory of the client's user
-   */
-  @Override
-  public Path getHomeDirectory() {
-    final String user = dfs.ugi.getShortUserName();
-    final Path home = new Path(homeDirPrefix + "/" + user);
-    return makeQualified(home);
-  }
-
- /**
- * Checks that the passed URI belongs to this filesystem and returns
- * just the path component. Expects a URI with an absolute path.
- *
- * @param file URI with absolute path
- * @return path component of {file}
- * @throws IllegalArgumentException if URI does not belong to this DFS
- */
- private String getPathName(Path file) {
- checkPath(file);
- String result = file.toUri().getPath();
- if (!DFSUtil.isValidName(result)) {
- throw new IllegalArgumentException("Pathname " + result + " from " +
- file+" is not a valid DFS filename.");
- }
- return result;
- }
-
-  /**
-   * Looks up block locations for the path of the given status.
-   *
-   * @param file status whose path is queried; may be null
-   * @param start first byte of the range
-   * @param len length of the range
-   * @return null when {@code file} is null, otherwise the result of the
-   *         Path-based overload
-   * @throws IOException if the lookup fails
-   */
-  @Override
-  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
-      long len) throws IOException {
-    return (file == null)
-        ? null
-        : getFileBlockLocations(file.getPath(), start, len);
-  }
-
-  /**
-   * Looks up block locations for a byte range of a path, dispatching through
-   * a FileSystemLinkResolver.
-   *
-   * @param p path to query; made absolute with fixRelativePart
-   * @param start first byte of the range
-   * @param len length of the range
-   * @return the locations reported by the client, or by the file system the
-   *         resolver hands to next()
-   * @throws IOException if the lookup fails
-   */
-  @Override
-  public BlockLocation[] getFileBlockLocations(Path p,
-      final long start, final long len) throws IOException {
-    statistics.incrementReadOps(1);
-    final FileSystemLinkResolver<BlockLocation[]> resolver =
-        new FileSystemLinkResolver<BlockLocation[]>() {
-          @Override
-          public BlockLocation[] doCall(final Path resolved)
-              throws IOException, UnresolvedLinkException {
-            return dfs.getBlockLocations(getPathName(resolved), start, len);
-          }
-
-          @Override
-          public BlockLocation[] next(final FileSystem fs, final Path resolved)
-              throws IOException {
-            return fs.getFileBlockLocations(resolved, start, len);
-          }
-        };
-    return resolver.resolve(this, fixRelativePart(p));
-  }
-
-  /**
-   * Stores the flag that open() forwards to DFSClient#open.
-   *
-   * @param verifyChecksum new value of the flag
-   */
-  @Override
-  public void setVerifyChecksum(boolean verifyChecksum) {
-    final boolean requested = verifyChecksum;
-    this.verifyChecksum = requested;
-  }
-
-  /**
-   * Start the lease recovery of a file.
-   *
-   * @param f a file
-   * @return true if the file is already closed
-   * @throws IOException if an error occurs
-   * @throws UnsupportedOperationException if the resolver hands over a file
-   *         system that is not a DistributedFileSystem
-   */
-  public boolean recoverLease(final Path f) throws IOException {
-    final FileSystemLinkResolver<Boolean> resolver =
-        new FileSystemLinkResolver<Boolean>() {
-          @Override
-          public Boolean doCall(final Path p)
-              throws IOException, UnresolvedLinkException {
-            return dfs.recoverLease(getPathName(p));
-          }
-
-          @Override
-          public Boolean next(final FileSystem fs, final Path p)
-              throws IOException {
-            if (!(fs instanceof DistributedFileSystem)) {
-              throw new UnsupportedOperationException(
-                  "Cannot recoverLease through" +
-                  " a symlink to a non-DistributedFileSystem: " + f + " -> " + p);
-            }
-            return ((DistributedFileSystem) fs).recoverLease(p);
-          }
-        };
-    return resolver.resolve(this, fixRelativePart(f));
-  }
-
-  /**
-   * Opens a file for reading.
-   *
-   * @param f file to open; made absolute with fixRelativePart
-   * @param bufferSize buffer size forwarded to the client
-   * @return the client's input stream wrapped by createWrappedInputStream,
-   *         or the stream opened by the file system passed to next()
-   * @throws IOException if the file cannot be opened
-   */
-  @Override
-  public FSDataInputStream open(Path f, final int bufferSize)
-      throws IOException {
-    statistics.incrementReadOps(1);
-    final FileSystemLinkResolver<FSDataInputStream> resolver =
-        new FileSystemLinkResolver<FSDataInputStream>() {
-          @Override
-          public FSDataInputStream doCall(final Path p)
-              throws IOException, UnresolvedLinkException {
-            final String src = getPathName(p);
-            final DFSInputStream in = dfs.open(src, bufferSize, verifyChecksum);
-            return dfs.createWrappedInputStream(in);
-          }
-
-          @Override
-          public FSDataInputStream next(final FileSystem fs, final Path p)
-              throws IOException {
-            return fs.open(p, bufferSize);
-          }
-        };
-    return resolver.resolve(this, fixRelativePart(f));
-  }
-
-  /**
-   * Appends to an existing file using only {@link CreateFlag#APPEND}.
-   *
-   * @param f the existing file to be appended
-   * @param bufferSize the size of the buffer to be used
-   * @param progress for reporting progress
-   * @return the stream returned by the flag-based overload
-   * @throws IOException if the append fails
-   */
-  @Override
-  public FSDataOutputStream append(Path f, final int bufferSize,
-      final Progressable progress) throws IOException {
-    final EnumSet<CreateFlag> appendOnly = EnumSet.of(CreateFlag.APPEND);
-    return append(f, appendOnly, bufferSize, progress);
-  }
-
-  /**
-   * Append to an existing file (optional operation).
-   *
-   * @param f the existing file to be appended.
-   * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory
-   *          to be present.
-   * @param bufferSize the size of the buffer to be used.
-   * @param progress for reporting progress if it is not null.
-   * @return Returns instance of {@link FSDataOutputStream}
-   * @throws IOException if the append fails
-   */
-  public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
-      final int bufferSize, final Progressable progress) throws IOException {
-    statistics.incrementWriteOps(1);
-    Path absF = fixRelativePart(f);
-    return new FileSystemLinkResolver<FSDataOutputStream>() {
-      @Override
-      public FSDataOutputStream doCall(final Path p)
-          throws IOException {
-        return dfs.append(getPathName(p), bufferSize, flag, progress,
-            statistics);
-      }
-      @Override
-      public FSDataOutputStream next(final FileSystem fs, final Path p)
-          throws IOException {
-        // Keep the caller's flags when the other side is also a
-        // DistributedFileSystem; the generic FileSystem API has no
-        // flag-taking append, so only progress can be forwarded there.
-        if (fs instanceof DistributedFileSystem) {
-          return ((DistributedFileSystem) fs).append(p, flag, bufferSize,
-              progress);
-        }
-        return fs.append(p, bufferSize, progress);
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * Append to an existing file (optional operation).
-   *
-   * @param f the existing file to be appended.
-   * @param flag Flags for the Append operation. CreateFlag.APPEND is mandatory
-   *          to be present.
-   * @param bufferSize the size of the buffer to be used.
-   * @param progress for reporting progress if it is not null.
-   * @param favoredNodes Favored nodes for new blocks
-   * @return Returns instance of {@link FSDataOutputStream}
-   * @throws IOException if the append fails
-   */
-  public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
-      final int bufferSize, final Progressable progress,
-      final InetSocketAddress[] favoredNodes) throws IOException {
-    statistics.incrementWriteOps(1);
-    Path absF = fixRelativePart(f);
-    return new FileSystemLinkResolver<FSDataOutputStream>() {
-      @Override
-      public FSDataOutputStream doCall(final Path p)
-          throws IOException {
-        return dfs.append(getPathName(p), bufferSize, flag, progress,
-            statistics, favoredNodes);
-      }
-      @Override
-      public FSDataOutputStream next(final FileSystem fs, final Path p)
-          throws IOException {
-        // Flags and favored nodes can only be honoured by another
-        // DistributedFileSystem; otherwise fall back to the generic append
-        // and at least forward the progress callback.
-        if (fs instanceof DistributedFileSystem) {
-          return ((DistributedFileSystem) fs).append(p, flag, bufferSize,
-              progress, favoredNodes);
-        }
-        return fs.append(p, bufferSize, progress);
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * Creates a file by translating {@code overwrite} into create flags and
-   * delegating to the flag-based overload with a null checksum option.
-   *
-   * @param f file to create
-   * @param permission permission for the new file
-   * @param overwrite whether {@link CreateFlag#OVERWRITE} is added
-   * @param bufferSize buffer size to use
-   * @param replication replication for the file
-   * @param blockSize block size for the file
-   * @param progress progress callback
-   * @return the output stream of the new file
-   * @throws IOException if creation fails
-   */
-  @Override
-  public FSDataOutputStream create(Path f, FsPermission permission,
-      boolean overwrite, int bufferSize, short replication, long blockSize,
-      Progressable progress) throws IOException {
-    final EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE);
-    if (overwrite) {
-      flags.add(CreateFlag.OVERWRITE);
-    }
-    return this.create(f, permission, flags, bufferSize, replication,
-        blockSize, progress, null);
-  }
-
-  /**
-   * Same as
-   * {@link #create(Path, FsPermission, boolean, int, short, long,
-   * Progressable)} with the addition of favoredNodes that is a hint to
-   * where the namenode should place the file blocks.
-   * The favored nodes hint is not persisted in HDFS. Hence it may be honored
-   * at the creation time only. And with favored nodes, blocks will be pinned
-   * on the datanodes to prevent balancing move the block. HDFS could move the
-   * blocks during replication, to move the blocks from favored nodes. A value
-   * of null means no favored nodes for this create
-   *
-   * @throws IOException if creation fails
-   * @throws UnsupportedOperationException if the resolver hands over a file
-   *         system that is not a DistributedFileSystem
-   */
-  public HdfsDataOutputStream create(final Path f,
-      final FsPermission permission, final boolean overwrite,
-      final int bufferSize, final short replication, final long blockSize,
-      final Progressable progress, final InetSocketAddress[] favoredNodes)
-      throws IOException {
-    statistics.incrementWriteOps(1);
-    Path absF = fixRelativePart(f);
-    return new FileSystemLinkResolver<HdfsDataOutputStream>() {
-      @Override
-      public HdfsDataOutputStream doCall(final Path p)
-          throws IOException, UnresolvedLinkException {
-        // Use the path supplied by the resolver (absolute, possibly
-        // re-resolved), not the caller's original f.
-        final DFSOutputStream out = dfs.create(getPathName(p), permission,
-            overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
-                : EnumSet.of(CreateFlag.CREATE),
-            true, replication, blockSize, progress, bufferSize, null,
-            favoredNodes);
-        return dfs.createWrappedOutputStream(out, statistics);
-      }
-      @Override
-      public HdfsDataOutputStream next(final FileSystem fs, final Path p)
-          throws IOException {
-        if (fs instanceof DistributedFileSystem) {
-          DistributedFileSystem myDfs = (DistributedFileSystem)fs;
-          return myDfs.create(p, permission, overwrite, bufferSize, replication,
-              blockSize, progress, favoredNodes);
-        }
-        throw new UnsupportedOperationException("Cannot create with" +
-            " favoredNodes through a symlink to a non-DistributedFileSystem: "
-            + f + " -> " + p);
-      }
-    }.resolve(this, absF);
-  }
-
-  /**
-   * Creates a file with explicit create flags and checksum option.
-   *
-   * @param f file to create; made absolute with fixRelativePart
-   * @param permission permission for the new file
-   * @param cflags create flags forwarded unchanged
-   * @param bufferSize buffer size to use
-   * @param replication replication for the file
-   * @param blockSize block size for the file
-   * @param progress progress callback
-   * @param checksumOpt checksum option forwarded unchanged
-   * @return the client's stream wrapped by createWrappedOutputStream, or the
-   *         stream created by the file system passed to next()
-   * @throws IOException if creation fails
-   */
-  @Override
-  public FSDataOutputStream create(final Path f, final FsPermission permission,
-      final EnumSet<CreateFlag> cflags, final int bufferSize,
-      final short replication, final long blockSize, final Progressable progress,
-      final ChecksumOpt checksumOpt) throws IOException {
-    statistics.incrementWriteOps(1);
-    final FileSystemLinkResolver<FSDataOutputStream> resolver =
-        new FileSystemLinkResolver<FSDataOutputStream>() {
-          @Override
-          public FSDataOutputStream doCall(final Path p)
-              throws IOException, UnresolvedLinkException {
-            final String src = getPathName(p);
-            final DFSOutputStream out = dfs.create(src, permission, cflags,
-                replication, blockSize, progress, bufferSize, checksumOpt);
-            return dfs.createWrappedOutputStream(out, statistics);
-          }
-
-          @Override
-          public FSDataOutputStream next(final FileSystem fs, final Path p)
-              throws IOException {
-            return fs.create(p, permission, cflags, bufferSize,
-                replication, blockSize, progress, checksumOpt);
-          }
-        };
-    return resolver.resolve(this, fixRelativePart(f));
-  }
-
-  /**
-   * Creates a file through DFSClient#primitiveCreate without going through a
-   * link resolver.
-   *
-   * @return the client's stream wrapped by createWrappedOutputStream
-   * @throws IOException if creation fails
-   */
-  @Override
-  protected HdfsDataOutputStream primitiveCreate(Path f,
-      FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
-      short replication, long blockSize, Progressable progress,
-      ChecksumOpt checksumOpt) throws IOException {
-    statistics.incrementWriteOps(1);
-    final String src = getPathName(fixRelativePart(f));
-    final DFSOutputStream out = dfs.primitiveCreate(src, absolutePermission,
-        flag, true, replication, blockSize, progress, bufferSize, checksumOpt);
-    return dfs.createWrappedOutputStream(out, statistics);
-  }
-
-  /**
-   * Same as create(), except fails if parent directory doesn't already exist.
-   * <p>
-   * Note that the {@code flag} set passed by the caller is modified in place:
-   * CREATE is added to it whenever it contains OVERWRITE.
-   */
-  @Override
-  @SuppressWarnings("deprecation")
-  public FSDataOutputStream createNonRecursive(final Path f,
-      final FsPermission permission, final EnumSet<CreateFlag> flag,
-      final int bufferSize, final short replication, final long blockSize,
-      final Progressable progress) throws IOException {
-    statistics.incrementWriteOps(1);
-    final boolean wantsOverwrite = flag.contains(CreateFlag.OVERWRITE);
-    if (wantsOverwrite) {
-      flag.add(CreateFlag.CREATE);
-    }
-    final FileSystemLinkResolver<FSDataOutputStream> resolver =
-        new FileSystemLinkResolver<FSDataOutputStream>() {
-          @Override
-          public FSDataOutputStream doCall(final Path p) throws IOException,
-              UnresolvedLinkException {
-            final String src = getPathName(p);
-            final DFSOutputStream out = dfs.create(src, permission, flag,
-                false, replication, blockSize, progress, bufferSize, null);
-            return dfs.createWrappedOutputStream(out, statistics);
-          }
-
-          @Override
-          public FSDataOutputStream next(final FileSystem fs, final Path p)
-              throws IOException {
-            return fs.createNonRecursive(p, permission, flag, bufferSize,
-                replication, blockSize, progress);
-          }
-        };
-    return resolver.resolve(this, fixRelativePart(f));
-  }
-
-  /**
-   * Sets the replication of a path.
-   *
-   * @param src path to change; made absolute with fixRelativePart
-   * @param replication new replication value
-   * @return the result reported by the client or the delegated file system
-   * @throws IOException if the call fails
-   */
-  @Override
-  public boolean setReplication(Path src, final short replication)
-      throws IOException {
-    statistics.incrementWriteOps(1);
-    final FileSystemLinkResolver<Boolean> resolver =
-        new FileSystemLinkResolver<Boolean>() {
-          @Override
-          public Boolean doCall(final Path p)
-              throws IOException, UnresolvedLinkException {
-            return dfs.setReplication(getPathName(p), replication);
-          }
-
-          @Override
-          public Boolean next(final FileSystem fs, final Path p)
-              throws IOException {
-            return fs.setReplication(p, replication);
-          }
-        };
-    return resolver.resolve(this, fixRelativePart(src));
-  }
-
-  /**
-   * Set the source path to the specified storage policy.
-   *
-   * @param src The source path referring to either a directory or a file.
-   * @param policyName The name of the storage policy.
-   * @throws IOException if the call fails
-   */
-  @Override
-  public void setStoragePolicy(final Path src, final String policyName)
-      throws IOException {
-    statistics.incrementWriteOps(1);
-    final FileSystemLinkResolver<Void> resolver =
-        new FileSystemLinkResolver<Void>() {
-          @Override
-          public Void doCall(final Path p)
-              throws IOException, UnresolvedLinkException {
-            dfs.setStoragePolicy(getPathName(p), policyName);
-            return null;
-          }
-
-          @Override
-          public Void next(final FileSystem fs, final Path p)
-              throws IOException {
-            fs.setStoragePolicy(p, policyName);
-            return null;
-          }
-        };
-    resolver.resolve(this, fixRelativePart(src));
-  }
-
-  /**
-   * Queries the storage policy of a path.
-   *
-   * @param path path to query; made absolute with fixRelativePart
-   * @return the policy reported by the client or the delegated file system
-   * @throws IOException if the call fails
-   */
-  @Override
-  public BlockStoragePolicySpi getStoragePolicy(Path path) throws IOException {
-    statistics.incrementReadOps(1);
-    final FileSystemLinkResolver<BlockStoragePolicySpi> resolver =
-        new FileSystemLinkResolver<BlockStoragePolicySpi>() {
-          @Override
-          public BlockStoragePolicySpi doCall(final Path p) throws IOException {
-            final String src = getPathName(p);
-            return getClient().getStoragePolicy(src);
-          }
-
-          @Override
-          public BlockStoragePolicySpi next(final FileSystem fs, final Path p)
-              throws IOException, UnresolvedLinkException {
-            return fs.getStoragePolicy(p);
-          }
-        };
-    return resolver.resolve(this, fixRelativePart(path));
-  }
-
-  /**
-   * @return the client's storage policies as a list view of the array
-   * @throws IOException if the client call fails
-   */
-  @Override
-  public Collection<BlockStoragePolicy> getAllStoragePolicies()
-      throws IOException {
-    final BlockStoragePolicy[] policies = dfs.getStoragePolicies();
-    return Arrays.asList(policies);
-  }
-
-  /**
-   * Deprecated. Prefer {@link FileSystem#getAllStoragePolicies()}
-   *
-   * @return the array of storage policies reported by the client
-   * @throws IOException if the client call fails
-   */
-  @Deprecated
-  public BlockStoragePolicy[] getStoragePolicies() throws IOException {
-    statistics.incrementReadOps(1);
-    final BlockStoragePolicy[] policies = dfs.getStoragePolicies();
-    return policies;
-  }
-
-  /**
-   * Move blocks from srcs to trg and delete srcs afterwards.
-   * The file block sizes must be the same.
-   * <p>
-   * The concat is first attempted without resolving links. If that raises an
-   * UnresolvedLinkException, target and sources are looked up with
-   * getFileLinkStatus, the call fails if any of them is a symlink, and the
-   * concat is retried once.
-   *
-   * @param trg existing file to append to
-   * @param psrcs list of files (same block size, same replication)
-   * @throws IOException if the concat fails or a symlink is involved
-   */
-  @Override
-  public void concat(Path trg, Path [] psrcs) throws IOException {
-    statistics.incrementWriteOps(1);
-    // Make target absolute
-    Path absF = fixRelativePart(trg);
-    // Make all srcs absolute
-    Path[] srcs = new Path[psrcs.length];
-    for (int i=0; i<psrcs.length; i++) {
-      srcs[i] = fixRelativePart(psrcs[i]);
-    }
-    // Try the concat without resolving any links
-    String[] srcsStr = new String[psrcs.length];
-    try {
-      for (int i=0; i<psrcs.length; i++) {
-        srcsStr[i] = getPathName(srcs[i]);
-      }
-      // Pass the absolute target; getPathName expects an absolute path, so
-      // the raw trg would be rejected whenever the caller gave a relative one.
-      dfs.concat(getPathName(absF), srcsStr);
-    } catch (UnresolvedLinkException e) {
-      // Exception could be from trg or any src.
-      // Fully resolve trg and srcs. Fail if any of them are a symlink.
-      FileStatus stat = getFileLinkStatus(absF);
-      if (stat.isSymlink()) {
-        throw new IOException("Cannot concat with a symlink target: "
-            + trg + " -> " + stat.getPath());
-      }
-      absF = fixRelativePart(stat.getPath());
-      for (int i=0; i<psrcs.length; i++) {
-        stat = getFileLinkStatus(srcs[i]);
-        if (stat.isSymlink()) {
-          throw new IOException("Cannot concat with a symlink src: "
-              + psrcs[i] + " -> " + stat.getPath());
-        }
-        srcs[i] = fixRelativePart(stat.getPath());
-      }
-      // Try concat again. Can still race with another symlink.
-      for (int i=0; i<psrcs.length; i++) {
-        srcsStr[i] = getPathName(srcs[i]);
-      }
-      dfs.concat(getPathName(absF), srcsStr);
-    }
-  }
-
-
-  /**
-   * Renames src to dst. The rename is first tried without resolving links; on
-   * an UnresolvedLinkException the source is looked up with
-   * getFileLinkStatus and the destination is retried through a link resolver.
-   *
-   * @param src path to rename
-   * @param dst new path
-   * @return the result reported by the client
-   * @throws IOException if the rename fails
-   */
-  @SuppressWarnings("deprecation")
-  @Override
-  public boolean rename(Path src, Path dst) throws IOException {
-    statistics.incrementWriteOps(1);
-
-    final Path absSrc = fixRelativePart(src);
-    final Path absDst = fixRelativePart(dst);
-
-    try {
-      // First attempt: no link resolution
-      final String from = getPathName(absSrc);
-      final String to = getPathName(absDst);
-      return dfs.rename(from, to);
-    } catch (UnresolvedLinkException e) {
-      // Fully resolve the source
-      final Path source = getFileLinkStatus(absSrc).getPath();
-      // Keep trying to resolve the destination
-      final FileSystemLinkResolver<Boolean> resolver =
-          new FileSystemLinkResolver<Boolean>() {
-            @Override
-            public Boolean doCall(final Path p)
-                throws IOException, UnresolvedLinkException {
-              return dfs.rename(getPathName(source), getPathName(p));
-            }
-
-            @Override
-            public Boolean next(final FileSystem fs, final Path p)
-                throws IOException {
-              // Should just throw an error in FileSystem#checkPath
-              return doCall(p);
-            }
-          };
-      return resolver.resolve(this, absDst);
-    }
-  }
-
-  /**
-   * This rename operation is guaranteed to be atomic.
-   * <p>
-   * As with the two-argument rename, the call is first tried without
-   * resolving links and retried through a link resolver for the destination
-   * when an UnresolvedLinkException is raised.
-   */
-  @SuppressWarnings("deprecation")
-  @Override
-  public void rename(Path src, Path dst, final Options.Rename... options)
-      throws IOException {
-    statistics.incrementWriteOps(1);
-    final Path absSrc = fixRelativePart(src);
-    final Path absDst = fixRelativePart(dst);
-    try {
-      // First attempt: no link resolution
-      final String from = getPathName(absSrc);
-      final String to = getPathName(absDst);
-      dfs.rename(from, to, options);
-    } catch (UnresolvedLinkException e) {
-      // Fully resolve the source
-      final Path source = getFileLinkStatus(absSrc).getPath();
-      // Keep trying to resolve the destination
-      final FileSystemLinkResolver<Void> resolver =
-          new FileSystemLinkResolver<Void>() {
-            @Override
-            public Void doCall(final Path p)
-                throws IOException, UnresolvedLinkException {
-              dfs.rename(getPathName(source), getPathName(p), options);
-              return null;
-            }
-
-            @Override
-            public Void next(final FileSystem fs, final Path p)
-                throws IOException {
-              // Should just throw an error in FileSystem#checkPath
-              return doCall(p);
-            }
-          };
-      resolver.resolve(this, absDst);
-    }
-  }
-
-  /**
-   * Truncates a file to the given length.
-   *
-   * @param f file to truncate; made absolute with fixRelativePart
-   * @param newLength length forwarded to the client
-   * @return the result reported by the client or the delegated file system
-   * @throws IOException if the call fails
-   */
-  @Override
-  public boolean truncate(Path f, final long newLength) throws IOException {
-    statistics.incrementWriteOps(1);
-    final FileSystemLinkResolver<Boolean> resolver =
-        new FileSystemLinkResolver<Boolean>() {
-          @Override
-          public Boolean doCall(final Path p)
-              throws IOException, UnresolvedLinkException {
-            return dfs.truncate(getPathName(p), newLength);
-          }
-
-          @Override
-          public Boolean next(final FileSystem fs, final Path p)
-              throws IOException {
-            return fs.truncate(p, newLength);
-          }
-        };
-    return resolver.resolve(this, fixRelativePart(f));
-  }
-
-  /**
-   * Deletes a path.
-   *
-   * @param f path to delete; made absolute with fixRelativePart
-   * @param recursive flag forwarded to the client
-   * @return the result reported by the client or the delegated file system
-   * @throws IOException if the call fails
-   */
-  @Override
-  public boolean delete(Path f, final boolean recursive) throws IOException {
-    statistics.incrementWriteOps(1);
-    final FileSystemLinkResolver<Boolean> resolver =
-        new FileSystemLinkResolver<Boolean>() {
-          @Override
-          public Boolean doCall(final Path p)
-              throws IOException, UnresolvedLinkException {
-            return dfs.delete(getPathName(p), recursive);
-          }
-
-          @Override
-          public Boolean next(final FileSystem fs, final Path p)
-              throws IOException {
-            return fs.delete(p, recursive);
-          }
-        };
-    return resolver.resolve(this, fixRelativePart(f));
-  }
-
-  /**
-   * Fetches the content summary of a path.
-   *
-   * @param f path to summarize; made absolute with fixRelativePart
-   * @return the summary reported by the client or the delegated file system
-   * @throws IOException if the call fails
-   */
-  @Override
-  public ContentSummary getContentSummary(Path f) throws IOException {
-    statistics.incrementReadOps(1);
-    final FileSystemLinkResolver<ContentSummary> resolver =
-        new FileSystemLinkResolver<ContentSummary>() {
-          @Override
-          public ContentSummary doCall(final Path p)
-              throws IOException, UnresolvedLinkException {
-            return dfs.getContentSummary(getPathName(p));
-          }
-
-          @Override
-          public ContentSummary next(final FileSystem fs, final Path p)
-              throws IOException {
-            return fs.getContentSummary(p);
-          }
-        };
-    return resolver.resolve(this, fixRelativePart(f));
-  }
-
-  /**
-   * Set a directory's quotas.
-   *
-   * @param src directory whose quotas are set
-   * @param namespaceQuota namespace quota forwarded to the client
-   * @param storagespaceQuota storage space quota forwarded to the client
-   * @throws IOException if the call fails
-   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long, StorageType)
-   */
-  public void setQuota(Path src, final long namespaceQuota,
-      final long storagespaceQuota) throws IOException {
-    final FileSystemLinkResolver<Void> resolver =
-        new FileSystemLinkResolver<Void>() {
-          @Override
-          public Void doCall(final Path p)
-              throws IOException, UnresolvedLinkException {
-            final String dir = getPathName(p);
-            dfs.setQuota(dir, namespaceQuota, storagespaceQuota);
-            return null;
-          }
-
-          @Override
-          public Void next(final FileSystem fs, final Path p)
-              throws IOException {
-            // setQuota is not defined in FileSystem, so we only can resolve
-            // within this DFS
-            return doCall(p);
-          }
-        };
-    resolver.resolve(this, fixRelativePart(src));
-  }
-
-  /**
-   * Set the per type storage quota of a directory.
-   *
-   * @param src target directory whose quota is to be modified.
-   * @param type storage type of the specific storage type quota to be modified.
-   * @param quota value of the specific storage type quota to be modified.
-   * Maybe {@link HdfsConstants#QUOTA_RESET} to clear quota by storage type.
-   * @throws IOException if the call fails
-   */
-  public void setQuotaByStorageType(
-      Path src, final StorageType type, final long quota)
-      throws IOException {
-    final FileSystemLinkResolver<Void> resolver =
-        new FileSystemLinkResolver<Void>() {
-          @Override
-          public Void doCall(final Path p)
-              throws IOException, UnresolvedLinkException {
-            final String dir = getPathName(p);
-            dfs.setQuotaByStorageType(dir, type, quota);
-            return null;
-          }
-
-          @Override
-          public Void next(final FileSystem fs, final Path p)
-              throws IOException {
-            // setQuotaByStorageType is not defined in FileSystem, so we only
-            // can resolve within this DFS
-            return doCall(p);
-          }
-        };
-    resolver.resolve(this, fixRelativePart(src));
-  }
-
- private FileStatus[] listStatusInternal(Path p) throws IOException {
- String src = getPathName(p);
-
- // fetch the first batch of entries in the directory
- DirectoryListing thisListing = dfs.listPaths(
- src, HdfsFileStatus.EMPTY_NAME);
-
- if (thisListing == null) { // the directory does not exist
- throw new FileNotFoundException("File " + p + " does not exist.");
- }
-
- HdfsFileStatus[] partialListing = thisListing.getPartialListing();
- if (!thisListing.hasMore()) { // got all entries of the directory
- FileStatus[] stats = new FileStatus[partialListing.length];
- for (int i = 0; i < partialListing.length; i++) {
- stats[i] = partialListing[i].makeQualified(getUri(), p);
- }
- statistics.incrementReadOps(1);
- return stats;
- }
-
- // The directory size is too big that it needs to fetch more
- // estimate the total number of entries in the directory
- int totalNumEntries =
- partialListing.length + thisListing.getRemainingEntries();
- ArrayList<FileStatus> listing =
- new ArrayList<FileStatus>(totalNumEntries);
- // add the first batch of entries to the array list
- for (HdfsFileStatus fileStatus : partialListing) {
- listing.add(fileStatus.makeQualified(getUri(), p));
- }
- statistics.incrementLargeReadOps(1);
-
- // now fetch more entries
- do {
- thisListing = dfs.listPaths(src, thisListing.getLastName());
-
- if (thisListing == null) { // the directory is deleted
- throw new FileNotFoundException("File " + p + " does not exist.");
- }
-
- partialListing = thisListing.getPartialListing();
- for (HdfsFileStatus fileStatus : partialListing) {
- listing.add(fileStatus.makeQualified(getUri(), p));
- }
- statistics.incrementLargeReadOps(1);
- } while (thisListing.hasMore());
-
- return listing.toArray(new FileStatus[listing.size()]);
- }
-
-  /**
-   * List all the entries of a directory
-   *
-   * Note that this operation is not atomic for a large directory.
-   * The entries of a directory may be fetched from NameNode multiple times.
-   * It only guarantees that each name occurs once if a directory
-   * undergoes changes between the calls.
-   */
-  @Override
-  public FileStatus[] listStatus(Path p) throws IOException {
-    final FileSystemLinkResolver<FileStatus[]> resolver =
-        new FileSystemLinkResolver<FileStatus[]>() {
-          @Override
-          public FileStatus[] doCall(final Path dir)
-              throws IOException, UnresolvedLinkException {
-            return listStatusInternal(dir);
-          }
-
-          @Override
-          public FileStatus[] next(final FileSystem fs, final Path dir)
-              throws IOException {
-            return fs.listStatus(dir);
-          }
-        };
-    return resolver.resolve(this, fixRelativePart(p));
-  }
-
- @Override
- protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p,
- final PathFilter filter)
- throws IOException {
- Path absF = fixRelativePart(p);
- return new FileSystemLinkResolver<RemoteIterator<LocatedFileStatus>>() {
- @Override
- public RemoteIterator<LocatedFileStatus> doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return new DirListingIterator<LocatedFileStatus>(p, filter, true);
- }
-
- @Override
- public RemoteIterator<LocatedFileStatus> next(final FileSystem fs, final Path p)
- throws IOException {
- if (fs instanceof DistributedFileSystem) {
- return ((DistributedFileSystem)fs).listLocatedStatus(p, filter);
- }
- // symlink resolution for this methos does not work cross file systems
- // because it is a protected method.
- throw new IOException("Link resolution does not work with multiple " +
- "file systems for listLocatedStatus(): " + p);
- }
- }.resolve(this, absF);
- }
-
-
-  /**
-   * Returns a remote iterator so that followup calls are made on demand
-   * while consuming the entries. This reduces memory consumption during
-   * listing of a large directory.
-   *
-   * @param p target path
-   * @return remote iterator
-   * @throws IOException if the first batch of entries cannot be fetched
-   */
-  @Override
-  public RemoteIterator<FileStatus> listStatusIterator(final Path p)
-      throws IOException {
-    Path absF = fixRelativePart(p);
-    return new FileSystemLinkResolver<RemoteIterator<FileStatus>>() {
-      @Override
-      public RemoteIterator<FileStatus> doCall(final Path p)
-          throws IOException, UnresolvedLinkException {
-        return new DirListingIterator<FileStatus>(p, false);
-      }
-
-      @Override
-      public RemoteIterator<FileStatus> next(final FileSystem fs, final Path p)
-          throws IOException {
-        // listStatusIterator is part of the FileSystem API, so no cast is
-        // needed; casting would fail for any other FileSystem type.
-        return fs.listStatusIterator(p);
-      }
-    }.resolve(this, absF);
-
-  }
-
- /**
-  * Iterator over the file status of each file/subdirectory of a directory.
-  * The listing is obtained in batches through {@code dfs.listPaths}; a new
-  * batch is requested only after the current one has been consumed and the
-  * listing reports that more entries exist.
-  *
-  * If {@code needLocation} is set, every entry is converted with
-  * {@code makeQualifiedLocated}; otherwise with {@code makeQualified}.
-  * An optional {@link PathFilter} decides which entries are handed out.
-  *
-  * @param <T> the type of the file status
-  */
- private class DirListingIterator<T extends FileStatus>
-     implements RemoteIterator<T> {
-   /** Batch currently being consumed. */
-   private DirectoryListing thisListing;
-   /** Index of the next unread entry inside {@link #thisListing}. */
-   private int i;
-   private Path p;
-   private String src;
-   /** Entry that passed the filter and is waiting to be returned. */
-   private T curStat = null;
-   private PathFilter filter;
-   private boolean needLocation;
-
-   /**
-    * Starts a listing of {@code p} and eagerly fetches the first batch.
-    *
-    * @param p directory to list
-    * @param filter filter applied to each entry path, or null for no filter
-    * @param needLocation whether entries are converted to located statuses
-    * @throws FileNotFoundException if the first listing call returns null
-    * @throws IOException if the listing call fails
-    */
-   private DirListingIterator(Path p, PathFilter filter,
-       boolean needLocation) throws IOException {
-     this.p = p;
-     this.src = getPathName(p);
-     this.filter = filter;
-     this.needLocation = needLocation;
-     // fetch the first batch of entries in the directory
-     thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
-         needLocation);
-     statistics.incrementReadOps(1);
-     if (thisListing == null) { // the directory does not exist
-       throw new FileNotFoundException("File " + p + " does not exist.");
-     }
-     i = 0;
-   }
-
-   /** Same as the three-argument constructor, without a filter. */
-   private DirListingIterator(Path p, boolean needLocation)
-       throws IOException {
-     this(p, null, needLocation);
-   }
-
-   /**
-    * Advances to the next entry accepted by the filter, if there is one,
-    * and remembers it until {@link #next()} picks it up.
-    */
-   @Override
-   @SuppressWarnings("unchecked")
-   public boolean hasNext() throws IOException {
-     while (curStat == null && hasNextNoFilter()) {
-       final HdfsFileStatus raw = thisListing.getPartialListing()[i++];
-       final T candidate = needLocation
-           ? (T) ((HdfsLocatedFileStatus) raw)
-               .makeQualifiedLocated(getUri(), p)
-           : (T) raw.makeQualified(getUri(), p);
-       // a null filter accepts everything
-       if (filter == null || filter.accept(candidate.getPath())) {
-         curStat = candidate;
-       }
-     }
-     return curStat != null;
-   }
-
-   /** Check if there is a next item before applying the given filter */
-   private boolean hasNextNoFilter() throws IOException {
-     if (thisListing == null) {
-       return false;
-     }
-     final boolean batchExhausted =
-         i >= thisListing.getPartialListing().length;
-     if (batchExhausted && thisListing.hasMore()) {
-       // continue the listing right after the last name already seen
-       thisListing = dfs.listPaths(src, thisListing.getLastName(),
-           needLocation);
-       statistics.incrementReadOps(1);
-       if (thisListing == null) {
-         return false;
-       }
-       i = 0;
-     }
-     return i < thisListing.getPartialListing().length;
-   }
-
-   /**
-    * Returns the pending entry and clears it so that the following call
-    * looks for a new one.
-    */
-   @Override
-   public T next() throws IOException {
-     if (!hasNext()) {
-       throw new java.util.NoSuchElementException("No more entry in " + p);
-     }
-     final T result = curStat;
-     curStat = null;
-     return result;
-   }
- }
-
- /**
-  * Creates a single directory without creating missing parents; the request
-  * is handed to {@code mkdirsInternal} with parent creation switched off.
-  *
-  * See {@link FsPermission#applyUMask(FsPermission)} for details of how
-  * the permission is applied.
-  *
-  * @param f The path to create
-  * @param permission The permission. See FsPermission#applyUMask for
-  *                   details about how this is used to calculate the
-  *                   effective permission.
-  * @return the value returned by {@code mkdirsInternal}
-  * @throws IOException if the operation fails
-  */
- public boolean mkdir(Path f, FsPermission permission) throws IOException {
-   final boolean createParent = false;
-   return mkdirsInternal(f, permission, createParent);
- }
-
- /**
-  * Creates a directory together with any missing parent directories; the
-  * request is handed to {@code mkdirsInternal} with parent creation
-  * switched on.
-  *
-  * See {@link FsPermission#applyUMask(FsPermission)} for details of how
-  * the permission is applied.
-  *
-  * @param f The path to create
-  * @param permission The permission. See FsPermission#applyUMask for
-  *                   details about how this is used to calculate the
-  *                   effective permission.
-  * @return the value returned by {@code mkdirsInternal}
-  * @throws IOException if the operation fails
-  */
- @Override
- public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-   final boolean createParent = true;
-   return mkdirsInternal(f, permission, createParent);
- }
-
- /**
-  * Shared implementation behind {@code mkdir} and {@code mkdirs}. Counts one
-  * write operation, resolves {@code f} and asks the client to create the
-  * directory.
-  *
-  * @param f the path to create
-  * @param permission the permission passed on to the client
-  * @param createParent whether missing parent directories may be created
-  * @return the value returned by the underlying mkdirs call
-  * @throws IOException if the operation fails, or if a non-recursive mkdir
-  *                     is requested on a path that resolves into another
-  *                     file system
-  */
- private boolean mkdirsInternal(Path f, final FsPermission permission,
-     final boolean createParent) throws IOException {
-   statistics.incrementWriteOps(1);
-   Path absF = fixRelativePart(f);
-   return new FileSystemLinkResolver<Boolean>() {
-     @Override
-     public Boolean doCall(final Path p)
-         throws IOException, UnresolvedLinkException {
-       return dfs.mkdirs(getPathName(p), permission, createParent);
-     }
-
-     @Override
-     public Boolean next(final FileSystem fs, final Path p)
-         throws IOException {
-       // FileSystem doesn't have a non-recursive mkdir() method
-       // Best we can do is error out
-       if (!createParent) {
-         // Fixed: the two literals used to join into "non-recursivemkdir".
-         throw new IOException("FileSystem does not support non-recursive "
-             + "mkdir");
-       }
-       return fs.mkdirs(p, permission);
-     }
-   }.resolve(this, absF);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
- throws IOException {
- statistics.incrementWriteOps(1);
- return dfs.primitiveMkdir(getPathName(f), absolutePermission);
- }
-
-
- /**
-  * Closes this file system: first {@code dfs.closeOutputStreams(false)},
-  * then the superclass. The client itself is closed in a finally clause, so
-  * that happens even when one of the earlier steps throws.
-  *
-  * @throws IOException if any of the close steps fails
-  */
- @Override
- public void close() throws IOException {
-   try {
-     this.dfs.closeOutputStreams(false);
-     super.close();
-   } finally {
-     // always release the client, whatever happened above
-     this.dfs.close();
-   }
- }
-
- /** @return {@code "DFS[" + dfs + "]"}, i.e. the client's string form in brackets. */
- @Override
- public String toString() {
-   final StringBuilder text = new StringBuilder("DFS[");
-   text.append(dfs).append("]");
-   return text.toString();
- }
-
- /** @return the {@link DFSClient} this file system delegates to. */
- @InterfaceAudience.Private
- @VisibleForTesting
- public DFSClient getClient() {
-   return this.dfs;
- }
-
- /**
-  * Counts one read operation and returns the client's disk status. The
-  * path argument is not consulted by this implementation.
-  *
-  * @param p ignored
-  * @return the status reported by {@code dfs.getDiskStatus()}
-  * @throws IOException if the client call fails
-  */
- @Override
- public FsStatus getStatus(Path p) throws IOException {
-   statistics.incrementReadOps(1);
-   final FsStatus diskStatus = dfs.getDiskStatus();
-   return diskStatus;
- }
-
- /**
-  * Returns the number of blocks for which no good replica is left. Under
-  * normal conditions this is zero.
-  *
-  * @return the count reported by the client
-  * @throws IOException if the client call fails
-  */
- public long getMissingBlocksCount() throws IOException {
-   final long missing = dfs.getMissingBlocksCount();
-   return missing;
- }
-
- /**
-  * Returns the number of blocks that have a replication factor of 1 and
-  * have lost their only replica.
-  *
-  * @return the count reported by the client
-  * @throws IOException if the client call fails
-  */
- public long getMissingReplOneBlocksCount() throws IOException {
-   final long missing = dfs.getMissingReplOneBlocksCount();
-   return missing;
- }
-
- /**
-  * Returns the number of blocks with one or more replicas missing.
-  *
-  * @return the count reported by the client
-  * @throws IOException if the client call fails
-  */
- public long getUnderReplicatedBlocksCount() throws IOException {
-   final long underReplicated = dfs.getUnderReplicatedBlocksCount();
-   return underReplicated;
- }
-
- /**
-  * Returns the number of blocks that have at least one replica marked
-  * corrupt.
-  *
-  * @return the count reported by the client
-  * @throws IOException if the client call fails
-  */
- public long getCorruptBlocksCount() throws IOException {
-   final long corrupt = dfs.getCorruptBlocksCount();
-   return corrupt;
- }
-
- /**
-  * Creates a {@link CorruptFileBlockIterator} for the given path, backed by
-  * this file system's client.
-  *
-  * @param path the path handed to the iterator
-  * @return the newly created iterator
-  * @throws IOException if the iterator cannot be created
-  */
- @Override
- public RemoteIterator<Path> listCorruptFileBlocks(Path path)
-     throws IOException {
-   final RemoteIterator<Path> corruptFiles =
-       new CorruptFileBlockIterator(dfs, path);
-   return corruptFiles;
- }
-
- /**
-  * Shortcut for {@code getDataNodeStats(DatanodeReportType.ALL)}.
-  *
-  * @return datanode statistics.
-  * @throws IOException if the report cannot be obtained
-  */
- public DatanodeInfo[] getDataNodeStats() throws IOException {
-   final DatanodeReportType everything = DatanodeReportType.ALL;
-   return getDataNodeStats(everything);
- }
-
- /** @return datanode statistics for the given type. */
- public DatanodeInfo[] getDataNodeStats(final DatanodeReportType type
- ) throws IOException {
- return dfs.datanodeReport(type);
- }
-
- /**
-  * Enter, leave or get safe mode. Equivalent to
-  * {@code setSafeMode(action, false)}.
-  *
-  * @param action the safe mode action to perform
-  * @return the value returned by the two-argument overload
-  * @throws IOException if the request fails
-  * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(
-  *    HdfsConstants.SafeModeAction,boolean)
-  */
- public boolean setSafeMode(HdfsConstants.SafeModeAction action)
-     throws IOException {
-   final boolean isChecked = false;
-   return setSafeMode(action, isChecked);
- }
-
- /**
-  * Enter, leave or get safe mode.
-  *
-  * @param action
-  *          One of SafeModeAction.ENTER, SafeModeAction.LEAVE and
-  *          SafeModeAction.GET
-  * @param isChecked
-  *          If true check only for Active NNs status, else check first NN's
-  *          status
-  * @return the value returned by {@code dfs.setSafeMode}
-  * @throws IOException if the request fails
-  * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(SafeModeAction, boolean)
-  */
- public boolean setSafeMode(HdfsConstants.SafeModeAction action,
-     boolean isChecked) throws IOException {
-   final boolean result = dfs.setSafeMode(action, isChecked);
-   return result;
- }
-
- /**
-  * Save namespace image.
-  *
-  * @param timeWindow NameNode can ignore this command if the latest
-  *                   checkpoint was done within the given time period (in
-  *                   seconds).
-  * @param txGap passed through to {@code dfs.saveNamespace} unchanged; see
-  *              the referenced ClientProtocol method for its meaning
-  * @return true if a new checkpoint has been made
-  * @throws IOException if the request fails
-  * @see ClientProtocol#saveNamespace(long, long)
-  */
- public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
-   final boolean checkpointMade = dfs.saveNamespace(timeWindow, txGap);
-   return checkpointMade;
- }
-
- /**
-  * Save namespace image. NameNode always does the checkpoint. Implemented
-  * as {@code saveNamespace(0, 0)}; the boolean result is discarded.
-  *
-  * @throws IOException if the request fails
-  */
- public void saveNamespace() throws IOException {
-   final long noTimeWindow = 0;
-   final long noTxGap = 0;
-   saveNamespace(noTimeWindow, noTxGap);
- }
-
- /**
-  * Rolls the edit log on the active NameNode.
-  * Requires super-user privileges.
-  *
-  * @return the transaction ID of the newly created segment
-  * @throws AccessControlException declared by this method; raised by the
-  *                                underlying call
-  * @throws IOException if the request fails
-  * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#rollEdits()
-  */
- public long rollEdits() throws AccessControlException, IOException {
-   final long newSegmentTxId = dfs.rollEdits();
-   return newSegmentTxId;
- }
-
- /**
-  * Enables, disables or checks restoreFailedStorage.
-  *
-  * @param arg the argument forwarded to the client unchanged
-  * @return the value returned by {@code dfs.restoreFailedStorage}
-  * @throws AccessControlException declared by this method; raised by the
-  *                                underlying call
-  * @throws IOException if the request fails
-  * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#restoreFailedStorage(String arg)
-  */
- public boolean restoreFailedStorage(String arg)
-     throws AccessControlException, IOException {
-   final boolean result = dfs.restoreFailedStorage(arg);
-   return result;
- }
-
-
- /**
-  * Refreshes the list of hosts and excluded hosts from the configured
-  * files by forwarding the request to the client.
-  *
-  * @throws IOException if the request fails
-  */
- public void refreshNodes() throws IOException {
-   this.dfs.refreshNodes();
- }
-
- /**
-  * Finalizes the previously upgraded file system state by forwarding the
-  * request to the client.
-  *
-  * @throws IOException if the request fails
-  */
- public void finalizeUpgrade() throws IOException {
-   this.dfs.finalizeUpgrade();
- }
-
- /**
-  * Rolling upgrade: prepare/finalize/query.
-  *
-  * @param action the rolling upgrade action to perform
-  * @return the information returned by {@code dfs.rollingUpgrade}
-  * @throws IOException if the request fails
-  */
- public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action)
-     throws IOException {
-   final RollingUpgradeInfo upgradeInfo = dfs.rollingUpgrade(action);
-   return upgradeInfo;
- }
-
- /**
-  * Requests the namenode to dump data structures into the specified file.
-  *
-  * @param pathname the file name forwarded to the client unchanged
-  * @throws IOException if the request fails
-  */
- public void metaSave(String pathname) throws IOException {
-   this.dfs.metaSave(pathname);
- }
-
- /**
-  * @return the server defaults as reported by the client
-  * @throws IOException if the request fails
-  */
- @Override
- public FsServerDefaults getServerDefaults() throws IOException {
-   final FsServerDefaults defaults = dfs.getServerDefaults();
-   return defaults;
- }
-
- /**
-  * Returns the stat information about the file. Counts one read operation.
-  * If resolution continues on another file system, that file system's
-  * {@code getFileStatus} is used.
-  *
-  * @param f the path to look up
-  * @return the status, qualified against this file system's URI
-  * @throws FileNotFoundException if the file does not exist.
-  * @throws IOException if the lookup fails
-  */
- @Override
- public FileStatus getFileStatus(Path f) throws IOException {
-   statistics.incrementReadOps(1);
-   final Path absF = fixRelativePart(f);
-   return new FileSystemLinkResolver<FileStatus>() {
-     @Override
-     public FileStatus doCall(final Path resolved) throws IOException,
-         UnresolvedLinkException {
-       final HdfsFileStatus info = dfs.getFileInfo(getPathName(resolved));
-       if (info == null) {
-         throw new FileNotFoundException("File does not exist: " + resolved);
-       }
-       return info.makeQualified(getUri(), resolved);
-     }
-
-     @Override
-     public FileStatus next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       return fs.getFileStatus(resolved);
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * Creates the symlink {@code link} pointing at {@code target}. Counts one
-  * write operation. If resolution of {@code link} continues on another file
-  * system, that file system's {@code createSymlink} is used.
-  *
-  * @param target the path the link points to
-  * @param link the path of the link to create
-  * @param createParent forwarded unchanged to the underlying call
-  * @throws UnsupportedOperationException if
-  *         {@code FileSystem.areSymlinksEnabled()} is false
-  * @throws IOException if the link cannot be created
-  */
- @SuppressWarnings("deprecation")
- @Override
- public void createSymlink(final Path target, final Path link,
-     final boolean createParent) throws AccessControlException,
-     FileAlreadyExistsException, FileNotFoundException,
-     ParentNotDirectoryException, UnsupportedFileSystemException,
-     IOException {
-   final boolean symlinksEnabled = FileSystem.areSymlinksEnabled();
-   if (!symlinksEnabled) {
-     throw new UnsupportedOperationException("Symlinks not supported");
-   }
-   statistics.incrementWriteOps(1);
-   final Path absF = fixRelativePart(link);
-   new FileSystemLinkResolver<Void>() {
-     @Override
-     public Void doCall(final Path resolved) throws IOException,
-         UnresolvedLinkException {
-       final String linkName = getPathName(resolved);
-       dfs.createSymlink(target.toString(), linkName, createParent);
-       return null;
-     }
-
-     @Override
-     public Void next(final FileSystem fs, final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       fs.createSymlink(target, resolved, createParent);
-       return null;
-     }
-   }.resolve(this, absF);
- }
-
- /** @return always {@code true} for this file system. */
- @Override
- public boolean supportsSymlinks() {
-   final boolean supported = true;
-   return supported;
- }
-
- /**
-  * Returns the status of {@code f} as obtained from
-  * {@code dfs.getFileLinkInfo}. Counts one read operation. If the returned
-  * status is a symlink, its target is fully qualified before returning.
-  *
-  * @param f the path to look up
-  * @return the status of the path
-  * @throws FileNotFoundException if no information exists for the path
-  * @throws IOException if the lookup fails
-  */
- @Override
- public FileStatus getFileLinkStatus(final Path f)
-     throws AccessControlException, FileNotFoundException,
-     UnsupportedFileSystemException, IOException {
-   statistics.incrementReadOps(1);
-   final Path absF = fixRelativePart(f);
-   final FileStatus status = new FileSystemLinkResolver<FileStatus>() {
-     @Override
-     public FileStatus doCall(final Path resolved) throws IOException,
-         UnresolvedLinkException {
-       final HdfsFileStatus info =
-           dfs.getFileLinkInfo(getPathName(resolved));
-       if (info == null) {
-         throw new FileNotFoundException("File does not exist: " + resolved);
-       }
-       return info.makeQualified(getUri(), resolved);
-     }
-
-     @Override
-     public FileStatus next(final FileSystem fs, final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       return fs.getFileLinkStatus(resolved);
-     }
-   }.resolve(this, absF);
-   // Fully-qualify the symlink
-   if (status.isSymlink()) {
-     final Path qualifiedTarget = FSLinkResolver.qualifySymlinkTarget(
-         this.getUri(), status.getPath(), status.getSymlink());
-     status.setSymlink(qualifiedTarget);
-   }
-   return status;
- }
-
- /**
-  * Returns the symlink target recorded for {@code f}, taken from the status
-  * obtained through {@code dfs.getFileLinkInfo}. Counts one read operation.
-  *
-  * @param f the path of the link
-  * @return the value of {@code getSymlink()} on the qualified status
-  * @throws FileNotFoundException if no information exists for the path
-  * @throws IOException if the lookup fails
-  */
- @Override
- public Path getLinkTarget(final Path f) throws AccessControlException,
-     FileNotFoundException, UnsupportedFileSystemException, IOException {
-   statistics.incrementReadOps(1);
-   final Path absF = fixRelativePart(f);
-   return new FileSystemLinkResolver<Path>() {
-     @Override
-     public Path doCall(final Path resolved) throws IOException,
-         UnresolvedLinkException {
-       final HdfsFileStatus info =
-           dfs.getFileLinkInfo(getPathName(resolved));
-       if (info == null) {
-         throw new FileNotFoundException("File does not exist: " + resolved);
-       }
-       return info.makeQualified(getUri(), resolved).getSymlink();
-     }
-
-     @Override
-     public Path next(final FileSystem fs, final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       return fs.getLinkTarget(resolved);
-     }
-   }.resolve(this, absF);
- }
-
- @Override
- protected Path resolveLink(Path f) throws IOException {
- statistics.incrementReadOps(1);
- String target = dfs.getLinkTarget(getPathName(fixRelativePart(f)));
- if (target == null) {
- throw new FileNotFoundException("File does not exist: " + f.toString());
- }
- return new Path(target);
- }
-
- /**
-  * Returns the checksum of {@code f}, requesting it from the client with a
-  * length of {@code Long.MAX_VALUE}. Counts one read operation. If
-  * resolution continues on another file system, that file system's
-  * {@code getFileChecksum} is used.
-  *
-  * @param f the file to checksum
-  * @return the checksum returned by the underlying call
-  * @throws IOException if the checksum cannot be obtained
-  */
- @Override
- public FileChecksum getFileChecksum(Path f) throws IOException {
-   statistics.incrementReadOps(1);
-   final Path absF = fixRelativePart(f);
-   return new FileSystemLinkResolver<FileChecksum>() {
-     @Override
-     public FileChecksum doCall(final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       final String src = getPathName(resolved);
-       return dfs.getFileChecksum(src, Long.MAX_VALUE);
-     }
-
-     @Override
-     public FileChecksum next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       return fs.getFileChecksum(resolved);
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * Returns the checksum of {@code f} for the given length. Counts one read
-  * operation. If resolution continues on another file system, the call is
-  * only supported when that file system is a DistributedFileSystem.
-  *
-  * @param f the file to checksum
-  * @param length the length forwarded to the client unchanged
-  * @return the checksum returned by the underlying call
-  * @throws UnsupportedFileSystemException if the path resolves into a file
-  *         system that is not a DistributedFileSystem
-  * @throws IOException if the checksum cannot be obtained
-  */
- @Override
- public FileChecksum getFileChecksum(Path f, final long length)
-     throws IOException {
-   statistics.incrementReadOps(1);
-   final Path absF = fixRelativePart(f);
-   return new FileSystemLinkResolver<FileChecksum>() {
-     @Override
-     public FileChecksum doCall(final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       final String src = getPathName(resolved);
-       return dfs.getFileChecksum(src, length);
-     }
-
-     @Override
-     public FileChecksum next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       if (!(fs instanceof DistributedFileSystem)) {
-         throw new UnsupportedFileSystemException(
-             "getFileChecksum(Path, long) is not supported by "
-                 + fs.getClass().getSimpleName());
-       }
-       return ((DistributedFileSystem) fs).getFileChecksum(resolved, length);
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * Sets the permission of {@code p}. Counts one write operation. If
-  * resolution continues on another file system, that file system's
-  * {@code setPermission} is used.
-  *
-  * @param p the path to change
-  * @param permission the permission to apply
-  * @throws IOException if the change fails
-  */
- @Override
- public void setPermission(Path p, final FsPermission permission)
-     throws IOException {
-   statistics.incrementWriteOps(1);
-   final Path absF = fixRelativePart(p);
-   new FileSystemLinkResolver<Void>() {
-     @Override
-     public Void doCall(final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       final String src = getPathName(resolved);
-       dfs.setPermission(src, permission);
-       return null;
-     }
-
-     @Override
-     public Void next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       fs.setPermission(resolved, permission);
-       return null;
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * Sets the owner and/or group of {@code p}. At least one of
-  * {@code username} and {@code groupname} must be non-null. Counts one
-  * write operation once that check has passed.
-  *
-  * @param p the path to change
-  * @param username the new owner, forwarded unchanged
-  * @param groupname the new group, forwarded unchanged
-  * @throws IOException if both names are null, or if the change fails
-  */
- @Override
- public void setOwner(Path p, final String username, final String groupname)
-     throws IOException {
-   final boolean nothingToSet = username == null && groupname == null;
-   if (nothingToSet) {
-     throw new IOException("username == null && groupname == null");
-   }
-   statistics.incrementWriteOps(1);
-   final Path absF = fixRelativePart(p);
-   new FileSystemLinkResolver<Void>() {
-     @Override
-     public Void doCall(final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       final String src = getPathName(resolved);
-       dfs.setOwner(src, username, groupname);
-       return null;
-     }
-
-     @Override
-     public Void next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       fs.setOwner(resolved, username, groupname);
-       return null;
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * Sets the modification and access times of {@code p}. Counts one write
-  * operation. If resolution continues on another file system, that file
-  * system's {@code setTimes} is used.
-  *
-  * @param p the path to change
-  * @param mtime the modification time, forwarded unchanged
-  * @param atime the access time, forwarded unchanged
-  * @throws IOException if the change fails
-  */
- @Override
- public void setTimes(Path p, final long mtime, final long atime)
-     throws IOException {
-   statistics.incrementWriteOps(1);
-   final Path absF = fixRelativePart(p);
-   new FileSystemLinkResolver<Void>() {
-     @Override
-     public Void doCall(final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       final String src = getPathName(resolved);
-       dfs.setTimes(src, mtime, atime);
-       return null;
-     }
-
-     @Override
-     public Void next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       fs.setTimes(resolved, mtime, atime);
-       return null;
-     }
-   }.resolve(this, absF);
- }
-
-
- @Override
- protected int getDefaultPort() {
- return HdfsClientConfigKeys.DFS_NAMENODE_RPC_PORT_DEFAULT;
- }
-
- /**
-  * Requests a delegation token from the client.
-  *
-  * @param renewer the renewer name; a null renewer is passed on as null,
-  *                anything else is wrapped in a {@code Text}
-  * @return the token returned by the client
-  * @throws IOException if the request fails
-  */
- @Override
- public Token<DelegationTokenIdentifier> getDelegationToken(String renewer)
-     throws IOException {
-   final Text renewerText = (renewer == null) ? null : new Text(renewer);
-   return dfs.getDelegationToken(renewerText);
- }
-
- /**
-  * Requests the namenode to tell all datanodes to use a new, non-persistent
-  * bandwidth value for dfs.balance.bandwidthPerSec.
-  * The bandwidth parameter is the max number of bytes per second of network
-  * bandwidth to be used by a datanode during balancing.
-  *
-  * @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
-  * @throws IOException if the request fails
-  */
- public void setBalancerBandwidth(long bandwidth) throws IOException {
-   this.dfs.setBalancerBandwidth(bandwidth);
- }
-
- /**
-  * Get a canonical service name for this file system. If the URI is logical,
-  * the hostname part of the URI will be returned. The value is obtained
-  * from the client.
-  *
-  * @return a service string that uniquely identifies this file system.
-  */
- @Override
- public String getCanonicalServiceName() {
-   final String serviceName = dfs.getCanonicalServiceName();
-   return serviceName;
- }
-
- @Override
- protected URI canonicalizeUri(URI uri) {
- if (HAUtilClient.isLogicalUri(getConf(), uri)) {
- // Don't try to DNS-resolve logical URIs, since the 'authority'
- // portion isn't a proper hostname
- return uri;
- } else {
- return NetUtils.getCanonicalUri(uri, getDefaultPort());
- }
- }
-
- /**
-  * Utility function that returns if the NameNode is in safemode or not. In HA
-  * mode, this API will return only ActiveNN's safemode status. Implemented
-  * as {@code setSafeMode(SafeModeAction.SAFEMODE_GET, true)}.
-  *
-  * @return true if NameNode is in safemode, false otherwise.
-  * @throws IOException
-  *           when there is an issue communicating with the NameNode
-  */
- public boolean isInSafeMode() throws IOException {
-   final boolean isChecked = true;
-   return setSafeMode(SafeModeAction.SAFEMODE_GET, isChecked);
- }
-
- /**
-  * Allows snapshots on {@code path}. If resolution continues on another
-  * file system, the call is only supported when that file system is a
-  * DistributedFileSystem.
-  *
-  * @param path the directory to allow snapshots on
-  * @throws UnsupportedOperationException if the path resolves into a file
-  *         system that is not a DistributedFileSystem
-  * @throws IOException if the request fails
-  * @see HdfsAdmin#allowSnapshot(Path)
-  */
- public void allowSnapshot(final Path path) throws IOException {
-   final Path absF = fixRelativePart(path);
-   new FileSystemLinkResolver<Void>() {
-     @Override
-     public Void doCall(final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       final String src = getPathName(resolved);
-       dfs.allowSnapshot(src);
-       return null;
-     }
-
-     @Override
-     public Void next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       if (!(fs instanceof DistributedFileSystem)) {
-         throw new UnsupportedOperationException("Cannot perform snapshot"
-             + " operations on a symlink to a non-DistributedFileSystem: "
-             + path + " -> " + resolved);
-       }
-       ((DistributedFileSystem) fs).allowSnapshot(resolved);
-       return null;
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * Disallows snapshots on {@code path}. If resolution continues on another
-  * file system, the call is only supported when that file system is a
-  * DistributedFileSystem.
-  *
-  * @param path the directory to disallow snapshots on
-  * @throws UnsupportedOperationException if the path resolves into a file
-  *         system that is not a DistributedFileSystem
-  * @throws IOException if the request fails
-  * @see HdfsAdmin#disallowSnapshot(Path)
-  */
- public void disallowSnapshot(final Path path) throws IOException {
-   final Path absF = fixRelativePart(path);
-   new FileSystemLinkResolver<Void>() {
-     @Override
-     public Void doCall(final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       final String src = getPathName(resolved);
-       dfs.disallowSnapshot(src);
-       return null;
-     }
-
-     @Override
-     public Void next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       if (!(fs instanceof DistributedFileSystem)) {
-         throw new UnsupportedOperationException("Cannot perform snapshot"
-             + " operations on a symlink to a non-DistributedFileSystem: "
-             + path + " -> " + resolved);
-       }
-       ((DistributedFileSystem) fs).disallowSnapshot(resolved);
-       return null;
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * Creates a snapshot named {@code snapshotName} of {@code path}. If
-  * resolution continues on another file system, the call is only supported
-  * when that file system is a DistributedFileSystem.
-  *
-  * @param path the directory to snapshot
-  * @param snapshotName the name of the snapshot, forwarded unchanged
-  * @return the path of the created snapshot
-  * @throws UnsupportedOperationException if the path resolves into a file
-  *         system that is not a DistributedFileSystem
-  * @throws IOException if the snapshot cannot be created
-  */
- @Override
- public Path createSnapshot(final Path path, final String snapshotName)
-     throws IOException {
-   Path absF = fixRelativePart(path);
-   return new FileSystemLinkResolver<Path>() {
-     @Override
-     public Path doCall(final Path p)
-         throws IOException, UnresolvedLinkException {
-       return new Path(dfs.createSnapshot(getPathName(p), snapshotName));
-     }
-
-     @Override
-     public Path next(final FileSystem fs, final Path p)
-         throws IOException {
-       if (fs instanceof DistributedFileSystem) {
-         DistributedFileSystem myDfs = (DistributedFileSystem)fs;
-         // Fixed: pass the requested name along. The one-argument overload
-         // used before dropped snapshotName when a link was followed.
-         return myDfs.createSnapshot(p, snapshotName);
-       } else {
-         throw new UnsupportedOperationException("Cannot perform snapshot"
-             + " operations on a symlink to a non-DistributedFileSystem: "
-             + path + " -> " + p);
-       }
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * Renames a snapshot of {@code path}. If resolution continues on another
-  * file system, the call is only supported when that file system is a
-  * DistributedFileSystem.
-  *
-  * @param path the snapshotted directory
-  * @param snapshotOldName the current snapshot name
-  * @param snapshotNewName the new snapshot name
-  * @throws UnsupportedOperationException if the path resolves into a file
-  *         system that is not a DistributedFileSystem
-  * @throws IOException if the rename fails
-  */
- @Override
- public void renameSnapshot(final Path path, final String snapshotOldName,
-     final String snapshotNewName) throws IOException {
-   final Path absF = fixRelativePart(path);
-   new FileSystemLinkResolver<Void>() {
-     @Override
-     public Void doCall(final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       final String src = getPathName(resolved);
-       dfs.renameSnapshot(src, snapshotOldName, snapshotNewName);
-       return null;
-     }
-
-     @Override
-     public Void next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       if (!(fs instanceof DistributedFileSystem)) {
-         throw new UnsupportedOperationException("Cannot perform snapshot"
-             + " operations on a symlink to a non-DistributedFileSystem: "
-             + path + " -> " + resolved);
-       }
-       ((DistributedFileSystem) fs).renameSnapshot(
-           resolved, snapshotOldName, snapshotNewName);
-       return null;
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * @return All the snapshottable directories, as reported by the client
-  * @throws IOException if the request fails
-  */
- public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
-     throws IOException {
-   final SnapshottableDirectoryStatus[] listing =
-       dfs.getSnapshottableDirListing();
-   return listing;
- }
-
- /**
-  * Deletes the snapshot {@code snapshotName} of {@code snapshotDir}. If
-  * resolution continues on another file system, the call is only supported
-  * when that file system is a DistributedFileSystem.
-  *
-  * @param snapshotDir the snapshotted directory
-  * @param snapshotName the name of the snapshot to delete
-  * @throws UnsupportedOperationException if the path resolves into a file
-  *         system that is not a DistributedFileSystem
-  * @throws IOException if the deletion fails
-  */
- @Override
- public void deleteSnapshot(final Path snapshotDir, final String snapshotName)
-     throws IOException {
-   final Path absF = fixRelativePart(snapshotDir);
-   new FileSystemLinkResolver<Void>() {
-     @Override
-     public Void doCall(final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       final String src = getPathName(resolved);
-       dfs.deleteSnapshot(src, snapshotName);
-       return null;
-     }
-
-     @Override
-     public Void next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       if (!(fs instanceof DistributedFileSystem)) {
-         throw new UnsupportedOperationException("Cannot perform snapshot"
-             + " operations on a symlink to a non-DistributedFileSystem: "
-             + snapshotDir + " -> " + resolved);
-       }
-       ((DistributedFileSystem) fs).deleteSnapshot(resolved, snapshotName);
-       return null;
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * Get the difference between two snapshots, or between a snapshot and the
-  * current tree of a directory. If resolution continues on another file
-  * system, the call is only supported when that file system is a
-  * DistributedFileSystem.
-  *
-  * @param snapshotDir the snapshotted directory
-  * @param fromSnapshot forwarded unchanged to the underlying call
-  * @param toSnapshot forwarded unchanged to the underlying call
-  * @return the diff report produced by the underlying call
-  * @throws UnsupportedOperationException if the path resolves into a file
-  *         system that is not a DistributedFileSystem
-  * @throws IOException if the report cannot be obtained
-  * @see DFSClient#getSnapshotDiffReport(String, String, String)
-  */
- public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir,
-     final String fromSnapshot, final String toSnapshot) throws IOException {
-   Path absF = fixRelativePart(snapshotDir);
-   return new FileSystemLinkResolver<SnapshotDiffReport>() {
-     @Override
-     public SnapshotDiffReport doCall(final Path p)
-         throws IOException, UnresolvedLinkException {
-       return dfs.getSnapshotDiffReport(getPathName(p), fromSnapshot,
-           toSnapshot);
-     }
-
-     @Override
-     public SnapshotDiffReport next(final FileSystem fs, final Path p)
-         throws IOException {
-       if (fs instanceof DistributedFileSystem) {
-         DistributedFileSystem myDfs = (DistributedFileSystem)fs;
-         // Fixed: return the report. It used to be computed and thrown
-         // away, so callers received null whenever a link was followed.
-         return myDfs.getSnapshotDiffReport(p, fromSnapshot, toSnapshot);
-       } else {
-         throw new UnsupportedOperationException("Cannot perform snapshot"
-             + " operations on a symlink to a non-DistributedFileSystem: "
-             + snapshotDir + " -> " + p);
-       }
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * Get the close status of a file. If resolution continues on another file
-  * system, the call is only supported when that file system is a
-  * DistributedFileSystem.
-  *
-  * @param src The path to the file
-  * @return return true if file is closed
-  * @throws FileNotFoundException if the file does not exist.
-  * @throws UnsupportedOperationException if the path resolves into a file
-  *         system that is not a DistributedFileSystem
-  * @throws IOException If an I/O error occurred
-  */
- public boolean isFileClosed(final Path src) throws IOException {
-   final Path absF = fixRelativePart(src);
-   return new FileSystemLinkResolver<Boolean>() {
-     @Override
-     public Boolean doCall(final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       final String name = getPathName(resolved);
-       return dfs.isFileClosed(name);
-     }
-
-     @Override
-     public Boolean next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       if (!(fs instanceof DistributedFileSystem)) {
-         throw new UnsupportedOperationException("Cannot call isFileClosed"
-             + " on a symlink to a non-DistributedFileSystem: "
-             + src + " -> " + resolved);
-       }
-       return ((DistributedFileSystem) fs).isFileClosed(resolved);
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * Adds a new CacheDirective without any {@link CacheFlag}s.
-  *
-  * @param info Information about a directive to add.
-  * @return the value returned by the two-argument overload
-  * @throws IOException if the directive could not be added
-  * @see #addCacheDirective(CacheDirectiveInfo, EnumSet)
-  */
- public long addCacheDirective(CacheDirectiveInfo info) throws IOException {
-   final EnumSet<CacheFlag> noFlags = EnumSet.noneOf(CacheFlag.class);
-   return addCacheDirective(info, noFlags);
- }
-
- /**
-  * Add a new CacheDirective. The directive's path must be non-null; it is
-  * qualified against this file system's URI and working directory before
-  * the request is sent.
-  *
-  * @param info Information about a directive to add.
-  * @param flags {@link CacheFlag}s to use for this operation.
-  * @return the ID of the directive that was created.
-  * @throws IOException if the directive could not be added
-  */
- public long addCacheDirective(
-     CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
-   Preconditions.checkNotNull(info.getPath());
-   final String src = getPathName(fixRelativePart(info.getPath()));
-   final Path qualifiedPath =
-       new Path(src).makeQualified(getUri(), getWorkingDirectory());
-   final CacheDirectiveInfo qualifiedInfo =
-       new CacheDirectiveInfo.Builder(info).setPath(qualifiedPath).build();
-   return dfs.addCacheDirective(qualifiedInfo, flags);
- }
-
- /**
-  * Modifies a CacheDirective without any {@link CacheFlag}s.
-  *
-  * @param info Information about the directive to modify.
-  * @throws IOException if the directive could not be modified
-  * @see #modifyCacheDirective(CacheDirectiveInfo, EnumSet)
-  */
- public void modifyCacheDirective(CacheDirectiveInfo info) throws IOException {
-   final EnumSet<CacheFlag> noFlags = EnumSet.noneOf(CacheFlag.class);
-   modifyCacheDirective(info, noFlags);
- }
-
- /**
-  * Modify a CacheDirective. When the directive carries a path, that path is
-  * qualified against this file system's URI and working directory before
-  * the request is sent; a directive without a path is sent as given.
-  *
-  * @param info Information about the directive to modify. You must set the ID
-  *             to indicate which CacheDirective you want to modify.
-  * @param flags {@link CacheFlag}s to use for this operation.
-  * @throws IOException if the directive could not be modified
-  */
- public void modifyCacheDirective(
-     CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
-   CacheDirectiveInfo effective = info;
-   if (info.getPath() != null) {
-     final CacheDirectiveInfo.Builder builder =
-         new CacheDirectiveInfo.Builder(info);
-     final String src = getPathName(fixRelativePart(info.getPath()));
-     builder.setPath(
-         new Path(src).makeQualified(getUri(), getWorkingDirectory()));
-     effective = builder.build();
-   }
-   dfs.modifyCacheDirective(effective, flags);
- }
-
- /**
-  * Remove a CacheDirectiveInfo.
-  *
-  * @param id identifier of the CacheDirectiveInfo to remove
-  * @throws IOException if the directive could not be removed
-  */
- public void removeCacheDirective(long id) throws IOException {
-   this.dfs.removeCacheDirective(id);
- }
-
- /**
-  * List cache directives. Incrementally fetches results from the server.
-  * Paths in the returned entries are qualified with this file system's URI
-  * and working directory.
-  *
-  * @param filter Filter parameters to use when listing the directives, null to
-  *               list all directives visible to us.
-  * @return A RemoteIterator which returns CacheDirectiveInfo objects.
-  * @throws IOException if the listing cannot be started
-  */
- public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
-     CacheDirectiveInfo filter) throws IOException {
-   // a null filter is replaced by an empty one
-   CacheDirectiveInfo effective = filter;
-   if (effective == null) {
-     effective = new CacheDirectiveInfo.Builder().build();
-   }
-   if (effective.getPath() != null) {
-     final CacheDirectiveInfo.Builder builder =
-         new CacheDirectiveInfo.Builder(effective);
-     final String src = getPathName(fixRelativePart(effective.getPath()));
-     effective = builder.setPath(new Path(src)).build();
-   }
-   final RemoteIterator<CacheDirectiveEntry> iter =
-       dfs.listCacheDirectives(effective);
-   return new RemoteIterator<CacheDirectiveEntry>() {
-     @Override
-     public boolean hasNext() throws IOException {
-       return iter.hasNext();
-     }
-
-     @Override
-     public CacheDirectiveEntry next() throws IOException {
-       // Although the paths we get back from the NameNode should always be
-       // absolute, we call makeQualified to add the scheme and authority of
-       // this DistributedFilesystem.
-       final CacheDirectiveEntry entry = iter.next();
-       final CacheDirectiveInfo entryInfo = entry.getInfo();
-       final Path qualified =
-           entryInfo.getPath().makeQualified(getUri(), getWorkingDirectory());
-       final CacheDirectiveInfo qualifiedInfo =
-           new CacheDirectiveInfo.Builder(entryInfo).setPath(qualified).build();
-       return new CacheDirectiveEntry(qualifiedInfo, entry.getStats());
-     }
-   };
- }
-
- /**
-  * Add a cache pool. The request is validated with
-  * {@code CachePoolInfo.validate} before it is sent.
-  *
-  * @param info
-  *          The request to add a cache pool.
-  * @throws IOException
-  *          If the request could not be completed.
-  */
- public void addCachePool(CachePoolInfo info) throws IOException {
-   CachePoolInfo.validate(info);
-   this.dfs.addCachePool(info);
- }
-
- /**
-  * Modify an existing cache pool. The request is validated with
-  * {@code CachePoolInfo.validate} before it is sent.
-  *
-  * @param info
-  *          The request to modify a cache pool.
-  * @throws IOException
-  *          If the request could not be completed.
-  */
- public void modifyCachePool(CachePoolInfo info) throws IOException {
-   CachePoolInfo.validate(info);
-   this.dfs.modifyCachePool(info);
- }
-
- /**
-  * Remove a cache pool. The name is validated with
-  * {@code CachePoolInfo.validateName} before the request is sent.
-  *
-  * @param poolName
-  *          Name of the cache pool to remove.
-  * @throws IOException
-  *          if the cache pool did not exist, or could not be removed.
-  */
- public void removeCachePool(String poolName) throws IOException {
-   CachePoolInfo.validateName(poolName);
-   this.dfs.removeCachePool(poolName);
- }
-
- /**
-  * List all cache pools.
-  *
-  * @return A remote iterator from which you can get CachePoolEntry objects.
-  *         Requests will be made as needed.
-  * @throws IOException
-  *          If there was an error listing cache pools.
-  */
- public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
-   final RemoteIterator<CachePoolEntry> pools = dfs.listCachePools();
-   return pools;
- }
-
- /**
-  * {@inheritDoc}
-  *
-  * The request goes to {@code dfs.modifyAclEntries}; if resolution continues
-  * on another file system, that file system's method is used instead.
-  */
- @Override
- public void modifyAclEntries(Path path, final List<AclEntry> aclSpec)
-     throws IOException {
-   final Path absF = fixRelativePart(path);
-   new FileSystemLinkResolver<Void>() {
-     @Override
-     public Void doCall(final Path resolved) throws IOException {
-       final String src = getPathName(resolved);
-       dfs.modifyAclEntries(src, aclSpec);
-       return null;
-     }
-
-     @Override
-     public Void next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       fs.modifyAclEntries(resolved, aclSpec);
-       return null;
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * {@inheritDoc}
-  *
-  * The request goes to {@code dfs.removeAclEntries}; if resolution continues
-  * on another file system, that file system's method is used instead.
-  */
- @Override
- public void removeAclEntries(Path path, final List<AclEntry> aclSpec)
-     throws IOException {
-   final Path absF = fixRelativePart(path);
-   new FileSystemLinkResolver<Void>() {
-     @Override
-     public Void doCall(final Path resolved) throws IOException {
-       final String src = getPathName(resolved);
-       dfs.removeAclEntries(src, aclSpec);
-       return null;
-     }
-
-     @Override
-     public Void next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       fs.removeAclEntries(resolved, aclSpec);
-       return null;
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * {@inheritDoc}
-  *
-  * The request goes to {@code dfs.removeDefaultAcl}; if resolution continues
-  * on another file system, that file system's method is used instead.
-  */
- @Override
- public void removeDefaultAcl(Path path) throws IOException {
-   final Path absF = fixRelativePart(path);
-   new FileSystemLinkResolver<Void>() {
-     @Override
-     public Void doCall(final Path resolved) throws IOException {
-       final String src = getPathName(resolved);
-       dfs.removeDefaultAcl(src);
-       return null;
-     }
-
-     @Override
-     public Void next(final FileSystem fs, final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       fs.removeDefaultAcl(resolved);
-       return null;
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * {@inheritDoc}
-  *
-  * The request goes to {@code dfs.removeAcl}; if resolution continues on
-  * another file system, that file system's method is used instead.
-  */
- @Override
- public void removeAcl(Path path) throws IOException {
-   final Path absF = fixRelativePart(path);
-   new FileSystemLinkResolver<Void>() {
-     @Override
-     public Void doCall(final Path resolved) throws IOException {
-       final String src = getPathName(resolved);
-       dfs.removeAcl(src);
-       return null;
-     }
-
-     @Override
-     public Void next(final FileSystem fs, final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       fs.removeAcl(resolved);
-       return null;
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * {@inheritDoc}
-  *
-  * The request goes to {@code dfs.setAcl}; if resolution continues on
-  * another file system, that file system's method is used instead.
-  */
- @Override
- public void setAcl(Path path, final List<AclEntry> aclSpec)
-     throws IOException {
-   final Path absF = fixRelativePart(path);
-   new FileSystemLinkResolver<Void>() {
-     @Override
-     public Void doCall(final Path resolved) throws IOException {
-       final String src = getPathName(resolved);
-       dfs.setAcl(src, aclSpec);
-       return null;
-     }
-
-     @Override
-     public Void next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       fs.setAcl(resolved, aclSpec);
-       return null;
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * {@inheritDoc}
-  *
-  * The status comes from {@code dfs.getAclStatus}; if resolution continues
-  * on another file system, that file system's method is used instead.
-  */
- @Override
- public AclStatus getAclStatus(Path path) throws IOException {
-   final Path absF = fixRelativePart(path);
-   return new FileSystemLinkResolver<AclStatus>() {
-     @Override
-     public AclStatus doCall(final Path resolved) throws IOException {
-       final String src = getPathName(resolved);
-       return dfs.getAclStatus(src);
-     }
-
-     @Override
-     public AclStatus next(final FileSystem fs, final Path resolved)
-         throws IOException, UnresolvedLinkException {
-       return fs.getAclStatus(resolved);
-     }
-   }.resolve(this, absF);
- }
-
- /**
-  * HDFS only. Creates an encryption zone at {@code path} using the key
-  * {@code keyName}. If resolution continues on another file system, the
-  * call is only supported when that file system is a DistributedFileSystem.
-  *
-  * @param path the root of the new encryption zone
-  * @param keyName the key name, forwarded unchanged
-  * @throws UnsupportedOperationException if the path resolves into a file
-  *         system that is not a DistributedFileSystem
-  * @throws IOException if the zone cannot be created
-  */
- public void createEncryptionZone(final Path path, final String keyName)
-     throws IOException {
-   final Path absF = fixRelativePart(path);
-   new FileSystemLinkResolver<Void>() {
-     @Override
-     public Void doCall(final Path resolved) throws IOException,
-         UnresolvedLinkException {
-       final String src = getPathName(resolved);
-       dfs.createEncryptionZone(src, keyName);
-       return null;
-     }
-
-     @Override
-     public Void next(final FileSystem fs, final Path resolved)
-         throws IOException {
-       if (!(fs instanceof DistributedFileSystem)) {
-         throw new UnsupportedOperationException(
-             "Cannot call createEncryptionZone"
-                 + " on a symlink to a non-DistributedFileSystem: " + path
-                 + " -> " + resolved);
-       }
-       ((DistributedFileSystem) fs).createEncryptionZone(resolved, keyName);
-       return null;
-     }
-   }.resolve(this, absF);
- }
-
- /* HDFS only */
- public EncryptionZone getEZForPath(final Path path)
- throws IOException {
- Preconditions.checkNotNull(path);
- Path absF = fixRelativePart(path);
- return new FileSystemLinkResolver<EncryptionZone>() {
- @Override
- public EncryptionZone doCall(final Path p) throws IOException,
- UnresolvedLinkException {
- return dfs.getEZForPath(getPathName(p));
- }
-
- @Override
- public EncryptionZone next(final FileSystem fs, final Path p)
- throws IOException {
- if (fs instanceof DistributedFileSystem) {
- DistributedFileSystem myDfs = (DistributedFileSystem) fs;
- return myDfs.getEZForPath(p);
- } else {
- throw new UnsupportedOperationException(
- "Cannot call getEZForPath"
- + " on a symlink to a non-DistributedFileSystem: " + path
- + " -> " + p);
- }
- }
- }.resolve(this, absF);
- }
-
- /* HDFS only */
- public RemoteIterator<EncryptionZone> listEncryptionZones()
- throws IOException {
- return dfs.listEncryptionZones();
- }
-
- @Override
- public void setXAttr(Path path, final String name, final byte[] value,
- final EnumSet<XAttrSetFlag> flag) throws IOException {
- Path absF = fixRelativePart(path);
- new FileSystemLinkResolver<Void>() {
-
- @Override
- public Void doCall(final Path p) throws IOException {
- dfs.setXAttr(getPathName(p), name, value, flag);
- return null;
- }
-
- @Override
- public Void next(final FileSystem fs, final Path p) throws IOException {
- fs.setXAttr(p, name, value, flag);
- return null;
- }
- }.resolve(this, absF);
- }
-
- @Override
- public byte[] getXAttr(Path path, final String name) throws IOException {
- final Path absF = fixRelativePart(path);
- return new FileSystemLinkResolver<byte[]>() {
- @Override
- public byte[] doCall(final Path p) throws IOException {
- return dfs.getXAttr(getPathName(p), name);
- }
- @Override
- public byte[] next(final FileSystem fs, final Path p)
- throws IOException, UnresolvedLinkException {
- return fs.getXAttr(p, name);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public Map<String, byte[]> getXAttrs(Path path) throws IOException {
- final Path absF = fixRelativePart(path);
- return new FileSystemLinkResolver<Map<String, byte[]>>() {
- @Override
- public Map<String, byte[]> doCall(final Path p) throws IOException {
- return dfs.getXAttrs(getPathName(p));
- }
- @Override
- public Map<String, byte[]> next(final FileSystem fs, final Path p)
- throws IOException, UnresolvedLinkException {
- return fs.getXAttrs(p);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public Map<String, byte[]> getXAttrs(Path path, final List<String> names)
- throws IOException {
- final Path absF = fixRelativePart(path);
- return new FileSystemLinkResolver<Map<String, byte[]>>() {
- @Override
- public Map<String, byte[]> doCall(final Path p) throws IOException {
- return dfs.getXAttrs(getPathName(p), names);
- }
- @Override
- public Map<String, byte[]> next(final FileSystem fs, final Path p)
- throws IOException, UnresolvedLinkException {
- return fs.getXAttrs(p, names);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public List<String> listXAttrs(Path path)
- throws IOException {
- final Path absF = fixRelativePart(path);
- return new FileSystemLinkResolver<List<String>>() {
- @Override
- public List<String> doCall(final Path p) throws IOException {
- return dfs.listXAttrs(getPathName(p));
- }
- @Override
- public List<String> next(final FileSystem fs, final Path p)
- throws IOException, UnresolvedLinkException {
- return fs.listXAttrs(p);
- }
- }.resolve(this, absF);
- }
-
- @Override
- public void removeXAttr(Path path, final String name) throws IOException {
- Path absF = fixRelativePart(path);
- new FileSystemLinkResolver<Void>() {
- @Override
- public Void doCall(final Path p) throws IOException {
- dfs.removeXAttr(getPathName(p), name);
- return null;
- }
-
- @Override
- public Void next(final FileSystem fs, final Path p) throws IOException {
- fs.removeXAttr(p, name);
- return null;
- }
- }.resolve(this, absF);
- }
-
- @Override
- public void access(Path path, final FsAction mode) throws IOException {
- final Path absF = fixRelativePart(path);
- new FileSystemLinkResolver<Void>() {
- @Override
- public Void doCall(final Path p) throws IOException {
- dfs.checkAccess(getPathName(p), mode);
- return null;
- }
-
- @Override
- public Void next(final FileSystem fs, final Path p)
- throws IOException {
- fs.access(p, mode);
- return null;
- }
- }.resolve(this, absF);
- }
-
- @Override
- public Token<?>[] addDelegationTokens(
- final String renewer, Credentials credentials) throws IOException {
- Token<?>[] tokens = super.addDelegationTokens(renewer, credentials);
- if (dfs.isHDFSEncryptionEnabled()) {
- KeyProviderDelegationTokenExtension keyProviderDelegationTokenExtension =
- KeyProviderDelegationTokenExtension.
- createKeyProviderDelegationTokenExtension(dfs.getKeyProvider());
- Token<?>[] kpTokens = keyProviderDelegationTokenExtension.
- addDelegationTokens(renewer, credentials);
- if (tokens != null && kpTokens != null) {
- Token<?>[] all = new Token<?>[tokens.length + kpTokens.length];
- System.arraycopy(tokens, 0, all, 0, tokens.length);
- System.arraycopy(kpTokens, 0, all, tokens.length, kpTokens.length);
- tokens = all;
- } else {
- tokens = (tokens != null) ? tokens : kpTokens;
- }
- }
- return tokens;
- }
-
- public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {
- return dfs.getInotifyEventStream();
- }
-
- public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid)
- throws IOException {
- return dfs.getInotifyEventStream(lastReadTxid);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1c030c6e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
deleted file mode 100644
index 77bed1a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/CorruptFileBlockIterator.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.client.impl;
-
-import java.io.IOException;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-
-/**
- * Provides an iterator interface for listCorruptFileBlocks.
- * This class is used by DistributedFileSystem and Hdfs.
- */
-@InterfaceAudience.Private
-public class CorruptFileBlockIterator implements RemoteIterator<Path> {
- private final DFSClient dfs;
- private final String path;
-
- private String[] files = null;
- private int fileIdx = 0;
- private String cookie = null;
- private Path nextPath = null;
-
- private int callsMade = 0;
-
- public CorruptFileBlockIterator(DFSClient dfs, Path path) throws IOException {
- this.dfs = dfs;
- this.path = path2String(path);
- loadNext();
- }
-
- /**
- * @return the number of calls made to the DFSClient.
- * This is for debugging and testing purposes.
- */
- public int getCallsMade() {
- return callsMade;
- }
-
- private String path2String(Path path) {
- return path.toUri().getPath();
- }
-
- private Path string2Path(String string) {
- return new Path(string);
- }
-
- private void loadNext() throws IOException {
- if (files == null || fileIdx >= files.length) {
- CorruptFileBlocks cfb = dfs.listCorruptFileBlocks(path, cookie);
- files = cfb.getFiles();
- cookie = cfb.getCookie();
- fileIdx = 0;
- callsMade++;
- }
-
- if (fileIdx >= files.length) {
- // received an empty response
- // there are no more corrupt file blocks
- nextPath = null;
- } else {
- nextPath = string2Path(files[fileIdx]);
- fileIdx++;
- }
- }
-
-
- @Override
- public boolean hasNext() {
- return nextPath != null;
- }
-
-
- @Override
- public Path next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException("No more corrupt file blocks");
- }
-
- Path result = nextPath;
- loadNext();
-
- return result;
- }
-}
\ No newline at end of file