Posted to common-commits@hadoop.apache.org by su...@apache.org on 2009/10/30 23:24:23 UTC
svn commit: r831475 [1/2] - in /hadoop/common/trunk: ./ src/java/
src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/fs/ftp/
src/java/org/apache/hadoop/fs/local/ src/test/core/org/apache/hadoop/fs/
Author: suresh
Date: Fri Oct 30 22:24:22 2009
New Revision: 831475
URL: http://svn.apache.org/viewvc?rev=831475&view=rev
Log:
Hadoop-6223. Add new file system interface AbstractFileSystem with implementation of some file systems that delegate to old FileSystem. Contributed by Sanjay Radia.
Added:
hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java
hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java
hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java
hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java
hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsConstants.java
hadoop/common/trunk/src/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java
hadoop/common/trunk/src/java/org/apache/hadoop/fs/ftp/FtpFs.java
hadoop/common/trunk/src/java/org/apache/hadoop/fs/local/
hadoop/common/trunk/src/java/org/apache/hadoop/fs/local/LocalConfigKeys.java
hadoop/common/trunk/src/java/org/apache/hadoop/fs/local/LocalFs.java
hadoop/common/trunk/src/java/org/apache/hadoop/fs/local/RawLocalFs.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/FileContextPermissionBase.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestFcLocalFsPermission.java
Modified:
hadoop/common/trunk/CHANGES.txt
hadoop/common/trunk/src/java/core-default.xml
hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java
hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java
hadoop/common/trunk/src/java/org/apache/hadoop/fs/Options.java
hadoop/common/trunk/src/java/org/apache/hadoop/fs/ftp/FTPFileSystemConfigKeys.java
hadoop/common/trunk/src/test/core/org/apache/hadoop/fs/TestFileContextDeleteOnExit.java
Modified: hadoop/common/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/CHANGES.txt?rev=831475&r1=831474&r2=831475&view=diff
==============================================================================
--- hadoop/common/trunk/CHANGES.txt (original)
+++ hadoop/common/trunk/CHANGES.txt Fri Oct 30 22:24:22 2009
@@ -249,6 +249,10 @@
HADOOP-6313. Implement Syncable interface in FSDataOutputStream to expose
flush APIs to application users. (Hairong Kuang via suresh)
+ Hadoop-6223. Add new file system interface AbstractFileSystem with
+ implementation of some file systems that delegate to old FileSystem.
+ (Sanjay Radia via suresh)
+
IMPROVEMENTS
HADOOP-4565. Added CombineFileInputFormat to use data locality information
Modified: hadoop/common/trunk/src/java/core-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/core-default.xml?rev=831475&r1=831474&r2=831475&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/core-default.xml (original)
+++ hadoop/common/trunk/src/java/core-default.xml Fri Oct 30 22:24:22 2009
@@ -163,6 +163,19 @@
</property>
<property>
+ <name>fs.AbstractFileSystem.file.impl</name>
+ <value>org.apache.hadoop.fs.local.LocalFs</value>
+ <description>The AbstractFileSystem for file: uris.</description>
+</property>
+
+
+<property>
+ <name>fs.AbstractFileSystem.hdfs.impl</name>
+ <value>org.apache.hadoop.fs.Hdfs</value>
+ <description>The AbstractFileSystem for hdfs: uris.</description>
+</property>
+
+<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3.S3FileSystem</value>
<description>The FileSystem for s3: uris.</description>
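For illustration only (not part of this commit), a minimal sketch of how the new fs.AbstractFileSystem.<scheme>.impl keys above are resolved. The lookup mirrors the key pattern used by AbstractFileSystem.createFileSystem() further down, and assumes core-default.xml is on the classpath:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;

    public class SchemeLookupSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();   // loads core-default.xml
        URI uri = URI.create("file:///tmp/data.txt");
        // Same "fs.AbstractFileSystem.<scheme>.impl" key pattern as createFileSystem()
        Class<?> clazz = conf.getClass(
            "fs.AbstractFileSystem." + uri.getScheme() + ".impl", null);
        // Expected to print org.apache.hadoop.fs.local.LocalFs for file: URIs
        System.out.println(clazz);
      }
    }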
Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java?rev=831475&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/AbstractFileSystem.java Fri Oct 30 22:24:22 2009
@@ -0,0 +1,665 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Options.CreateOpts;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This class provides an interface for implementors of a Hadoop filesystem
+ * (analogous to the VFS of Unix). Applications do not access this class;
+ * instead they access files across all filesystems using {@link FileContext}.
+ *
+ * Pathnames passed to AbstractFileSystem can be fully qualified URIs that
+ * match the "this" filesystem (i.e. same scheme and authority)
+ * or a slash-relative name that is assumed to be relative
+ * to the root of the "this" filesystem.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving /* Evolving for a release, to be changed to Stable */
+public abstract class AbstractFileSystem {
+ static final Log LOG = LogFactory.getLog(AbstractFileSystem.class);
+
+ /** Recording statistics per filesystem class. */
+ private static final Map<Class<? extends AbstractFileSystem>, Statistics>
+ STATISTICS_TABLE =
+ new IdentityHashMap<Class<? extends AbstractFileSystem>, Statistics>();
+
+ /** Cache of constructors for each filesystem class. */
+ private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
+ new ConcurrentHashMap<Class<?>, Constructor<?>>();
+
+ private static final Class<?>[] URI_CONFIG_ARGS =
+ new Class[]{URI.class, Configuration.class};
+
+ /** The statistics for this file system. */
+ protected Statistics statistics;
+
+ private final URI myUri;
+
+ protected Statistics getStatistics() {
+ return statistics;
+ }
+
+ /**
+ * Prohibits names which contain a ".", "..", ":" or "/".
+ */
+ private static boolean isValidName(String src) {
+ // Check for ".." "." ":" "/"
+ StringTokenizer tokens = new StringTokenizer(src, Path.SEPARATOR);
+ while(tokens.hasMoreTokens()) {
+ String element = tokens.nextToken();
+ if (element.equals("..") ||
+ element.equals(".") ||
+ (element.indexOf(":") >= 0)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Create an object for the given class and initialize it from conf.
+ * @param theClass class of which an object is created
+ * @param uri URI passed to the constructor
+ * @param conf Configuration
+ * @return a new object
+ */
+ @SuppressWarnings("unchecked")
+ static <T> T newInstance(Class<T> theClass,
+ URI uri, Configuration conf) {
+ T result;
+ try {
+ Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+ if (meth == null) {
+ meth = theClass.getDeclaredConstructor(URI_CONFIG_ARGS);
+ meth.setAccessible(true);
+ CONSTRUCTOR_CACHE.put(theClass, meth);
+ }
+ result = meth.newInstance(uri, conf);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return result;
+ }
+
+ /**
+ * Create a file system instance for the specified uri using the conf.
+ * The conf is used to find the class name that implements the filesystem.
+ * The conf is also passed to the filesystem for its configuration.
+ * @param uri URI of the file system to be created
+ * @param conf the Configuration to use
+ * @return a new AbstractFileSystem instance
+ * @throws IOException
+ */
+ private static AbstractFileSystem createFileSystem(URI uri,
+ Configuration conf) throws IOException {
+ Class<?> clazz = conf.getClass("fs.AbstractFileSystem." +
+ uri.getScheme() + ".impl", null);
+ if (clazz == null) {
+ throw new IOException("No AbstractFileSystem for scheme: "
+ + uri.getScheme());
+ }
+ return (AbstractFileSystem) newInstance(clazz, uri, conf);
+ }
+
+
+ /**
+ * Get the statistics for a particular file system.
+ * @param scheme the scheme of the filesystem
+ * @param cls the class to lookup
+ * @return a statistics object
+ */
+ protected static synchronized Statistics getStatistics(String scheme,
+ Class<? extends AbstractFileSystem> cls) {
+ Statistics result = STATISTICS_TABLE.get(cls);
+ if (result == null) {
+ result = new Statistics(scheme);
+ STATISTICS_TABLE.put(cls, result);
+ }
+ return result;
+ }
+
+ protected static synchronized void clearStatistics() {
+ for(Statistics stat: STATISTICS_TABLE.values()) {
+ stat.reset();
+ }
+ }
+
+ protected static synchronized void printStatistics() throws IOException {
+ for (Map.Entry<Class<? extends AbstractFileSystem>, Statistics> pair:
+ STATISTICS_TABLE.entrySet()) {
+ System.out.println(" FileSystem " + pair.getKey().getName() +
+ ": " + pair.getValue());
+ }
+ }
+
+
+ /**
+ * The main factory method for creating a filesystem.
+ * Get a filesystem for the URI's scheme and authority.
+ * The scheme of the URI determines a configuration property name,
+ * <tt>fs.AbstractFileSystem.<i>scheme</i>.impl</tt> whose value names
+ * the AbstractFileSystem class.
+ * The entire URI and conf are passed to the AbstractFileSystem factory
+ * method.
+ * @param uri for the file system to be created.
+ * @param conf which is passed to the filesystem impl.
+ */
+ static AbstractFileSystem get(final URI uri, final Configuration conf)
+ throws IOException {
+ return createFileSystem(uri, conf);
+ }
+
+ /**
+ * Constructor to be called by subclasses.
+ *
+ * @param uri for this file system.
+ * @param supportedScheme the scheme supported by the implementor
+ * @param authorityNeeded if true then the URI must have an authority; if false
+ * then the URI must have a null authority.
+ * @throws URISyntaxException
+ */
+ protected AbstractFileSystem(final URI uri, final String supportedScheme,
+ final boolean authorityNeeded, final int defaultPort) throws URISyntaxException {
+ myUri = getUri(uri, supportedScheme, authorityNeeded, defaultPort);
+ statistics = getStatistics(supportedScheme, getClass());
+ }
+
+ protected void checkScheme(URI uri, String supportedScheme) {
+ String scheme = uri.getScheme();
+ if (scheme == null) {
+ throw new IllegalArgumentException("Uri without scheme: " + uri);
+ }
+ if (!scheme.equals(supportedScheme)) {
+ throw new IllegalArgumentException("Uri scheme " + uri
+ + " does not match the scheme " + supportedScheme);
+ }
+ }
+
+ /**
+ * Get the URI for the file system based on the given URI. The path and query
+ * parts of the given URI are stripped out and the default filesystem port is
+ * used to form the URI.
+ *
+ * @param uri FileSystem URI.
+ * @param supportedScheme the scheme supported by the implementor
+ * @param authorityNeeded if true the authority cannot be null in the URI. If
+ * false the authority must be null.
+ * @param defaultPort default port to use if port is not specified in the URI.
+ * @return URI of the file system
+ * @throws URISyntaxException
+ */
+ private URI getUri(URI uri, String supportedScheme,
+ boolean authorityNeeded, int defaultPort) throws URISyntaxException {
+ checkScheme(uri, supportedScheme);
+ // A filesystem implementation that requires authority must always
+ // specify default port
+ if (defaultPort < 0 && authorityNeeded) {
+ throw new IllegalArgumentException(
+ "FileSystem implementation error - default port " + defaultPort
+ + " is not valid");
+ }
+ String authority = uri.getAuthority();
+ if (!authorityNeeded) {
+ if (authority != null) {
+ throw new IllegalArgumentException("Scheme with non-null authority: "
+ + uri);
+ }
+ return new URI(supportedScheme + ":///");
+ }
+ if (authority == null) {
+ throw new IllegalArgumentException("Uri without authority: " + uri);
+ }
+ int port = uri.getPort();
+ port = port == -1 ? defaultPort : port;
+ return new URI(supportedScheme + "://" + uri.getHost() + ":" + port);
+ }
+
+ /**
+ * The default port of this filesystem.
+ * @return the default port of this filesystem's URI scheme.
+ * A URI with a port of -1 maps to the default port.
+ */
+ protected abstract int getUriDefaultPort();
+
+ /**
+ * Returns a URI whose scheme and authority identify this FileSystem.
+ * @return the uri of this filesystem.
+ */
+ protected URI getUri() {
+ return myUri;
+ }
+
+ /**
+ * Check that a Path belongs to this FileSystem.
+ *
+ * If the path is a fully qualified URI, then its scheme and authority must
+ * match those of this file system. Otherwise the path must be a
+ * slash-relative name.
+ */
+ protected void checkPath(Path path) {
+ URI uri = path.toUri();
+ String thatScheme = uri.getScheme();
+ String thatAuthority = uri.getAuthority();
+ if (thatScheme == null) {
+ if (thatAuthority == null) {
+ if (path.isUriPathAbsolute()) {
+ return;
+ }
+ throw new IllegalArgumentException("relative paths not allowed:" +
+ path);
+ } else {
+ throw new IllegalArgumentException(
+ "Path without scheme with non-null autorhrity:" + path);
+ }
+ }
+ String thisScheme = this.getUri().getScheme();
+ String thisAuthority = this.getUri().getAuthority();
+
+ // Schemes and authorities must match.
+ // Allow for null Authority for file:///
+ if (!thisScheme.equalsIgnoreCase(thatScheme) ||
+ (thisAuthority != null &&
+ !thisAuthority.equalsIgnoreCase(thatAuthority)) ||
+ (thisAuthority == null && thatAuthority != null)) {
+ throw new IllegalArgumentException("Wrong FS: " + path +
+ ", expected: "+this.getUri());
+ }
+
+ int thisPort = this.getUri().getPort();
+ int thatPort = path.toUri().getPort();
+ if (thatPort == -1) { // -1 => defaultPort of Uri scheme
+ thatPort = this.getUriDefaultPort();
+ }
+ if (thisPort != thatPort) {
+ throw new IllegalArgumentException("Wrong FS: "+path+
+ ", expected: "+this.getUri());
+ }
+ }
+
+ /**
+ * Get the path-part of a pathname. Checks that URI matches this filesystem
+ * and that the path-part is a valid name.
+ * @param p the path
+ * @return path-part of the Path p
+ */
+ protected String getUriPath(final Path p) {
+ checkPath(p);
+ String s = p.toUri().getPath();
+ if (!isValidName(s)) {
+ throw new IllegalArgumentException("Path part " + s + " from URI" +
+ p + " is not a valid filename.");
+ }
+ return s;
+ }
+
+ /**
+ * Some file systems like LocalFileSystem have an initial workingDir
+ * that we use as the starting workingDir. For other file systems
+ * like HDFS there is no built in notion of an initial workingDir.
+ *
+ * @return the initial workingDir of the filesystem if it has such a notion,
+ * otherwise null.
+ */
+ protected Path getInitialWorkingDirectory() {
+ return null;
+ }
+
+ /**
+ * Return the current user's home directory in this filesystem.
+ * The default implementation returns "/user/$USER/".
+ */
+ protected Path getHomeDirectory() {
+ return new Path("/user/"+System.getProperty("user.name")).makeQualified(
+ getUri(), null);
+ }
+
+ /**
+ * Return a set of server default configuration values.
+ * @return server default configuration values
+ * @throws IOException
+ */
+ protected abstract FsServerDefaults getServerDefaults() throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#create(Path, EnumSet, Options.CreateOpts...)} except
+ * that the Path f must be fully qualified and the permission is absolute
+ * (i.e. umask has been applied).
+ */
+ protected final FSDataOutputStream create(final Path f,
+ final EnumSet<CreateFlag> createFlag, Options.CreateOpts... opts)
+ throws IOException {
+ checkPath(f);
+ int bufferSize = -1;
+ short replication = -1;
+ long blockSize = -1;
+ int bytesPerChecksum = -1;
+ FsPermission permission = null;
+ Progressable progress = null;
+ Boolean createParent = null;
+
+ for (CreateOpts iOpt : opts) {
+ if (CreateOpts.BlockSize.class.isInstance(iOpt)) {
+ if (blockSize != -1) {
+ throw new IllegalArgumentException("multiple varargs of same kind");
+ }
+ blockSize = ((CreateOpts.BlockSize) iOpt).getValue();
+ } else if (CreateOpts.BufferSize.class.isInstance(iOpt)) {
+ if (bufferSize != -1) {
+ throw new IllegalArgumentException("multiple varargs of same kind");
+ }
+ bufferSize = ((CreateOpts.BufferSize) iOpt).getValue();
+ } else if (CreateOpts.ReplicationFactor.class.isInstance(iOpt)) {
+ if (replication != -1) {
+ throw new IllegalArgumentException("multiple varargs of same kind");
+ }
+ replication = ((CreateOpts.ReplicationFactor) iOpt).getValue();
+ } else if (CreateOpts.BytesPerChecksum.class.isInstance(iOpt)) {
+ if (bytesPerChecksum != -1) {
+ throw new IllegalArgumentException("multiple varargs of same kind");
+ }
+ bytesPerChecksum = ((CreateOpts.BytesPerChecksum) iOpt).getValue();
+ } else if (CreateOpts.Perms.class.isInstance(iOpt)) {
+ if (permission != null) {
+ throw new IllegalArgumentException("multiple varargs of same kind");
+ }
+ permission = ((CreateOpts.Perms) iOpt).getValue();
+ } else if (CreateOpts.Progress.class.isInstance(iOpt)) {
+ if (progress != null) {
+ throw new IllegalArgumentException("multiple varargs of same kind");
+ }
+ progress = ((CreateOpts.Progress) iOpt).getValue();
+ } else if (CreateOpts.CreateParent.class.isInstance(iOpt)) {
+ if (createParent != null) {
+ throw new IllegalArgumentException("multiple varargs of same kind");
+ }
+ createParent = ((CreateOpts.CreateParent) iOpt).getValue();
+ } else {
+ throw new IllegalArgumentException("Unkown CreateOpts of type " +
+ iOpt.getClass().getName());
+ }
+ }
+ if (permission == null) {
+ throw new IllegalArgumentException("no permission supplied");
+ }
+
+
+ FsServerDefaults ssDef = getServerDefaults();
+ if (ssDef.getBlockSize() % ssDef.getBytesPerChecksum() != 0) {
+ throw new IOException("Internal error: default blockSize is" +
+ " not a multiple of default bytesPerChecksum ");
+ }
+
+ if (blockSize == -1) {
+ blockSize = ssDef.getBlockSize();
+ }
+ if (bytesPerChecksum == -1) {
+ bytesPerChecksum = ssDef.getBytesPerChecksum();
+ }
+ if (bufferSize == -1) {
+ bufferSize = ssDef.getFileBufferSize();
+ }
+ if (replication == -1) {
+ replication = ssDef.getReplication();
+ }
+ if (createParent == null) {
+ createParent = false;
+ }
+
+ if (blockSize % bytesPerChecksum != 0) {
+ throw new IllegalArgumentException(
+ "blockSize should be a multiple of checksumsize");
+ }
+
+ return this.createInternal(f, createFlag, permission, bufferSize,
+ replication, blockSize, progress, bytesPerChecksum, createParent);
+ }
+
+ /**
+ * The specification of this method matches that of
+ * {@link #create(Path, EnumSet, Options.CreateOpts...)} except that the opts
+ * have been declared explicitly.
+ */
+ protected abstract FSDataOutputStream createInternal(Path f,
+ EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
+ short replication, long blockSize, Progressable progress,
+ int bytesPerChecksum, boolean createParent) throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#mkdir(Path, FsPermission, boolean)} except that the Path
+ * f must be fully qualified and the permission is absolute (i.e. umask has
+ * been applied).
+ */
+ protected abstract void mkdir(final Path dir,
+ final FsPermission permission, final boolean createParent)
+ throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#delete(Path, boolean)} except that Path f must be for
+ * this filesystem.
+ */
+ protected abstract boolean delete(final Path f, final boolean recursive)
+ throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#open(Path)} except that Path f must be for this
+ * filesystem.
+ */
+ protected FSDataInputStream open(final Path f) throws IOException {
+ return open(f, getServerDefaults().getFileBufferSize());
+ }
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#open(Path, int)} except that Path f must be for this
+ * filesystem.
+ */
+ protected abstract FSDataInputStream open(final Path f, int bufferSize)
+ throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#setReplication(Path, short)} except that Path f must be
+ * for this filesystem.
+ */
+ protected abstract boolean setReplication(final Path f,
+ final short replication) throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#rename(Path, Path, Options.Rename...)} except that Path
+ * f must be for this filesystem.
+ */
+ protected final void rename(final Path src, final Path dst,
+ final Options.Rename... options) throws IOException {
+ boolean overwrite = false;
+ if (null != options) {
+ for (Rename option : options) {
+ if (option == Rename.OVERWRITE) {
+ overwrite = true;
+ }
+ }
+ }
+ renameInternal(src, dst, overwrite);
+ }
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#rename(Path, Path, Options.Rename...)} except that Path
+ * f must be for this filesystem and NO OVERWRITE is performed.
+ *
+ * Filesystems that do not have a built-in overwrite need to implement only
+ * this method and can take advantage of the default implementation of the
+ * other {@link #renameInternal(Path, Path, boolean)}.
+ */
+ protected abstract void renameInternal(final Path src, final Path dst)
+ throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#rename(Path, Path, Options.Rename...)} except that Path
+ * f must be for this filesystem.
+ */
+ protected void renameInternal(final Path src, final Path dst,
+ boolean overwrite) throws IOException {
+ // Default implementation deals with overwrite in a non-atomic way
+ final FileStatus srcStatus = getFileStatus(src);
+ if (srcStatus == null) {
+ throw new FileNotFoundException("rename source " + src + " not found.");
+ }
+
+ FileStatus dstStatus;
+ try {
+ dstStatus = getFileStatus(dst);
+ } catch (IOException e) {
+ dstStatus = null;
+ }
+ if (dstStatus != null) {
+ if (srcStatus.isDir() != dstStatus.isDir()) {
+ throw new IOException("Source " + src + " Destination " + dst
+ + " both should be either file or directory");
+ }
+ if (!overwrite) {
+ throw new FileAlreadyExistsException("rename destination " + dst
+ + " already exists.");
+ }
+ // Delete the destination that is a file or an empty directory
+ if (dstStatus.isDir()) {
+ FileStatus[] list = listStatus(dst);
+ if (list != null && list.length != 0) {
+ throw new IOException(
+ "rename cannot overwrite non empty destination directory " + dst);
+ }
+ }
+ delete(dst, false);
+ } else {
+ final Path parent = dst.getParent();
+ final FileStatus parentStatus = getFileStatus(parent);
+ if (parentStatus == null) {
+ throw new FileNotFoundException("rename destination parent " + parent
+ + " not found.");
+ }
+ if (!parentStatus.isDir()) {
+ throw new ParentNotDirectoryException("rename destination parent "
+ + parent + " is a file.");
+ }
+ }
+ renameInternal(src, dst);
+ }
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#setPermission(Path, FsPermission)} except that Path f
+ * must be for this filesystem.
+ */
+ protected abstract void setPermission(final Path f,
+ final FsPermission permission) throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#setOwner(Path, String, String)} except that Path f must
+ * be for this filesystem.
+ */
+ protected abstract void setOwner(final Path f, final String username,
+ final String groupname) throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#setTimes(Path, long, long)} except that Path f must be
+ * for this filesystem.
+ */
+ protected abstract void setTimes(final Path f, final long mtime,
+ final long atime) throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#getFileChecksum(Path)} except that Path f must be for
+ * this filesystem.
+ */
+ protected abstract FileChecksum getFileChecksum(final Path f)
+ throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#getFileStatus(Path)} except that Path f
+ * must be for this filesystem.
+ */
+ protected abstract FileStatus getFileStatus(final Path f) throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#getFileBlockLocations(Path, long, long)} except that
+ * Path f must be for this filesystem.
+ */
+ protected abstract BlockLocation[] getFileBlockLocations(final Path f,
+ final long start, final long len) throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#getFsStatus(Path)} except that Path f must be for this
+ * filesystem.
+ */
+ protected FsStatus getFsStatus(final Path f) throws IOException {
+ // default impl gets FsStatus of root
+ return getFsStatus();
+ }
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#getFsStatus(Path)} except that Path f must be for this
+ * filesystem.
+ */
+ protected abstract FsStatus getFsStatus() throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#listStatus(Path)} except that Path f must be for this
+ * filesystem.
+ */
+ protected abstract FileStatus[] listStatus(final Path f) throws IOException;
+
+ /**
+ * The specification of this method matches that of
+ * {@link FileContext#setVerifyChecksum(boolean, Path)} except that Path f
+ * must be for this filesystem.
+ */
+ protected abstract void setVerifyChecksum(final boolean verifyChecksum)
+ throws IOException;
+}
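As a rough illustration of the getUri()/checkPath() normalization above (a sketch, not part of the commit): the stored URI keeps only scheme, host and port, and a missing port (-1) is replaced by the implementation's default port. The port value 8020 below is an assumed example default, not something defined in this patch:

    import java.net.URI;
    import java.net.URISyntaxException;

    public class UriNormalizationSketch {
      public static void main(String[] args) throws URISyntaxException {
        URI in = URI.create("hdfs://namenode/user/alice/file");
        int assumedDefaultPort = 8020;            // hypothetical default port
        int port = in.getPort() == -1 ? assumedDefaultPort : in.getPort();
        // Path and query are dropped, mirroring getUri() above
        URI stored = new URI(in.getScheme() + "://" + in.getHost() + ":" + port);
        System.out.println(stored);               // hdfs://namenode:8020
      }
    }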
Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java?rev=831475&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/ChecksumFs.java Fri Oct 30 22:24:22 2009
@@ -0,0 +1,462 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.*;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.EnumSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Abstract checksummed Fs.
+ * It provides a basic implementation of a checksummed Fs,
+ * which creates a checksum file for each raw file.
+ * It generates and verifies checksums at the client side.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving /* Evolving for a release, to be changed to Stable */
+public abstract class ChecksumFs extends FilterFs {
+ private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
+ private int defaultBytesPerChecksum = 512;
+ private boolean verifyChecksum = true;
+
+ public static double getApproxChkSumLength(long size) {
+ return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
+ }
+
+ public ChecksumFs(AbstractFileSystem theFs)
+ throws IOException, URISyntaxException {
+ super(theFs);
+ defaultBytesPerChecksum =
+ getMyFs().getServerDefaults().getBytesPerChecksum();
+ }
+
+ /**
+ * Set whether to verify checksum.
+ */
+ public void setVerifyChecksum(boolean inVerifyChecksum) {
+ this.verifyChecksum = inVerifyChecksum;
+ }
+
+ /** get the raw file system. */
+ public AbstractFileSystem getRawFs() {
+ return getMyFs();
+ }
+
+ /** Return the name of the checksum file associated with a file.*/
+ public Path getChecksumFile(Path file) {
+ return new Path(file.getParent(), "." + file.getName() + ".crc");
+ }
+
+ /** Return true iff file is a checksum file name.*/
+ public static boolean isChecksumFile(Path file) {
+ String name = file.getName();
+ return name.startsWith(".") && name.endsWith(".crc");
+ }
+
+ /** Return the length of the checksum file given the size of the
+ * actual file.
+ **/
+ public long getChecksumFileLength(Path file, long fileSize) {
+ return getChecksumLength(fileSize, getBytesPerSum());
+ }
+
+ /** Return the bytes Per Checksum. */
+ public int getBytesPerSum() {
+ return defaultBytesPerChecksum;
+ }
+
+ private int getSumBufferSize(int bytesPerSum, int bufferSize)
+ throws IOException {
+ int defaultBufferSize = getMyFs().getServerDefaults().getFileBufferSize();
+ int proportionalBufferSize = bufferSize / bytesPerSum;
+ return Math.max(bytesPerSum,
+ Math.max(proportionalBufferSize, defaultBufferSize));
+ }
+
+ /*******************************************************
+ * For open()'s FSInputStream
+ * It verifies that data matches checksums.
+ *******************************************************/
+ private static class ChecksumFSInputChecker extends FSInputChecker {
+ public static final Log LOG
+ = LogFactory.getLog(FSInputChecker.class);
+ private static final int HEADER_LENGTH = 8;
+
+ private ChecksumFs fs;
+ private FSDataInputStream datas;
+ private FSDataInputStream sums;
+ private int bytesPerSum = 1;
+ private long fileLen = -1L;
+
+ public ChecksumFSInputChecker(ChecksumFs fs, Path file)
+ throws IOException {
+ this(fs, file, fs.getServerDefaults().getFileBufferSize());
+ }
+
+ public ChecksumFSInputChecker(ChecksumFs fs, Path file, int bufferSize)
+ throws IOException {
+ super(file, fs.getFileStatus(file).getReplication());
+ this.datas = fs.getRawFs().open(file, bufferSize);
+ this.fs = fs;
+ Path sumFile = fs.getChecksumFile(file);
+ try {
+ int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(),
+ bufferSize);
+ sums = fs.getRawFs().open(sumFile, sumBufferSize);
+
+ byte[] version = new byte[CHECKSUM_VERSION.length];
+ sums.readFully(version);
+ if (!Arrays.equals(version, CHECKSUM_VERSION)) {
+ throw new IOException("Not a checksum file: "+sumFile);
+ }
+ this.bytesPerSum = sums.readInt();
+ set(fs.verifyChecksum, new PureJavaCrc32(), bytesPerSum, 4);
+ } catch (FileNotFoundException e) { // quietly ignore
+ set(fs.verifyChecksum, null, 1, 0);
+ } catch (IOException e) { // loudly ignore
+ LOG.warn("Problem opening checksum file: "+ file +
+ ". Ignoring exception: " +
+ StringUtils.stringifyException(e));
+ set(fs.verifyChecksum, null, 1, 0);
+ }
+ }
+
+ private long getChecksumFilePos(long dataPos) {
+ return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
+ }
+
+ protected long getChunkPosition(long dataPos) {
+ return dataPos/bytesPerSum*bytesPerSum;
+ }
+
+ public int available() throws IOException {
+ return datas.available() + super.available();
+ }
+
+ public int read(long position, byte[] b, int off, int len)
+ throws IOException {
+ // parameter check
+ if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+ if (position<0) {
+ throw new IllegalArgumentException(
+ "Parameter position can not to be negative");
+ }
+
+ ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
+ checker.seek(position);
+ int nread = checker.read(b, off, len);
+ checker.close();
+ return nread;
+ }
+
+ public void close() throws IOException {
+ datas.close();
+ if (sums != null) {
+ sums.close();
+ }
+ set(fs.verifyChecksum, null, 1, 0);
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ final long sumsPos = getChecksumFilePos(targetPos);
+ fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
+ final boolean newDataSource = datas.seekToNewSource(targetPos);
+ return sums.seekToNewSource(sumsPos) || newDataSource;
+ }
+
+ @Override
+ protected int readChunk(long pos, byte[] buf, int offset, int len,
+ byte[] checksum) throws IOException {
+ boolean eof = false;
+ if (needChecksum()) {
+ try {
+ final long checksumPos = getChecksumFilePos(pos);
+ if (checksumPos != sums.getPos()) {
+ sums.seek(checksumPos);
+ }
+ sums.readFully(checksum);
+ } catch (EOFException e) {
+ eof = true;
+ }
+ len = bytesPerSum;
+ }
+ if (pos != datas.getPos()) {
+ datas.seek(pos);
+ }
+ final int nread = readFully(datas, buf, offset, len);
+ if (eof && nread > 0) {
+ throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
+ }
+ return nread;
+ }
+
+ /* Return the file length */
+ private long getFileLength() throws IOException {
+ if (fileLen==-1L) {
+ fileLen = fs.getFileStatus(file).getLen();
+ }
+ return fileLen;
+ }
+
+ /**
+ * Skips over and discards <code>n</code> bytes of data from the
+ * input stream.
+ *
+ * The <code>skip</code> method may skip over a smaller number of bytes
+ * if the end of file is reached before <code>n</code> bytes have been skipped.
+ * The actual number of bytes skipped is returned. If <code>n</code> is
+ * negative, no bytes are skipped.
+ *
+ * @param n the number of bytes to be skipped.
+ * @return the actual number of bytes skipped.
+ * @exception IOException if an I/O error occurs.
+ * ChecksumException if the chunk to skip to is corrupted
+ */
+ public synchronized long skip(long n) throws IOException {
+ final long curPos = getPos();
+ final long fileLength = getFileLength();
+ if (n+curPos > fileLength) {
+ n = fileLength - curPos;
+ }
+ return super.skip(n);
+ }
+
+ /**
+ * Seek to the given position in the stream.
+ * The next read() will be from that position.
+ *
+ * <p>This method does not allow seeking past the end of the file.
+ * Doing so produces an IOException.
+ *
+ * @param pos the position to seek to.
+ * @exception IOException if an I/O error occurs or seeks after EOF
+ * ChecksumException if the chunk to seek to is corrupted
+ */
+
+ public synchronized void seek(long pos) throws IOException {
+ if (pos>getFileLength()) {
+ throw new IOException("Cannot seek after EOF");
+ }
+ super.seek(pos);
+ }
+
+ }
+
+ /**
+ * Opens an FSDataInputStream at the indicated Path.
+ * @param f the file name to open
+ * @param bufferSize the size of the buffer to be used.
+ */
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ return new FSDataInputStream(
+ new ChecksumFSInputChecker(this, f, bufferSize));
+ }
+
+ /**
+ * Calculates the length of the checksum file in bytes.
+ * @param size the length of the data file in bytes
+ * @param bytesPerSum the number of bytes in a checksum block
+ * @return the number of bytes in the checksum file
+ */
+ public static long getChecksumLength(long size, int bytesPerSum) {
+ //the checksum length is 4 bytes per chunk (size divided by bytesPerSum,
+ //rounded up) plus the bytes written at the beginning of the checksum file.
+ return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
+ CHECKSUM_VERSION.length + 4;
+ }
+
+ /** This class provides an output stream for a checksummed file.
+ * It generates checksums for data. */
+ private static class ChecksumFSOutputSummer extends FSOutputSummer {
+ private FSDataOutputStream datas;
+ private FSDataOutputStream sums;
+ private static final float CHKSUM_AS_FRACTION = 0.01f;
+
+
+ public ChecksumFSOutputSummer(final ChecksumFs fs, final Path file,
+ final EnumSet<CreateFlag> createFlag,
+ final FsPermission absolutePermission, final int bufferSize,
+ final short replication, final long blockSize,
+ final Progressable progress, final int bytesPerChecksum,
+ final boolean createParent) throws IOException {
+ super(new PureJavaCrc32(), fs.getBytesPerSum(), 4);
+
+ this.datas = fs.getRawFs().createInternal(file, createFlag,
+ absolutePermission, bufferSize, replication, blockSize, progress,
+ bytesPerChecksum, createParent);
+
+ // Now create the checksum file; adjust the buffer size
+ int bytesPerSum = fs.getBytesPerSum();
+ int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
+ this.sums = fs.getRawFs().createInternal(fs.getChecksumFile(file),
+ EnumSet.of(CreateFlag.OVERWRITE), absolutePermission, sumBufferSize,
+ replication, blockSize, progress, bytesPerChecksum, createParent);
+ sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
+ sums.writeInt(bytesPerSum);
+ }
+
+ public void close() throws IOException {
+ flushBuffer();
+ sums.close();
+ datas.close();
+ }
+
+ @Override
+ protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
+ throws IOException {
+ datas.write(b, offset, len);
+ sums.write(checksum);
+ }
+ }
+
+ @Override
+ protected FSDataOutputStream createInternal(Path f,
+ EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
+ int bufferSize, short replication, long blockSize, Progressable progress,
+ int bytesPerChecksum, boolean createParent) throws IOException {
+
+ final FSDataOutputStream out = new FSDataOutputStream(
+ new ChecksumFSOutputSummer(this, f, createFlag, absolutePermission,
+ bufferSize, replication, blockSize, progress,
+ bytesPerChecksum, createParent), null);
+ return out;
+ }
+
+ /** Check if exists.
+ * @param f source file
+ */
+ private boolean exists(Path f) throws IOException {
+ try {
+ return getMyFs().getFileStatus(f) != null;
+ } catch (FileNotFoundException e) {
+ return false;
+ }
+ }
+
+ /** True iff the named path is a directory.
+ * Note: Avoid using this method. Instead reuse the FileStatus
+ * returned by getFileStatus() or listStatus() methods.
+ */
+ private boolean isDirectory(Path f) throws IOException {
+ try {
+ return getMyFs().getFileStatus(f).isDir();
+ } catch (FileNotFoundException e) {
+ return false; // f does not exist
+ }
+ }
+ /**
+ * Set replication for an existing file.
+ * Implement the abstract <tt>setReplication</tt> of <tt>AbstractFileSystem</tt>
+ * @param src file name
+ * @param replication new replication
+ * @throws IOException
+ * @return true if successful;
+ * false if file does not exist or is a directory
+ */
+ @Override
+ public boolean setReplication(Path src, short replication)
+ throws IOException {
+ boolean value = getMyFs().setReplication(src, replication);
+ if (!value) {
+ return false;
+ }
+ Path checkFile = getChecksumFile(src);
+ if (exists(checkFile)) {
+ getMyFs().setReplication(checkFile, replication);
+ }
+ return true;
+ }
+
+ /**
+ * Rename files/dirs.
+ */
+ @Override
+ public void renameInternal(Path src, Path dst) throws IOException {
+ if (isDirectory(src)) {
+ getMyFs().rename(src, dst);
+ } else {
+ getMyFs().rename(src, dst);
+
+ Path checkFile = getChecksumFile(src);
+ if (exists(checkFile)) { //try to rename checksum
+ if (isDirectory(dst)) {
+ getMyFs().rename(checkFile, dst);
+ } else {
+ getMyFs().rename(checkFile, getChecksumFile(dst));
+ }
+ }
+ }
+ }
+
+ /**
+ * Implement the delete(Path, boolean) in checksum
+ * file system.
+ */
+ public boolean delete(Path f, boolean recursive) throws IOException{
+ FileStatus fstatus = null;
+ try {
+ fstatus = getMyFs().getFileStatus(f);
+ } catch(FileNotFoundException e) {
+ return false;
+ }
+ if (fstatus.isDir()) {
+ //this works since the crcs are in the same
+ //directories as the files, so we just delete
+ //everything in the underlying filesystem
+ return getMyFs().delete(f, recursive);
+ } else {
+ Path checkFile = getChecksumFile(f);
+ if (exists(checkFile)) {
+ getMyFs().delete(checkFile, true);
+ }
+ return getMyFs().delete(f, true);
+ }
+ }
+
+ /**
+ * Report a checksum error to the file system.
+ * @param f the file name containing the error
+ * @param in the stream open on the file
+ * @param inPos the position of the beginning of the bad data in the file
+ * @param sums the stream open on the checksum file
+ * @param sumsPos the position of the beginning of the bad data in the
+ * checksum file
+ * @return true if retry is necessary
+ */
+ public boolean reportChecksumFailure(Path f, FSDataInputStream in,
+ long inPos, FSDataInputStream sums, long sumsPos) {
+ return false;
+ }
+}
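A worked example of the getChecksumLength() arithmetic above (illustrative only): with the default bytesPerChecksum of 512 used in this file, a 1 MiB data file has 2048 chunks, each contributing a 4-byte CRC, plus the 8-byte header (the "crc\0" version magic and the bytesPerSum int), giving 8200 bytes in the ".name.crc" companion file:

    public class ChecksumLengthSketch {
      public static void main(String[] args) {
        long size = 1024 * 1024;      // 1 MiB data file
        int bytesPerSum = 512;        // default from ChecksumFs
        long chunks = (size + bytesPerSum - 1) / bytesPerSum;    // 2048 chunks
        long crcFileLen = chunks * 4 + 4 + 4;  // CRCs + "crc\0" magic + int header
        System.out.println(crcFileLen);        // 8200
      }
    }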
Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java?rev=831475&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/DelegateToFileSystem.java Fri Oct 30 22:24:22 2009
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Implementation of AbstractFileSystem based on the existing implementation of
+ * {@link FileSystem}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class DelegateToFileSystem extends AbstractFileSystem {
+ protected final FileSystem fsImpl;
+
+ protected DelegateToFileSystem(URI theUri, FileSystem theFsImpl,
+ Configuration conf, String supportedScheme, boolean authorityRequired)
+ throws IOException, URISyntaxException {
+ super(theUri, supportedScheme, authorityRequired,
+ FileSystem.getDefaultUri(conf).getPort());
+ fsImpl = theFsImpl;
+ fsImpl.initialize(theUri, conf);
+ fsImpl.statistics = getStatistics();
+ }
+
+ @Override
+ protected Path getInitialWorkingDirectory() {
+ return fsImpl.getInitialWorkingDirectory();
+ }
+
+ @Override
+ @SuppressWarnings("deprecation") // call to primitiveCreate
+ protected FSDataOutputStream createInternal (Path f,
+ EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
+ short replication, long blockSize, Progressable progress,
+ int bytesPerChecksum, boolean createParent) throws IOException {
+ checkPath(f);
+
+ // Default impl assumes that permissions do not matter
+ // calling the regular create is good enough.
+ // FSs that implement permissions should override this.
+
+ if (!createParent) { // parent must exist.
+ // since this.create makes parent dirs automatically
+ // we must throw exception if parent does not exist.
+ final FileStatus stat = getFileStatus(f.getParent());
+ if (stat == null) {
+ throw new FileNotFoundException("Missing parent:" + f);
+ }
+ if (!stat.isDir()) {
+ throw new ParentNotDirectoryException("parent is not a dir:" + f);
+ }
+ // parent does exist - go ahead with create of file.
+ }
+ return fsImpl.primitiveCreate(f, absolutePermission, flag,
+ bufferSize, replication, blockSize, progress, bytesPerChecksum);
+ }
+
+ @Override
+ protected boolean delete(Path f, boolean recursive) throws IOException {
+ checkPath(f);
+ return fsImpl.delete(f, recursive);
+ }
+
+ @Override
+ protected BlockLocation[] getFileBlockLocations(Path f, long start, long len)
+ throws IOException {
+ checkPath(f);
+ return fsImpl.getFileBlockLocations(f, start, len);
+ }
+
+ @Override
+ protected FileChecksum getFileChecksum(Path f) throws IOException {
+ checkPath(f);
+ return fsImpl.getFileChecksum(f);
+ }
+
+ @Override
+ protected FileStatus getFileStatus(Path f) throws IOException {
+ checkPath(f);
+ return fsImpl.getFileStatus(f);
+ }
+
+ @Override
+ protected FsStatus getFsStatus() throws IOException {
+ return fsImpl.getStatus();
+ }
+
+ @Override
+ protected FsServerDefaults getServerDefaults() throws IOException {
+ return fsImpl.getServerDefaults();
+ }
+
+ @Override
+ protected int getUriDefaultPort() {
+ return 0;
+ }
+
+ @Override
+ protected FileStatus[] listStatus(Path f) throws IOException {
+ checkPath(f);
+ return fsImpl.listStatus(f);
+ }
+
+ @Override
+ @SuppressWarnings("deprecation") // call to primitiveMkdir
+ protected void mkdir(Path dir, FsPermission permission, boolean createParent)
+ throws IOException {
+ checkPath(dir);
+ fsImpl.primitiveMkdir(dir, permission, createParent);
+
+ }
+
+ @Override
+ protected FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ checkPath(f);
+ return fsImpl.open(f, bufferSize);
+ }
+
+ @Override
+ @SuppressWarnings("deprecation") // call to rename
+ protected void renameInternal(Path src, Path dst) throws IOException {
+ checkPath(src);
+ checkPath(dst);
+ fsImpl.rename(src, dst, Options.Rename.NONE);
+
+ }
+
+ @Override
+ protected void setOwner(Path f, String username, String groupname)
+ throws IOException {
+ checkPath(f);
+ fsImpl.setOwner(f, username, groupname);
+
+ }
+
+ @Override
+ protected void setPermission(Path f, FsPermission permission)
+ throws IOException {
+ checkPath(f);
+ fsImpl.setPermission(f, permission);
+ }
+
+ @Override
+ protected boolean setReplication(Path f, short replication)
+ throws IOException {
+ checkPath(f);
+ return fsImpl.setReplication(f, replication);
+ }
+
+ @Override
+ protected void setTimes(Path f, long mtime, long atime) throws IOException {
+ checkPath(f);
+ fsImpl.setTimes(f, mtime, atime);
+
+ }
+
+ @Override
+ protected void setVerifyChecksum(boolean verifyChecksum) throws IOException {
+ fsImpl.setVerifyChecksum(verifyChecksum);
+ }
+}
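To illustrate the delegation pattern this class enables (a sketch under assumptions, not part of the commit): a thin subclass supplies a legacy FileSystem instance to the constructor above and inherits all of the delegating overrides. The class name DemoLocalFs, the "demo" scheme, and the no-authority choice are made up for this example; the actual delegating implementations added by this commit are RawLocalFs and FtpFs under fs/local and fs/ftp.

    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.DelegateToFileSystem;
    import org.apache.hadoop.fs.RawLocalFileSystem;

    public class DemoLocalFs extends DelegateToFileSystem {
      DemoLocalFs(URI theUri, Configuration conf)
          throws IOException, URISyntaxException {
        // Wrap the old FileSystem implementation; "demo" scheme and the
        // no-authority choice are assumptions made for this sketch.
        super(theUri, new RawLocalFileSystem(), conf, "demo", false);
      }
    }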
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java?rev=831475&r1=831474&r2=831475&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileContext.java Fri Oct 30 22:24:22 2009
@@ -37,13 +37,12 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.Project;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.Progressable;
/**
* The FileContext class provides an interface to the application writer for
@@ -51,81 +50,102 @@
* It provides a set of methods for the usual operation: create, open,
* list, etc
*
- * *** Path Names ***
+ * <p>
+ * <b> *** Path Names *** </b>
+ * <p>
*
* The Hadoop filesystem supports a URI name space and URI names.
* It offers a forest of filesystems that can be referenced using fully
* qualified URIs.
- *
* Two common Hadoop filesystems implementations are
- * the local filesystem: file:///path
- * the hdfs filesystem hdfs://nnAddress:nnPort/path
- *
+ * <ul>
+ * <li> the local filesystem: file:///path
+ * <li> the hdfs filesystem hdfs://nnAddress:nnPort/path
+ * </ul>
* While URI names are very flexible, it requires knowing the name or address
* of the server. For convenience one often wants to access the default system
- * in your environment without knowing its name/address. This has an
+ * in one's environment without knowing its name/address. This has an
* additional benefit that it allows one to change one's default fs
- * (say your admin moves you from cluster1 to cluster2).
- *
- * Too facilitate this Hadoop supports a notion of a default filesystem.
+ * (e.g. admin moves application from cluster1 to cluster2).
+ * <p>
+ * To facilitate this, Hadoop supports a notion of a default filesystem.
* The user can set his default filesystem, although this is
- * typically set up for you in your environment in your default config.
+ * typically set up for you in your environment via your default config.
* A default filesystem implies a default scheme and authority; slash-relative
* names (such as /for/bar) are resolved relative to that default FS.
* Similarly a user can also have working-directory-relative names (i.e. names
* not starting with a slash). While the working directory is generally in the
- * same default FS, the wd can be in a different FS; in particular, changing
- * the default filesystem DOES NOT change the working directory,
- *
+ * same default FS, the wd can be in a different FS.
+ * <p>
* Hence Hadoop path names can be one of:
- * fully qualified URI: scheme://authority/path
- * slash relative names: /path - relative to the default filesystem
- * wd-relative names: path - relative to the working dir
- *
- * Relative paths with scheme (scheme:foo/bar) are illegal
- *
- * ****The Role of the FileContext and configuration defaults****
+ * <ul>
+ * <li> fully qualified URI: scheme://authority/path
+ * <li> slash relative names: /path - relative to the default filesystem
+ * <li> wd-relative names: path - relative to the working dir
+ * </ul>
+ * Relative paths with scheme (scheme:foo/bar) are illegal.
+ * <p>
+ * <b>****The Role of the FileContext and configuration defaults****</b>
+ * <p>
* The FileContext provides file namespace context for resolving file names;
* it also contains the umask for permissions. In that sense it is like the
* per-process file-related state in Unix system.
- * These, in general, are obtained from the default configuration file
- * in your environment, (@see {@link Configuration}
- *
+ * These two properties
+ * <ul>
+ * <li> default file system (i.e. your slash)
+ * <li> umask
+ * </ul>
+ * in general, are obtained from the default configuration file
+ * in your environment (see {@link Configuration}).
* No other configuration parameters are obtained from the default config as
* far as the file context layer is concerned. All filesystem instances
* (i.e. deployments of filesystems) have default properties; we call these
* server side (SS) defaults. Operations like create allow one to select many
- * properties: either pass them in as explicit parameters or
- * one can choose to used the SS properties.
- *
+ * properties: either pass them in as explicit parameters or use
+ * the SS properties.
+ * <p>
* The filesystem related SS defaults are
- * - the home directory (default is "/user/<userName>")
- * - the initial wd (only for local fs)
- * - replication factor
- * - block size
- * - buffer size
- * - bytesPerChecksum (if used).
+ * <ul>
+ * <li> the home directory (default is "/user/userName")
+ * <li> the initial wd (only for local fs)
+ * <li> replication factor
+ * <li> block size
+ * <li> buffer size
+ * <li> bytesPerChecksum (if used).
+ * </ul>
*
- *
- * *** Usage Model for the FileContext class ***
- *
+ * <p>
+ * <b> *** Usage Model for the FileContext class *** </b>
+ * <p>
* Example 1: use the default config read from the $HADOOP_CONFIG/core.xml.
- * Unspecified values come from core-defaults.xml in the release jar.
- *
- * myFiles = getFileContext(); // uses the default config
- * myFiles.create(path, ...);
- * myFiles.setWorkingDir(path)
- * myFiles.open (path, ...);
- *
- * Example 2: Use a specific config, ignoring $HADOOP_CONFIG
- * configX = someConfigSomeOnePassedToYou.
- * myFContext = getFileContext(configX); //configX not changed but passeddown
- * myFContext.create(path, ...);
- * myFContext.setWorkingDir(path)
- *
- * Other ways of creating new FileContexts:
- * getLocalFSFileContext(...) // local filesystem is the default FS
- * getLocalFileContext(URI, ...) // where specified URI is default FS.
+ * Unspecified values come from core-defaults.xml in the release jar.
+ * <ul>
+ * <li> myFContext = FileContext.getFileContext(); // uses the default config
+ * // which has your default FS
+ * <li> myFContext.create(path, ...);
+ * <li> myFContext.setWorkingDir(path)
+ * <li> myFContext.open (path, ...);
+ * </ul>
+ * Example 2: Get a FileContext with a specific URI as the default FS
+ * <ul>
+ * <li> myFContext = FileContext.getFileContext(URI)
+ * <li> myFContext.create(path, ...);
+ * ...
+ * </ul>
+ * Example 3: FileContext with local file system as the default
+ * <ul>
+ * <li> myFContext = FileContext.getLocalFSFileContext()
+ * <li> myFContext.create(path, ...);
+ * <li> ...
+ * </ul>
+ * Example 4: Use a specific config, ignoring $HADOOP_CONFIG
+ * Generally you should not need to use a config unless you are doing
+ * <ul>
+ * <li> configX = someConfigSomeOnePassedToYou.
+ * <li> myFContext = getFileContext(configX); //configX not changed but passed down
+ * <li> myFContext.create(path, ...);
+ * <li>...
+ * </ul>
*
*/
@@ -135,41 +155,44 @@
public final class FileContext {
public static final Log LOG = LogFactory.getLog(FileContext.class);
+ public static final FsPermission DEFAULT_PERM = FsPermission.getDefault();
/**
- * List of files that should be deleted on JVM shutdown
+ * List of files that should be deleted on JVM shutdown.
*/
- final static Map<FileContext, Set<Path>> deleteOnExit =
+ static final Map<FileContext, Set<Path>> DELETE_ON_EXIT =
new IdentityHashMap<FileContext, Set<Path>>();
- /** JVM shutdown hook thread */
- final static FileContextFinalizer finalizer =
+ /** JVM shutdown hook thread. */
+ static final FileContextFinalizer FINALIZER =
new FileContextFinalizer();
+ private static final PathFilter DEFAULT_FILTER = new PathFilter() {
+ public boolean accept(final Path file) {
+ return true;
+ }
+ };
+
/**
* The FileContext is defined by.
* 1) defaultFS (slash)
* 2) wd
* 3) umask
- *
*/
- private final FileSystem defaultFS; // the default FS for this FileContext.
+ private final AbstractFileSystem defaultFS; //default FS for this FileContext.
private Path workingDir; // Fully qualified
private FsPermission umask;
- private final Configuration conf; // passed to the filesystem below
- // When we move to new AbstractFileSystem
- // then it is not really needed except for
- // undocumented config vars;
+ private final Configuration conf;
- private FileContext(final FileSystem defFs, final FsPermission theUmask,
- final Configuration aConf) {
+ private FileContext(final AbstractFileSystem defFs,
+ final FsPermission theUmask, final Configuration aConf) {
defaultFS = defFs;
umask = FsPermission.getUMask(aConf);
conf = aConf;
/*
* Init the wd.
* WorkingDir is implemented at the FileContext layer
- * NOT at the FileSystem layer.
+ * NOT at the AbstractFileSystem layer.
* If the DefaultFS, such as localFilesystem has a notion of
* builtin WD, we use that as the initial WD.
* Otherwise the WD is initialized to the home directory.
@@ -205,21 +228,20 @@
* Delete all the paths that were marked as delete-on-exit.
*/
static void processDeleteOnExit() {
- synchronized (deleteOnExit) {
- Set<Entry<FileContext, Set<Path>>> set = deleteOnExit.entrySet();
+ synchronized (DELETE_ON_EXIT) {
+ Set<Entry<FileContext, Set<Path>>> set = DELETE_ON_EXIT.entrySet();
for (Entry<FileContext, Set<Path>> entry : set) {
FileContext fc = entry.getKey();
Set<Path> paths = entry.getValue();
for (Path path : paths) {
try {
fc.delete(path, true);
- }
- catch (IOException e) {
+ } catch (IOException e) {
LOG.warn("Ignoring failure to deleteOnExit for path " + path);
}
}
}
- deleteOnExit.clear();
+ DELETE_ON_EXIT.clear();
}
}
@@ -241,30 +263,27 @@
* @return the filesystem of the path
* @throws IOException
*/
- private FileSystem getFSofPath(final Path absOrFqPath) throws IOException {
+ private AbstractFileSystem getFSofPath(final Path absOrFqPath)
+ throws IOException {
checkNotSchemeWithRelative(absOrFqPath);
if (!absOrFqPath.isAbsolute() && absOrFqPath.toUri().getScheme() == null) {
throw new IllegalArgumentException(
"FileContext Bug: path is relative");
}
-
- // TBD cleanup this impl once we create a new FileSystem to replace current
- // one - see HADOOP-6223.
+
try {
// Is it the default FS for this FileContext?
defaultFS.checkPath(absOrFqPath);
return defaultFS;
} catch (Exception e) { // it is different FileSystem
- return FileSystem.get(absOrFqPath.toUri(), conf);
+ return AbstractFileSystem.get(absOrFqPath.toUri(), conf);
}
}
/**
* Protected Static Factory methods for getting a FileContexts
- * that take a FileSystem as input. To be used for testing.
- * Protected since new FileSystem will be protected.
- * Note new file contexts are created for each call.
+ * that take an AbstractFileSystem as input. To be used for testing.
*/
/**
@@ -276,27 +295,24 @@
 * @return new FileContext with specified FS as default.
 * @throws IOException if the FileContext with the specified FS cannot be created
*/
- protected static FileContext getFileContext(final FileSystem defFS,
+ protected static FileContext getFileContext(final AbstractFileSystem defFS,
final Configuration aConf) throws IOException {
return new FileContext(defFS, FsPermission.getUMask(aConf), aConf);
}
/**
- * Create a FileContext for specified FileSystem using the default config.
+ * Create a FileContext for the specified filesystem using the default config.
*
* @param defaultFS
- * @return a FileSystem for the specified URI
- * @throws IOException if the filesysem with specified cannot be created
+ * @return a FileContext with the specified AbstractFileSystem
+ * as the default FS.
+ * @throws IOException if the FileContext with the specified AbstractFileSystem cannot be created
*/
- protected static FileContext getFileContext(final FileSystem defaultFS)
- throws IOException {
+ protected static FileContext getFileContext(
+ final AbstractFileSystem defaultFS) throws IOException {
return getFileContext(defaultFS, new Configuration());
}
-
- public static final URI LOCAL_FS_URI = URI.create("file:///");
- public static final FsPermission DEFAULT_PERM = FsPermission.getDefault();
-
/**
* Static Factory methods for getting a FileContext.
* Note new file contexts are created for each call.
@@ -310,7 +326,7 @@
* The keys relevant to the FileContext layer are extracted at time of
 * construction. Changes to the config after the call are ignored
* by the FileContext layer.
- * The conf is passed to lower layers like FileSystem and HDFS which
+ * The conf is passed to lower layers like AbstractFileSystem and HDFS which
* pick up their own config variables.
*/
@@ -320,7 +336,7 @@
* Unspecified key-values for config are defaulted from core-defaults.xml
* in the release jar.
*
- * @throws IOException if default FileSystem in the config cannot be created
+ * @throws IOException if the default filesystem in the config cannot be created
*/
public static FileContext getFileContext() throws IOException {
return getFileContext(new Configuration());
@@ -334,7 +350,7 @@
*/
public static FileContext getLocalFSFileContext() throws IOException {
if (localFsSingleton == null) {
- localFsSingleton = getFileContext(LOCAL_FS_URI);
+ localFsSingleton = getFileContext(FsConstants.LOCAL_FS_URI);
}
return localFsSingleton;
}
@@ -344,7 +360,7 @@
* Create a FileContext for specified URI using the default config.
*
* @param defaultFsUri
- * @return a FileSystem for the specified URI
+ * @return a FileContext with the specified URI as the default FS.
 * @throws IOException if the filesystem with the specified URI cannot be created
*/
public static FileContext getFileContext(final URI defaultFsUri)
@@ -362,15 +378,18 @@
*/
public static FileContext getFileContext(final URI defaultFsUri,
final Configuration aConf) throws IOException {
- return getFileContext(FileSystem.get(defaultFsUri, aConf), aConf);
+ return getFileContext(AbstractFileSystem.get(defaultFsUri, aConf), aConf);
}
/**
* Create a FileContext using the passed config.
+ * Generally it is better to use {@link #getFileContext(URI, Configuration)}
+ * instead of this one.
+ *
*
* @param aConf
* @return new FileContext
- * @throws IOException if default FileSystem in the config cannot be created
+ * @throws IOException if the default filesystem in the config cannot be created
*/
public static FileContext getFileContext(final Configuration aConf)
throws IOException {
@@ -385,14 +404,14 @@
*/
public static FileContext getLocalFSFileContext(final Configuration aConf)
throws IOException {
- return getFileContext(LOCAL_FS_URI, aConf);
+ return getFileContext(FsConstants.LOCAL_FS_URI, aConf);
}
/* This method is needed for tests. */
@InterfaceAudience.Private
@InterfaceStability.Unstable /* return type will change to AFS once
HADOOP-6223 is completed */
- protected FileSystem getDefaultFileSystem() {
+ protected AbstractFileSystem getDefaultFileSystem() {
return defaultFS;
}
@@ -463,59 +482,49 @@
* @param f the file name to open
* @param createFlag gives the semantics of create: overwrite, append etc.
* @param opts - varargs of CreateOpt:
- * Progress - to report progress on the operation - default null
- * Permission - umask is applied against permisssion:
- * default FsPermissions:getDefault()
- * @see #setPermission(Path, FsPermission)
- * CreateParent - create missing parent path
- * default is to not create parents
- *
- * The defaults for the following are SS defaults of the
- * file server implementing the tart path.
- * Not all parameters make sense for all kinds of filesystem
+ * <ul>
+ * <li> Progress - to report progress on the operation - default null
+ * <li> Permission - umask is applied against permission:
+ * default is FsPermission.getDefault()
+ *
+ * <li> CreateParent - create missing parent path; default is to not
+ * create parents
+ * <li> The defaults for the following are server-side (SS) defaults of the
+ * file server implementing the target path.
+ * Not all parameters make sense for all kinds of filesystems
* - eg. localFS ignores Blocksize, replication, checksum
- * BufferSize - buffersize used in FSDataOutputStream
- * Blocksize - block size for file blocks
- * ReplicationFactor - replication for blocks
- * BytesPerChecksum - bytes per checksum
- *
- *
+ * <ul>
+ * <li> BufferSize - buffersize used in FSDataOutputStream
+ * <li> Blocksize - block size for file blocks
+ * <li> ReplicationFactor - replication for blocks
+ * <li> BytesPerChecksum - bytes per checksum
+ * </ul>
+ * </ul>
+ *
* @throws IOException
+ *
+ * @see #setPermission(Path, FsPermission)
*/
- @SuppressWarnings("deprecation") // call to primitiveCreate
public FSDataOutputStream create(final Path f,
- final EnumSet<CreateFlag> createFlag,
- CreateOpts... opts)
+ final EnumSet<CreateFlag> createFlag,
+ Options.CreateOpts... opts)
throws IOException {
Path absF = fixRelativePart(f);
- FileSystem fsOfAbsF = getFSofPath(absF);
+ AbstractFileSystem fsOfAbsF = getFSofPath(absF);
// If one of the options is a permission, extract it & apply umask
// If not, add a default Perms and apply umask;
- // FileSystem#create
+ // AbstractFileSystem#create
- FsPermission permission = null;
-
- if (opts != null) {
- for (int i = 0; i < opts.length; ++i) {
- if (opts[i] instanceof CreateOpts.Perms) {
- if (permission != null)
- throw new IllegalArgumentException("multiple permissions varargs");
- permission = ((CreateOpts.Perms) opts[i]).getValue();
- opts[i] = CreateOpts.perms(permission.applyUMask(umask));
- }
- }
- }
-
- CreateOpts[] theOpts = opts;
- if (permission == null) { // no permission was set
- CreateOpts[] newOpts = new CreateOpts[opts.length + 1];
- System.arraycopy(opts, 0, newOpts, 0, opts.length);
- newOpts[opts.length] =
- CreateOpts.perms(FsPermission.getDefault().applyUMask(umask));
- theOpts = newOpts;
- }
- return fsOfAbsF.primitiveCreate(absF, createFlag, theOpts);
+ CreateOpts.Perms permOpt =
+ (CreateOpts.Perms) CreateOpts.getOpt(CreateOpts.Perms.class, opts);
+ FsPermission permission = (permOpt != null) ? permOpt.getValue() :
+ FsPermission.getDefault();
+ permission = permission.applyUMask(umask);
+
+ CreateOpts[] updatedOpts =
+ CreateOpts.setOpt(CreateOpts.perms(permission), opts);
+ return fsOfAbsF.create(absF, createFlag, updatedOpts);
}
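
As a sketch of the option handling above (illustrative, not part of this patch; the path, mode, and factory-method names are assumptions), a caller can pass explicit CreateOpts and FileContext still applies the umask before delegating to AbstractFileSystem#create:

    import java.io.IOException;
    import java.util.EnumSet;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Options.CreateOpts;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class CreateWithOptsDemo {
      public static void main(String[] args) throws IOException {
        FileContext fc = FileContext.getFileContext();
        // Request 0644; the effective permission is 0644 with the umask applied.
        fc.create(new Path("/tmp/fcdemo/f2"),
            EnumSet.of(CreateFlag.CREATE),
            CreateOpts.perms(new FsPermission((short) 0644)),
            CreateOpts.createParent()).close();
      }
    }
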
/**
@@ -529,14 +538,13 @@
 * @throws IOException when the operation fails, e.g. not authorized, or
* if parent does not exist and createParent is false.
*/
- @SuppressWarnings("deprecation") // call to primitiveMkdir
public void mkdir(final Path dir, final FsPermission permission,
final boolean createParent)
throws IOException {
Path absDir = fixRelativePart(dir);
FsPermission absFerms = (permission == null ?
FsPermission.getDefault() : permission).applyUMask(umask);
- getFSofPath(absDir).primitiveMkdir(absDir, absFerms, createParent);
+ getFSofPath(absDir).mkdir(absDir, absFerms, createParent);
}
/**
@@ -615,15 +623,15 @@
* @param dst new path after rename
* @throws IOException on failure
*/
- @SuppressWarnings("deprecation")
- public void rename(final Path src, final Path dst, final Rename... options)
- throws IOException {
+
+ public void rename(final Path src, final Path dst,
+ final Options.Rename... options) throws IOException {
final Path absSrc = fixRelativePart(src);
final Path absDst = fixRelativePart(dst);
- FileSystem srcFS = getFSofPath(absSrc);
- FileSystem dstFS = getFSofPath(absDst);
+ AbstractFileSystem srcFS = getFSofPath(absSrc);
+ AbstractFileSystem dstFS = getFSofPath(absDst);
if(!srcFS.getUri().equals(dstFS.getUri())) {
- throw new IOException("Renames across FileSystems not supported");
+ throw new IOException("Renames across AbstractFileSystems not supported");
}
srcFS.rename(absSrc, absDst, options);
}
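
A brief sketch of the rename contract above (illustrative; paths are assumptions): both paths must resolve to the same AbstractFileSystem, and Rename.OVERWRITE allows an existing destination to be replaced:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Options;
    import org.apache.hadoop.fs.Path;

    public class RenameDemo {
      public static void main(String[] args) throws IOException {
        FileContext fc = FileContext.getFileContext();
        // Both paths resolve against the default FS, so the rename is permitted;
        // OVERWRITE replaces /tmp/b if it already exists.
        fc.rename(new Path("/tmp/a"), new Path("/tmp/b"), Options.Rename.OVERWRITE);
      }
    }
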
@@ -750,10 +758,10 @@
*/
public FsStatus getFsStatus(final Path f) throws IOException {
if (f == null) {
- return defaultFS.getStatus(null);
+ return defaultFS.getFsStatus(null);
}
final Path absF = fixRelativePart(f);
- return getFSofPath(absF).getStatus(absF);
+ return getFSofPath(absF).getFsStatus(absF);
}
/**
@@ -827,15 +835,15 @@
if (!exists(f)) {
return false;
}
- synchronized (deleteOnExit) {
- if (deleteOnExit.isEmpty() && !finalizer.isAlive()) {
- Runtime.getRuntime().addShutdownHook(finalizer);
+ synchronized (DELETE_ON_EXIT) {
+ if (DELETE_ON_EXIT.isEmpty() && !FINALIZER.isAlive()) {
+ Runtime.getRuntime().addShutdownHook(FINALIZER);
}
- Set<Path> set = deleteOnExit.get(this);
+ Set<Path> set = DELETE_ON_EXIT.get(this);
if (set == null) {
set = new TreeSet<Path>();
- deleteOnExit.put(this, set);
+ DELETE_ON_EXIT.put(this, set);
}
set.add(f);
}
@@ -878,6 +886,31 @@
return results.toArray(new FileStatus[results.size()]);
}
+
+ /**
+ * Return the {@link ContentSummary} of path f.
+ * @param f
+ * @return the {@link ContentSummary} of path f.
+ * @throws IOException
+ */
+ public ContentSummary getContentSummary(Path f) throws IOException {
+ FileStatus status = FileContext.this.getFileStatus(f);
+ if (!status.isDir()) {
+ // f is a file
+ return new ContentSummary(status.getLen(), 1, 0);
+ }
+ // f is a directory
+ long[] summary = {0, 0, 1};
+ for(FileStatus s : FileContext.this.listStatus(f)) {
+ ContentSummary c = s.isDir() ? getContentSummary(s.getPath()) :
+ new ContentSummary(s.getLen(), 1, 0);
+ summary[0] += c.getLength();
+ summary[1] += c.getFileCount();
+ summary[2] += c.getDirectoryCount();
+ }
+ return new ContentSummary(summary[0], summary[1], summary[2]);
+ }
+
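
A usage sketch for the getContentSummary helper added above (illustrative; the path is an assumption) - it recursively totals length, file count and directory count:

    import java.io.IOException;
    import org.apache.hadoop.fs.ContentSummary;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Path;

    public class SummaryDemo {
      public static void main(String[] args) throws IOException {
        FileContext fc = FileContext.getFileContext();
        ContentSummary cs = fc.getContentSummary(new Path("/tmp/fcdemo"));
        // Length is in bytes; the directory count includes the directory itself.
        System.out.println(cs.getLength() + " bytes, " + cs.getFileCount()
            + " files, " + cs.getDirectoryCount() + " directories");
      }
    }
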
/**
* Filter files/directories in the given list of paths using default
* path filter.
@@ -1020,25 +1053,22 @@
* @return an array of FileStatus objects
* @throws IOException if any I/O error occurs when fetching file status
*/
- public FileStatus[] globStatus(final Path pathPattern, final PathFilter filter)
- throws IOException {
-
+ public FileStatus[] globStatus(final Path pathPattern,
+ final PathFilter filter) throws IOException {
+ URI uri = getFSofPath(fixRelativePart(pathPattern)).getUri();
+
String filename = pathPattern.toUri().getPath();
-
+
List<String> filePatterns = GlobExpander.expand(filename);
if (filePatterns.size() == 1) {
- Path p = fixRelativePart(pathPattern);
- FileSystem fs = getFSofPath(p);
- URI uri = fs.getUri();
- return globStatusInternal(uri, p, filter);
+ Path absPathPattern = fixRelativePart(pathPattern);
+ return globStatusInternal(uri, new Path(absPathPattern.toUri()
+ .getPath()), filter);
} else {
List<FileStatus> results = new ArrayList<FileStatus>();
- for (String filePattern : filePatterns) {
- Path p = new Path(filePattern);
- p = fixRelativePart(p);
- FileSystem fs = getFSofPath(p);
- URI uri = fs.getUri();
- FileStatus[] files = globStatusInternal(uri, p, filter);
+ for (String iFilePattern : filePatterns) {
+ Path iAbsFilePattern = fixRelativePart(new Path(iFilePattern));
+ FileStatus[] files = globStatusInternal(uri, iAbsFilePattern, filter);
for (FileStatus file : files) {
results.add(file);
}
@@ -1047,36 +1077,45 @@
}
}
+ /**
+ *
+ * @param uri the URI that all paths in inPathPattern belong to
+ * @param inPathPattern - without the scheme & authority (taken from uri)
+ * @param filter the user-supplied path filter
+ * @return an array of FileStatus objects matching the path pattern
+ * @throws IOException
+ */
private FileStatus[] globStatusInternal(
final URI uri, final Path inPathPattern, final PathFilter filter)
throws IOException {
Path[] parents = new Path[1];
int level = 0;
- // comes in as full path, but just in case
- final Path pathPattern = fixRelativePart(inPathPattern);
+ assert(inPathPattern.toUri().getScheme() == null &&
+ inPathPattern.toUri().getAuthority() == null &&
+ inPathPattern.isUriPathAbsolute());
+
- String filename = pathPattern.toUri().getPath();
+ String filename = inPathPattern.toUri().getPath();
// path has only zero component
if ("".equals(filename) || Path.SEPARATOR.equals(filename)) {
- Path p = pathPattern.makeQualified(uri, null);
+ Path p = inPathPattern.makeQualified(uri, null);
return getFileStatus(new Path[]{p});
}
// path has at least one component
String[] components = filename.split(Path.SEPARATOR);
- // get the first component
- if (pathPattern.isAbsolute()) {
- parents[0] = new Path(Path.SEPARATOR);
- level = 1;
- } else {
- parents[0] = new Path(Path.CUR_DIR);
- }
+
+ // Path is absolute, first component is "/" hence first component
+ // is the uri root
+ parents[0] = new Path(new Path(uri), new Path("/"));
+ level = 1;
// glob the paths that match the parent path, ie. [0, components.length-1]
boolean[] hasGlob = new boolean[]{false};
- Path[] relParentPaths = globPathsLevel(parents, components, level, hasGlob);
+ Path[] relParentPaths =
+ globPathsLevel(parents, components, level, hasGlob);
FileStatus[] results;
if (relParentPaths == null || relParentPaths.length == 0) {
@@ -1212,12 +1251,6 @@
}
}
- private static final PathFilter DEFAULT_FILTER = new PathFilter() {
- public boolean accept(final Path file) {
- return true;
- }
- };
-
/* A class that could decide if a string matches the glob or not */
private static class GlobFilter implements PathFilter {
private PathFilter userFilter = DEFAULT_FILTER;
@@ -1401,7 +1434,7 @@
}
/**
- * Deletes all the paths in deleteOnExit on JVM shutdown
+ * Deletes all the paths in deleteOnExit on JVM shutdown.
*/
static class FileContextFinalizer extends Thread {
public synchronized void run() {
Modified: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java?rev=831475&r1=831474&r2=831475&view=diff
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java (original)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FileStatus.java Fri Oct 30 22:24:22 2009
@@ -148,6 +148,10 @@
public Path getPath() {
return path;
}
+
+ public void setPath(final Path p) {
+ path = p;
+ }
/* These are provided so that these values could be loaded lazily
* by a filesystem (e.g. local file system).
Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java?rev=831475&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FilterFs.java Fri Oct 30 22:24:22 2009
@@ -0,0 +1,170 @@
+package org.apache.hadoop.fs;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A <code>FilterFs</code> contains some other file system, which it uses as its
+ * basic file system, possibly transforming the data along the way or providing
+ * additional functionality. The class <code>FilterFs</code> itself simply
+ * overrides all methods of <code>AbstractFileSystem</code> with versions that
+ * pass all requests to the contained file system. Subclasses of
+ * <code>FilterFs</code> may further override some of these methods and may also
+ * provide additional methods and fields.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving /* Evolving for a release, to be changed to Stable */
+public abstract class FilterFs extends AbstractFileSystem {
+ private final AbstractFileSystem myFs;
+
+ protected AbstractFileSystem getMyFs() {
+ return myFs;
+ }
+
+ protected FilterFs(AbstractFileSystem fs) throws IOException,
+ URISyntaxException {
+ super(fs.getUri(), fs.getUri().getScheme(),
+ fs.getUri().getAuthority() != null, fs.getUriDefaultPort());
+ myFs = fs;
+ }
+
+ @Override
+ protected Path getInitialWorkingDirectory() {
+ return myFs.getInitialWorkingDirectory();
+ }
+
+ @Override
+ protected FSDataOutputStream createInternal(Path f,
+ EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize,
+ short replication, long blockSize, Progressable progress,
+ int bytesPerChecksum, boolean createParent) throws IOException {
+ checkPath(f);
+ return myFs.createInternal(f, flag, absolutePermission, bufferSize,
+ replication, blockSize, progress, bytesPerChecksum, createParent);
+ }
+
+ @Override
+ protected boolean delete(Path f, boolean recursive) throws IOException {
+ checkPath(f);
+ return myFs.delete(f, recursive);
+ }
+
+ @Override
+ protected BlockLocation[] getFileBlockLocations(Path f, long start, long len)
+ throws IOException {
+ checkPath(f);
+ return myFs.getFileBlockLocations(f, start, len);
+ }
+
+ @Override
+ protected FileChecksum getFileChecksum(Path f) throws IOException {
+ checkPath(f);
+ return myFs.getFileChecksum(f);
+ }
+
+ @Override
+ protected FileStatus getFileStatus(Path f) throws IOException {
+ checkPath(f);
+ return myFs.getFileStatus(f);
+ }
+
+ @Override
+ protected FsStatus getFsStatus() throws IOException {
+ return myFs.getFsStatus();
+ }
+
+ @Override
+ protected FsServerDefaults getServerDefaults() throws IOException {
+ return myFs.getServerDefaults();
+ }
+
+ @Override
+ protected int getUriDefaultPort() {
+ return myFs.getUriDefaultPort();
+ }
+
+ @Override
+ protected FileStatus[] listStatus(Path f) throws IOException {
+ checkPath(f);
+ return myFs.listStatus(f);
+ }
+
+ @Override
+ protected void mkdir(Path dir, FsPermission permission, boolean createParent)
+ throws IOException {
+ checkPath(dir);
+ myFs.mkdir(dir, permission, createParent);
+
+ }
+
+ @Override
+ protected FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ checkPath(f);
+ return myFs.open(f, bufferSize);
+ }
+
+ @Override
+ protected void renameInternal(Path src, Path dst) throws IOException {
+ checkPath(src);
+ checkPath(dst);
+ myFs.rename(src, dst, Options.Rename.NONE);
+
+ }
+
+ @Override
+ protected void setOwner(Path f, String username, String groupname)
+ throws IOException {
+ checkPath(f);
+ myFs.setOwner(f, username, groupname);
+
+ }
+
+ @Override
+ protected void setPermission(Path f, FsPermission permission)
+ throws IOException {
+ checkPath(f);
+ myFs.setPermission(f, permission);
+ }
+
+ @Override
+ protected boolean setReplication(Path f, short replication)
+ throws IOException {
+ checkPath(f);
+ return myFs.setReplication(f, replication);
+ }
+
+ @Override
+ protected void setTimes(Path f, long mtime, long atime) throws IOException {
+ checkPath(f);
+ myFs.setTimes(f, mtime, atime);
+
+ }
+
+ @Override
+ protected void setVerifyChecksum(boolean verifyChecksum) throws IOException {
+ myFs.setVerifyChecksum(verifyChecksum);
+ }
+}
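
For context, a minimal sketch of how a subclass could use FilterFs as a decorator (illustrative, not part of this patch; the class name and logging are assumptions, and the sketch relies on FilterFs delegating all remaining AbstractFileSystem methods):

    import java.io.IOException;
    import java.net.URISyntaxException;
    import org.apache.hadoop.fs.AbstractFileSystem;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FilterFs;
    import org.apache.hadoop.fs.Path;

    public class LoggingFs extends FilterFs {
      public LoggingFs(AbstractFileSystem fs) throws IOException, URISyntaxException {
        super(fs);
      }

      @Override
      protected FSDataInputStream open(Path f, int bufferSize) throws IOException {
        // Wrap just this call; all other operations pass straight through.
        System.out.println("open: " + f);
        return super.open(f, bufferSize);
      }
    }
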
Added: hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsConstants.java?rev=831475&view=auto
==============================================================================
--- hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsConstants.java (added)
+++ hadoop/common/trunk/src/java/org/apache/hadoop/fs/FsConstants.java Fri Oct 30 22:24:22 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.net.URI;
+
+/**
+ * FileSystem related constants.
+ */
+public interface FsConstants {
+ // URI for local filesystem
+ public static final URI LOCAL_FS_URI = URI.create("file:///");
+
+ // URI scheme for FTP
+ public static final String FTP_SCHEME = "ftp";
+}