You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/13 00:00:33 UTC
svn commit: r486399 - in /lucene/hadoop/trunk: ./ conf/
src/java/org/apache/hadoop/conf/ src/java/org/apache/hadoop/dfs/
src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/fs/
Author: cutting
Date: Tue Dec 12 15:00:31 2006
New Revision: 486399
URL: http://svn.apache.org/viewvc?view=rev&rev=486399
Log:
HADOOP-571. Extend the syntax of Path to be a URI.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestPath.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Dec 12 15:00:31 2006
@@ -74,6 +74,15 @@
22. HADOOP-673. Give each task its own working directory again.
(Mahadev Konar via cutting)
+23. HADOOP-571. Extend the syntax of Path to be a URI, so that it may be
+ optionally qualified with a scheme and authority. The scheme
+ determines the FileSystem implementation, while the authority
+ determines the FileSystem instance. New FileSystem
+ implementations may be provided by defining an fs.<scheme>.impl
+ property, naming the FileSystem implementation class. This
+ permits easy integration of new FileSystem implementations.
+ (cutting)
+
Release 0.9.1 - 2006-12-06
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Tue Dec 12 15:00:31 2006
@@ -98,9 +98,24 @@
<property>
<name>fs.default.name</name>
- <value>local</value>
- <description>The name of the default file system. Either the
- literal string "local" or a host:port for DFS.</description>
+ <value>file:///</value>
+ <description>The name of the default file system. A URI whose
+ scheme and authority determine the FileSystem implementation. The
+ uri's scheme determines the config property (fs.SCHEME.impl) naming
+ the FileSystem implementation class. The uri's authority is used to
+ determine the host, port, etc. for a filesystem.</description>
+</property>
+
+<property>
+ <name>fs.file.impl</name>
+ <value>org.apache.hadoop.fs.LocalFileSystem</value>
+ <description>The FileSystem for file: uris.</description>
+</property>
+
+<property>
+ <name>fs.hdfs.impl</name>
+ <value>org.apache.hadoop.dfs.DistributedFileSystem</value>
+ <description>The FileSystem for hdfs: uris.</description>
</property>
<property>
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java Tue Dec 12 15:00:31 2006
@@ -497,13 +497,15 @@
doc = builder.parse(url.toString());
}
} else if (name instanceof Path) { // a file resource
- Path file = (Path)name;
- FileSystem fs = FileSystem.getNamed("local", this);
- if (fs.exists(file)) {
+ // Can't use FileSystem API or we get an infinite loop
+ // since FileSystem uses Configuration API. Use java.io.File instead.
+ File file = new File(((Path)name).toUri().getPath())
+ .getAbsoluteFile();
+ if (file.exists()) {
if (!quiet) {
LOG.info("parsing " + file);
}
- InputStream in = new BufferedInputStream(fs.openRaw(file));
+ InputStream in = new BufferedInputStream(new FileInputStream(file));
try {
doc = builder.parse(in);
} finally {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Tue Dec 12 15:00:31 2006
@@ -165,11 +165,7 @@
return defaultBlockSize;
}
- public long getBlockSize(Path f) throws IOException {
- // if we already know the answer, use it.
- if (f instanceof DfsPath) {
- return ((DfsPath) f).getBlockSize();
- }
+ public long getBlockSize(UTF8 f) throws IOException {
int retries = 4;
while (true) {
try {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Tue Dec 12 15:00:31 2006
@@ -37,21 +37,35 @@
private Path workingDir =
new Path("/user", System.getProperty("user.name"));
- private String name;
+ private URI uri;
private FileSystem localFs;
DFSClient dfs;
- /** Construct a client for the filesystem at <code>namenode</code>.
- */
- public DistributedFileSystem(InetSocketAddress namenode, Configuration conf) throws IOException {
- super(conf);
- this.dfs = new DFSClient(namenode, conf);
- this.name = namenode.getHostName() + ":" + namenode.getPort();
- this.localFs = getNamed("local", conf);
+ public DistributedFileSystem() {}
+
+ /** @deprecated */
+ public DistributedFileSystem(InetSocketAddress namenode,
+ Configuration conf) throws IOException {
+ initialize(URI.create("hdfs://"+
+ namenode.getHostName()+":"+
+ namenode.getPort()),
+ conf);
}
- public String getName() { return name; }
+ /** @deprecated */
+ public String getName() { return uri.getAuthority(); }
+
+ public URI getUri() { return uri; }
+
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ setConf(conf);
+ String host = uri.getHost();
+ int port = uri.getPort();
+ this.dfs = new DFSClient(new InetSocketAddress(host,port), conf);
+ this.uri = URI.create("hdfs://"+host+":"+port);
+ this.localFs = getNamed("file:///", conf);
+ }
public Path getWorkingDirectory() {
return workingDir;
@@ -62,7 +76,11 @@
}
public long getBlockSize(Path f) throws IOException {
- return dfs.getBlockSize(makeAbsolute(f));
+ // if we already know the answer, use it.
+ if (f instanceof DfsPath) {
+ return ((DfsPath) f).getBlockSize();
+ }
+ return dfs.getBlockSize(getPath(f));
}
public short getDefaultReplication() {
@@ -82,7 +100,8 @@
}
private UTF8 getPath(Path file) {
- return new UTF8(makeAbsolute(file).toString());
+ checkPath(file);
+ return new UTF8(makeAbsolute(file).toUri().getPath());
}
public String[][] getFileCacheHints(Path f, long start, long len) throws IOException {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Tue Dec 12 15:00:31 2006
@@ -26,7 +26,7 @@
import org.apache.hadoop.dfs.*;
import org.apache.hadoop.conf.*;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.*;
/****************************************************************
* An abstract base class for a fairly generic filesystem. It
@@ -50,7 +50,9 @@
public abstract class FileSystem extends Configured {
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.DistributedFileSystem");
- private static final HashMap NAME_TO_FS = new HashMap();
+ // cache indexed by URI scheme and authority
+ private static final Map<String,Map<String,FileSystem>> CACHE
+ = new HashMap<String,Map<String,FileSystem>>();
/**
* Parse the cmd-line args, starting at i. Remove consumed args
* from array. We expect param in the form:
@@ -89,25 +91,100 @@
return getNamed(conf.get("fs.default.name", "local"), conf);
}
- /** Returns a name for this filesystem, suitable to pass to {@link
- * FileSystem#getNamed(String,Configuration)}.*/
+ /** Called after a new FileSystem instance is constructed.
+ * @param name a uri whose authority section names the host, port, etc.
+ * for this FileSystem
+ * @param conf the configuration
+ */
+ public abstract void initialize(URI name, Configuration conf)
+ throws IOException;
+
+ /** Returns a URI whose scheme and authority identify this FileSystem.*/
+ public abstract URI getUri();
+
+ /** @deprecated call #getUri() instead.*/
public abstract String getName();
+
+ /** @deprecated call #get(URI,Configuration) instead. */
+ public static FileSystem getNamed(String name, Configuration conf)
+ throws IOException {
+
+ // convert old-format name to new-format name
+ if (name.equals("local")) { // "local" is now "file:///".
+ name = "file:///";
+ } else if (name.indexOf('/')==-1) { // unqualified is "hdfs://"
+ name = "hdfs://"+name;
+ }
+
+ return get(URI.create(name), conf);
+ }
- /** Returns a named filesystem. Names are either the string "local" or a
- * host:port pair, naming an DFS name server.*/
- public static FileSystem getNamed(String name, Configuration conf) throws IOException {
- FileSystem fs = (FileSystem)NAME_TO_FS.get(name);
+ /** Returns the FileSystem for this URI's scheme and authority. The scheme
+ * of the URI determines a configuration property name,
+ * <tt>fs.<i>scheme</i>.impl</tt> whose value names the FileSystem class.
+ * The entire URI is passed to the FileSystem instance's initialize method.
+ */
+ public static synchronized FileSystem get(URI uri, Configuration conf)
+ throws IOException {
+
+ String scheme = uri.getScheme();
+ String authority = uri.getAuthority();
+
+ if (scheme == null) { // no scheme: use default FS
+ return get(conf);
+ }
+
+ Map<String,FileSystem> authorityToFs = CACHE.get(scheme);
+ if (authorityToFs == null) {
+ authorityToFs = new HashMap<String,FileSystem>();
+ CACHE.put(scheme, authorityToFs);
+ }
+
+ FileSystem fs = authorityToFs.get(authority);
if (fs == null) {
- if ("local".equals(name)) {
- fs = new LocalFileSystem(conf);
- } else {
- fs = new DistributedFileSystem(DataNode.createSocketAddr(name), conf);
- }
- NAME_TO_FS.put(name, fs);
+ Class fsClass = conf.getClass("fs."+scheme+".impl", null);
+ if (fsClass == null) {
+ throw new IOException("No FileSystem for scheme: " + scheme);
+ }
+ fs = (FileSystem)ReflectionUtils.newInstance(fsClass, conf);
+ fs.initialize(uri, conf);
+ authorityToFs.put(authority, fs);
}
+
return fs;
}
+ /** Make sure that a path specifies a FileSystem. */
+ public Path makeQualified(Path path) {
+ checkPath(path);
+
+ if (!path.isAbsolute())
+ path = new Path(getWorkingDirectory(), path);
+
+ URI pathUri = path.toUri();
+ URI fsUri = getUri();
+
+ String scheme = pathUri.getScheme();
+ String authority = pathUri.getAuthority();
+
+ if (scheme != null &&
+ (authority != null || fsUri.getAuthority() == null))
+ return path;
+
+ if (scheme == null) {
+ scheme = fsUri.getScheme();
+ }
+
+ if (authority == null) {
+ authority = fsUri.getAuthority();
+ if (authority == null) {
+ authority = "";
+ }
+ }
+
+ return new Path(scheme+":"+"//"+authority + pathUri.getPath());
+ }
+
/** Return the name of the checksum file associated with a file.*/
public static Path getChecksumFile(Path file) {
return new Path(file.getParent(), "."+file.getName()+".crc");
@@ -123,10 +200,29 @@
// FileSystem
///////////////////////////////////////////////////////////////
+ /** @deprecated */
protected FileSystem(Configuration conf) {
super(conf);
}
+ protected FileSystem() {
+ super(null);
+ }
+
+ /** Check that a Path belongs to this FileSystem. */
+ protected void checkPath(Path path) {
+ URI uri = path.toUri();
+ if (uri.getScheme() == null) // fs is relative
+ return;
+ String thisAuthority = this.getUri().getAuthority();
+ String thatAuthority = uri.getAuthority();
+ if (!(this.getUri().getScheme().equals(uri.getScheme()) &&
+ (thisAuthority == null && thatAuthority == null)
+ || thisAuthority.equals(thatAuthority)))
+ throw new IllegalArgumentException("Wrong FS: "+path+
+ ", expected: "+this.getUri());
+ }
+
/**
* Return a 2D array of size 1x1 or greater, containing hostnames
* where portions of the given file can be found. For a nonexistent
@@ -766,7 +862,13 @@
* release any held locks.
*/
public void close() throws IOException {
- NAME_TO_FS.remove(getName());
+ URI uri = getUri();
+ synchronized (FileSystem.class) {
+ Map<String,FileSystem> authorityToFs = CACHE.get(uri.getScheme());
+ if (authorityToFs != null) {
+ authorityToFs.remove(uri.getAuthority());
+ }
+ }
}
/**
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Tue Dec 12 15:00:31 2006
@@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.util.*;
import java.nio.channels.*;
+import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;
@@ -31,6 +32,8 @@
* @author Mike Cafarella
*****************************************************************/
public class LocalFileSystem extends FileSystem {
+ private static final URI NAME = URI.create("file:///");
+
private Path workingDir =
new Path(System.getProperty("user.dir"));
TreeMap sharedLockDataSet = new TreeMap();
@@ -39,15 +42,11 @@
// by default use copy/delete instead of rename
boolean useCopyForRename = true;
- /** Construct a local filesystem client. */
+ public LocalFileSystem() {}
+
+ /** @deprecated */
public LocalFileSystem(Configuration conf) throws IOException {
- super(conf);
- // if you find an OS which reliably supports non-POSIX
- // rename(2) across filesystems / volumes, you can
- // uncomment this.
- // String os = System.getProperty("os.name");
- // if (os.toLowerCase().indexOf("os-with-super-rename") != -1)
- // useCopyForRename = false;
+ initialize(NAME, conf);
}
/**
@@ -65,14 +64,22 @@
}
}
+ /** @deprecated */
public String getName() { return "local"; }
+ public URI getUri() { return NAME; }
+
+ public void initialize(URI uri, Configuration conf) {
+ setConf(conf);
+ }
+
/** Convert a path to a File. */
public File pathToFile(Path path) {
+ checkPath(path);
if (!path.isAbsolute()) {
path = new Path(workingDir, path);
}
- return new File(path.toString());
+ return new File(path.toUri().getPath());
}
/*******************************************************
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/Path.java Tue Dec 12 15:00:31 2006
@@ -19,6 +19,10 @@
package org.apache.hadoop.fs;
import java.util.*;
+import java.net.*;
+import java.io.*;
+
+import org.apache.hadoop.conf.Configuration;
/** Names a file or directory in a {@link FileSystem}.
* Path strings use slash as the directory separator. A path string is
@@ -32,10 +36,7 @@
static final boolean WINDOWS
= System.getProperty("os.name").startsWith("Windows");
- private boolean isAbsolute; // if path starts with sepr
- private String[] elements; // tokenized path elements
- private String drive; // Windows drive letter
- private String asString; // cached toString() value
+ private URI uri; // a hierarchical uri
/** Resolve a child path against a parent path. */
public Path(String parent, String child) {
@@ -55,72 +56,142 @@
/** Resolve a child path against a parent path. */
public Path(Path parent, Path child) {
if (child.isAbsolute()) {
- this.isAbsolute = child.isAbsolute;
- this.elements = child.elements;
+ this.uri = child.uri;
} else {
- this.isAbsolute = parent.isAbsolute;
- ArrayList list = new ArrayList(parent.elements.length+child.elements.length);
- for (int i = 0; i < parent.elements.length; i++) {
- list.add(parent.elements[i]);
- }
- for (int i = 0; i < child.elements.length; i++) {
- list.add(child.elements[i]);
- }
- normalize(list);
- this.elements = (String[])list.toArray(new String[list.size()]);
+ // Add a slash to parent's path so resolution is compatible with URI's
+ URI parentUri = parent.uri;
+ String parentPath = parentUri.getPath();
+ if (!(parentPath.equals("/") || parentPath.equals("")))
+ try {
+ parentUri = new URI(parentUri.getScheme(), parentUri.getAuthority(),
+ parentUri.getPath()+"/", null, null);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ URI resolved = parentUri.resolve(child.uri);
+ initialize(resolved.getScheme(), resolved.getAuthority(),
+ normalizePath(resolved.getPath()));
}
- this.drive = child.drive == null ? parent.drive : child.drive;
}
- /** Construct a path from a String. */
+ /** Construct a path from a String. Path strings are URIs, but with
+ * unescaped elements and some additional normalization. */
public Path(String pathString) {
- if (WINDOWS) { // parse Windows path
- int colon = pathString.indexOf(':');
- if (colon == 1) { // parse Windows drive letter
- this.drive = pathString.substring(0, 1);
- pathString = pathString.substring(2);
- }
- pathString = pathString.replace('\\','/'); // convert backslash to slash
+ // We can't use 'new URI(String)' directly, since it assumes things are
+ // escaped, which we don't require of Paths.
+
+ // add a slash in front of paths with Windows drive letters
+ if (hasWindowsDrive(pathString, false))
+ pathString = "/"+pathString;
+
+ // parse uri components
+ String scheme = null;
+ String authority = null;
+
+ int start = 0;
+
+ // parse uri scheme, if any
+ int colon = pathString.indexOf(':');
+ int slash = pathString.indexOf('/');
+ if ((colon != -1) &&
+ ((slash == -1) || (colon < slash))) { // has a scheme
+ scheme = pathString.substring(0, colon);
+ start = colon+1;
+ }
+
+ // parse uri authority, if any
+ if (pathString.startsWith("//", start) &&
+ (pathString.length()-start > 2)) { // has authority
+ int nextSlash = pathString.indexOf('/', start+2);
+ int authEnd = nextSlash > 0 ? nextSlash : pathString.length();
+ authority = pathString.substring(start+2, authEnd);
+ start = authEnd;
}
- // determine whether the path is absolute
- this.isAbsolute = pathString.startsWith(SEPARATOR);
+ // uri path is the rest of the string -- query & fragment not supported
+ String path = pathString.substring(start, pathString.length());
+
+ initialize(scheme, authority, path);
+ }
+
+ /** Construct a Path from components. */
+ public Path(String scheme, String authority, String path) {
+ initialize(scheme, authority, path);
+ }
+
+ private void initialize(String scheme, String authority, String path) {
+ try {
+ this.uri = new URI(scheme, authority, normalizePath(path), null, null)
+ .normalize();
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ private String normalizePath(String path) {
+ // remove double slashes & backslashes
+ path = path.replace("//", "/");
+ path = path.replace("\\", "/");
+
+ // trim trailing slash from non-root path (ignoring windows drive)
+ int minLength = hasWindowsDrive(path, true) ? 4 : 1;
+ if (path.length() > minLength && path.endsWith("/")) {
+ path = path.substring(0, path.length()-1);
+ }
+
+ return path;
+ }
- // tokenize the path into elements
- Enumeration tokens = new StringTokenizer(pathString, SEPARATOR);
- ArrayList list = Collections.list(tokens);
- normalize(list);
- this.elements = (String[])list.toArray(new String[list.size()]);
+ private boolean hasWindowsDrive(String path, boolean slashed) {
+ if (!WINDOWS) return false;
+ int start = slashed ? 1 : 0;
+ return
+ path.length() >= start+2 &&
+ (slashed ? path.charAt(0) == '/' : true) &&
+ path.charAt(start+1) == ':' &&
+ ((path.charAt(start) >= 'A' && path.charAt(start) <= 'Z') ||
+ (path.charAt(start) >= 'a' && path.charAt(start) <= 'z'));
}
- private Path(boolean isAbsolute, String[] elements, String drive) {
- this.isAbsolute = isAbsolute;
- this.elements = elements;
- this.drive = drive;
+
+ /** Convert this to a URI. */
+ public URI toUri() { return uri; }
+
+ /** Return the FileSystem that owns this Path. */
+ public FileSystem getFileSystem(Configuration conf) throws IOException {
+ return FileSystem.get(this.toUri(), conf);
}
- /** True if this path is absolute. */
- public boolean isAbsolute() { return isAbsolute; }
+ /** True if the directory of this path is absolute. */
+ public boolean isAbsolute() {
+ int start = hasWindowsDrive(uri.getPath(), true) ? 3 : 0;
+ return uri.getPath().startsWith(SEPARATOR, start);
+ }
/** Returns the final component of this path.*/
public String getName() {
- if (elements.length == 0) {
- return "";
- } else {
- return elements[elements.length-1];
- }
+ String path = uri.getPath();
+ int slash = path.lastIndexOf(SEPARATOR);
+ return path.substring(slash+1);
}
- /** Returns the parent of a path. */
+ /** Returns the parent of a path or null if at root. */
public Path getParent() {
- if (elements.length == 0) {
+ String path = uri.getPath();
+ int lastSlash = path.lastIndexOf('/');
+ int start = hasWindowsDrive(path,true) ? 3 : 0;
+ if ((path.length() == start) || // empty path
+ (lastSlash == start && path.length() == start+1)) { // at root
return null;
}
- String[] newElements = new String[elements.length-1];
- for (int i = 0; i < newElements.length; i++) {
- newElements[i] = elements[i];
+ String parent;
+ if (lastSlash==-1) {
+ parent = "";
+ } else {
+ int end = hasWindowsDrive(path, true) ? 3 : 0;
+ parent = path.substring(0,lastSlash==end?end+1:lastSlash);
}
- return new Path(isAbsolute, newElements, drive);
+ return new Path(uri.getScheme(), uri.getAuthority(), parent);
}
/** Adds a suffix to the final name in the path.*/
@@ -129,27 +200,27 @@
}
public String toString() {
- if (asString == null) {
- StringBuffer buffer = new StringBuffer();
-
- if (drive != null) {
- buffer.append(drive);
- buffer.append(':');
- }
-
- if (elements.length == 0 && isAbsolute) {
- buffer.append(SEPARATOR);
- }
-
- for (int i = 0; i < elements.length; i++) {
- if (i !=0 || isAbsolute) {
- buffer.append(SEPARATOR);
- }
- buffer.append(elements[i]);
- }
- asString = buffer.toString();
+ // we can't use uri.toString(), which escapes everything, because we want
+ // illegal characters unescaped in the string, for glob processing, etc.
+ StringBuffer buffer = new StringBuffer();
+ if (uri.getScheme() != null) {
+ buffer.append(uri.getScheme());
+ buffer.append(":");
+ }
+ if (uri.getAuthority() != null) {
+ buffer.append("//");
+ buffer.append(uri.getAuthority());
+ }
+ if (uri.getPath() != null) {
+ String path = uri.getPath();
+ if (path.indexOf('/')==0 &&
+ hasWindowsDrive(path, true) && // has windows drive
+ uri.getScheme() == null && // but no scheme
+ uri.getAuthority() == null) // or authority
+ path = path.substring(1); // remove slash before drive
+ buffer.append(path);
}
- return asString;
+ return buffer.toString();
}
public boolean equals(Object o) {
@@ -157,83 +228,28 @@
return false;
}
Path that = (Path)o;
- return
- this.isAbsolute == that.isAbsolute &&
- Arrays.equals(this.elements, that.elements) &&
- (this.drive == null ? true : this.drive.equals(that.drive));
+ return this.uri.equals(that.uri);
}
public int hashCode() {
- int hashCode = isAbsolute ? 1 : -1;
- for (int i = 0; i < elements.length; i++) {
- hashCode ^= elements[i].hashCode();
- }
- if (drive != null) {
- hashCode ^= drive.hashCode();
- }
- return hashCode;
+ return uri.hashCode();
}
public int compareTo(Object o) {
Path that = (Path)o;
- return this.toString().compareTo(that.toString());
+ return this.uri.compareTo(that.uri);
}
/** Return the number of elements in this path. */
public int depth() {
- return elements.length;
- }
-
- /*
- * Removes '.' and '..'
- */
- private void normalize(ArrayList list) {
- boolean canNormalize = this.isAbsolute;
- boolean changed = false; // true if we have detected any . or ..
- int index = 0;
- int listSize = list.size();
- for (int i = 0; i < listSize; i++) {
- // Invariant: (index >= 0) && (index <= i)
- if (list.get(i).equals(".")) {
- changed = true;
- } else {
- if (canNormalize) {
- if (list.get(i).equals("..")) {
- if ((index > 0) && !list.get(index-1).equals("..")) {
- index--; // effectively deletes the last element currently in list.
- changed = true;
- } else { // index == 0
- // the first element is now going to be '..'
- canNormalize = false;
- list.set(index, "..");
- isAbsolute = false;
- index++;
- }
- } else { // list.get(i) != ".." or "."
- if (changed) {
- list.set(index, list.get(i));
- }
- index++;
- }
- } else { // !canNormalize
- if (changed) {
- list.set(index, list.get(i));
- }
- index++;
- if (!list.get(i).equals("..")) {
- canNormalize = true;
- }
- } // else !canNormalize
- }
- } // for
-
- // Remove the junk at the end of the list.
- for (int j = listSize-1; j >= index; j--) {
- list.remove(j);
+ String path = uri.getPath();
+ int depth = 0;
+ int slash = path.length()==1 && path.charAt(0)=='/' ? -1 : 0;
+ while (slash != -1) {
+ depth++;
+ slash = path.indexOf(SEPARATOR, slash+1);
}
-
+ return depth;
}
-
}
-
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormat.java Tue Dec 12 15:00:31 2006
@@ -43,22 +43,20 @@
/** Splits a set of input files. One split is created per map task.
*
- * @param fs the filesystem containing the files to be split
* @param job the job whose input files are to be split
* @param numSplits the desired number of splits
* @return the splits
*/
- FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits)
+ FileSplit[] getSplits(FileSystem ignored, JobConf job, int numSplits)
throws IOException;
/** Construct a {@link RecordReader} for a {@link FileSplit}.
*
- * @param fs the {@link FileSystem}
* @param split the {@link FileSplit}
* @param job the job that this split belongs to
* @return a {@link RecordReader}
*/
- RecordReader getRecordReader(FileSystem fs, FileSplit split,
+ RecordReader getRecordReader(FileSystem ignored, FileSplit split,
JobConf job, Reporter reporter)
throws IOException;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InputFormatBase.java Tue Dec 12 15:00:31 2006
@@ -71,12 +71,13 @@
* @return array of Path objects, never zero length.
* @throws IOException if zero items.
*/
- protected Path[] listPaths(FileSystem fs, JobConf job)
+ protected Path[] listPaths(FileSystem ignored, JobConf job)
throws IOException {
Path[] dirs = job.getInputPaths();
String subdir = job.get("mapred.input.subdir");
ArrayList result = new ArrayList();
for (int i = 0; i < dirs.length; i++) {
+ FileSystem fs = dirs[i].getFileSystem(job);
Path[] dir = fs.listPaths(dirs[i]);
if (dir != null) {
for (int j = 0; j < dir.length; j++) {
@@ -85,11 +86,11 @@
Path[] subFiles = fs.listPaths(new Path(file, subdir));
if (subFiles != null) {
for (int k = 0; k < subFiles.length; k++) {
- result.add(subFiles[k]);
+ result.add(fs.makeQualified(subFiles[k]));
}
}
} else {
- result.add(file);
+ result.add(fs.makeQualified(file));
}
}
}
@@ -101,26 +102,28 @@
return (Path[])result.toArray(new Path[result.size()]);
}
- public boolean[] areValidInputDirectories(FileSystem fileSys,
- Path[] inputDirs
- ) throws IOException {
+ // NOTE: should really pass a Configuration here, not a FileSystem
+ public boolean[] areValidInputDirectories(FileSystem fs, Path[] inputDirs)
+ throws IOException {
boolean[] result = new boolean[inputDirs.length];
for(int i=0; i < inputDirs.length; ++i) {
- result[i] = fileSys.isDirectory(inputDirs[i]);
+ result[i] =
+ inputDirs[i].getFileSystem(fs.getConf()).isDirectory(inputDirs[i]);
}
return result;
}
/** Splits files returned by {@link #listPaths(FileSystem,JobConf)} when
* they're too big.*/
- public FileSplit[] getSplits(FileSystem fs, JobConf job, int numSplits)
+ public FileSplit[] getSplits(FileSystem ignored, JobConf job, int numSplits)
throws IOException {
- Path[] files = listPaths(fs, job);
+ Path[] files = listPaths(ignored, job);
long totalSize = 0; // compute total size
for (int i = 0; i < files.length; i++) { // check we have valid files
Path file = files[i];
+ FileSystem fs = file.getFileSystem(job);
if (fs.isDirectory(file) || !fs.exists(file)) {
throw new IOException("Not a file: "+files[i]);
}
@@ -135,6 +138,7 @@
ArrayList splits = new ArrayList(numSplits); // generate splits
for (int i = 0; i < files.length; i++) {
Path file = files[i];
+ FileSystem fs = file.getFileSystem(job);
long length = fs.getLength(file);
if (isSplitable(fs, file)) {
long blockSize = fs.getBlockSize(file);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Tue Dec 12 15:00:31 2006
@@ -304,8 +304,15 @@
FileSystem userFileSys = FileSystem.get(job);
Path[] inputDirs = job.getInputPaths();
+
+ // make sure directories are fully qualified before checking them
+ for(int i=0; i < inputDirs.length; ++i) {
+ if (inputDirs[i].toUri().getScheme() == null) {
+ inputDirs[i] = userFileSys.makeQualified(inputDirs[i]);
+ }
+ }
+
// input paths should exist.
-
boolean[] validDirs =
job.getInputFormat().areValidInputDirectories(userFileSys, inputDirs);
for(int i=0; i < validDirs.length; ++i) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapFileOutputFormat.java Tue Dec 12 15:00:31 2006
@@ -34,7 +34,7 @@
/** An {@link OutputFormat} that writes {@link MapFile}s. */
public class MapFileOutputFormat extends OutputFormatBase {
- public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
+ public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress)
throws IOException {
@@ -42,7 +42,7 @@
// ignore the progress parameter, since MapFile is local
final MapFile.Writer out =
- new MapFile.Writer(job, fs, file.toString(),
+ new MapFile.Writer(job, file.getFileSystem(job), file.toString(),
job.getMapOutputKeyClass(),
job.getMapOutputValueClass(),
SequenceFile.getCompressionType(job),
@@ -61,11 +61,12 @@
}
/** Open the output generated by this format. */
- public static MapFile.Reader[] getReaders(FileSystem fs, Path dir,
+ public static MapFile.Reader[] getReaders(FileSystem ignored, Path dir,
Configuration conf)
throws IOException {
+ FileSystem fs = dir.getFileSystem(conf);
Path[] names = fs.listPaths(dir);
-
+
// sort names, so that hash partitioning works
Arrays.sort(names);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormat.java Tue Dec 12 15:00:31 2006
@@ -29,13 +29,12 @@
/** Construct a {@link RecordWriter} with Progressable.
*
- * @param fs the file system to write to
* @param job the job whose output is being written
* @param name the unique name for this part of the output
* @param progress mechanism for reporting progress while writing to file
* @return a {@link RecordWriter}
*/
- RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name,
+ RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
Progressable progress)
throws IOException;
@@ -47,6 +46,6 @@
* @param job the job whose output will be written
* @throws IOException when output should not be attempted
*/
- void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException;
+ void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/OutputFormatBase.java Tue Dec 12 15:00:31 2006
@@ -79,12 +79,12 @@
}
}
- public abstract RecordWriter getRecordWriter(FileSystem fs,
+ public abstract RecordWriter getRecordWriter(FileSystem ignored,
JobConf job, String name,
Progressable progress)
throws IOException;
- public void checkOutputSpecs(FileSystem fs, JobConf job)
+ public void checkOutputSpecs(FileSystem ignored, JobConf job)
throws FileAlreadyExistsException,
InvalidJobConfException, IOException {
// Ensure that the output directory is set and not already there
@@ -92,9 +92,9 @@
if (outDir == null && job.getNumReduceTasks() != 0) {
throw new InvalidJobConfException("Output directory not set in JobConf.");
}
- if (outDir != null && fs.exists(outDir)) {
+ if (outDir != null && outDir.getFileSystem(job).exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir +
- " already exists in " + fs.getName() );
+ " already exists");
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/PhasedFileSystem.java Tue Dec 12 15:00:31 2006
@@ -3,6 +3,8 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.net.URI;
+import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -28,6 +30,8 @@
public class PhasedFileSystem extends FileSystem {
private FileSystem baseFS ;
+ private URI uri;
+
// Map from final file name to temporary file name
private Map<Path, FileInfo> finalNameToFileInfo = new HashMap();
@@ -110,6 +114,14 @@
return tempPath ;
}
+ public URI getUri() {
+ return baseFS.getUri();
+ }
+
+ public void initialize(URI uri, Configuration conf) throws IOException {
+ baseFS.initialize(uri, conf);
+ }
+
@Override
public FSOutputStream createRaw(
Path f, boolean overwrite, short replication, long blockSize)
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/SequenceFileOutputFormat.java Tue Dec 12 15:00:31 2006
@@ -36,11 +36,12 @@
/** An {@link OutputFormat} that writes {@link SequenceFile}s. */
public class SequenceFileOutputFormat extends OutputFormatBase {
- public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
+ public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress)
throws IOException {
Path file = new Path(job.getOutputPath(), name);
+ FileSystem fs = file.getFileSystem(job);
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
if (getCompressOutput(job)) {
@@ -75,7 +76,7 @@
/** Open the output generated by this format. */
public static SequenceFile.Reader[] getReaders(Configuration conf, Path dir)
throws IOException {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = dir.getFileSystem(conf);
Path[] names = fs.listPaths(dir);
// sort names, so that hash partitioning works
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java Tue Dec 12 15:00:31 2006
@@ -53,10 +53,12 @@
}
}
- public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
- String name, Progressable progress) throws IOException {
+ public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
+ String name, Progressable progress)
+ throws IOException {
Path dir = job.getOutputPath();
+ FileSystem fs = dir.getFileSystem(job);
boolean isCompressed = getCompressOutput(job);
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/CopyFiles.java Tue Dec 12 15:00:31 2006
@@ -132,8 +132,8 @@
*/
public static Path makeRelative(Path root, Path absPath) {
if (!absPath.isAbsolute()) { return absPath; }
- String sRoot = root.toString();
- String sPath = absPath.toString();
+ String sRoot = root.toUri().getPath();
+ String sPath = absPath.toUri().getPath();
Enumeration rootTokens = new StringTokenizer(sRoot, "/");
ArrayList rList = Collections.list(rootTokens);
Enumeration pathTokens = new StringTokenizer(sPath, "/");
@@ -815,7 +815,7 @@
try {
copy(conf, srcPath, destPath, srcAsList, ignoreReadFailures);
} catch (Exception e) {
- System.out.println("Caught: " + e);
+ System.err.println("Copy failed: "+StringUtils.stringifyException(e));
return -1;
}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestPath.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestPath.java?view=diff&rev=486399&r1=486398&r2=486399
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestPath.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/fs/TestPath.java Tue Dec 12 15:00:31 2006
@@ -90,7 +90,6 @@
assertEquals(new Path("foo/bar/baz"), new Path("foo/bar", "baz"));
assertEquals(new Path("/foo"), new Path("/bar", "/foo"));
if (Path.WINDOWS) {
- assertEquals(new Path("c:/foo"), new Path("c:/bar", "/foo"));
assertEquals(new Path("c:/foo"), new Path("/bar", "c:/foo"));
assertEquals(new Path("c:/foo"), new Path("d:/bar", "c:/foo"));
}
@@ -103,6 +102,7 @@
public void testDots() {
// Test Path(String)
assertEquals(new Path("/foo/bar/baz").toString(), "/foo/bar/baz");
+ assertEquals(new Path("/foo/bar", ".").toString(), "/foo/bar");
assertEquals(new Path("/foo/bar/../baz").toString(), "/foo/baz");
assertEquals(new Path("/foo/bar/./baz").toString(), "/foo/bar/baz");
assertEquals(new Path("/foo/bar/baz/../../fud").toString(), "/foo/fud");
@@ -111,7 +111,6 @@
assertEquals(new Path(".././../foo/bar").toString(), "../../foo/bar");
assertEquals(new Path("./foo/bar/baz").toString(), "foo/bar/baz");
assertEquals(new Path("/foo/bar/../../baz/boo").toString(), "/baz/boo");
- assertEquals(new Path("/foo/bar/../../../baz").toString(), "../baz");
assertEquals(new Path("foo/bar/").toString(), "foo/bar");
assertEquals(new Path("foo/bar/../baz").toString(), "foo/baz");
assertEquals(new Path("foo/bar/../../baz/boo").toString(), "baz/boo");
@@ -128,7 +127,6 @@
assertEquals(new Path("/foo/bar/baz","../../boo/bud").toString(), "/foo/boo/bud");
assertEquals(new Path("foo/bar/baz","../../boo/bud").toString(), "foo/boo/bud");
- assertEquals(new Path("/foo/bar/","../../../baz/boo").toString(), "../baz/boo");
assertEquals(new Path("../../","../../boo/bud").toString(), "../../../../boo/bud");
assertEquals(new Path("../../foo","../../../boo/bud").toString(), "../../../../boo/bud");