You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2009/05/27 10:55:34 UTC
svn commit: r779066 - in /hadoop/core/trunk: ./
src/contrib/streaming/src/java/org/apache/hadoop/streaming/
src/core/org/apache/hadoop/filecache/
src/test/core/org/apache/hadoop/filecache/
Author: tomwhite
Date: Wed May 27 08:55:34 2009
New Revision: 779066
URL: http://svn.apache.org/viewvc?rev=779066&view=rev
Log:
HADOOP-5635. Change distributed cache to work with other distributed file systems. Contributed by Andrew Hitchcock.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java
hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=779066&r1=779065&r2=779066&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May 27 08:55:34 2009
@@ -689,6 +689,9 @@
CombineFileInputFormat is used as job InputFormat.
(Amareshwari Sriramadasu via dhruba)
+ HADOOP-5635. Change distributed cache to work with other distributed file
+ systems. (Andrew Hitchcock via tomwhite)
+
Release 0.20.1 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=779066&r1=779065&r2=779066&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Wed May 27 08:55:34 2009
@@ -1131,7 +1131,7 @@
protected RunningJob running_;
protected JobID jobId_;
- protected static final String LINK_URI = "You need to specify the uris as hdfs://host:port/#linkname," +
+ protected static final String LINK_URI = "You need to specify the uris as scheme://path#linkname," +
"Please specify a different link name for all of your caching URIs";
}
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=779066&r1=779065&r2=779066&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java Wed May 27 08:55:34 2009
@@ -35,10 +35,10 @@
* </p>
*
* <p>Applications specify the files, via urls (hdfs:// or http://) to be cached
- * via the {@link org.apache.hadoop.mapred.JobConf}.
- * The <code>DistributedCache</code> assumes that the
- * files specified via hdfs:// urls are already present on the
- * {@link FileSystem} at the path specified by the url.</p>
+ * via the {@link org.apache.hadoop.mapred.JobConf}. The
+ * <code>DistributedCache</code> assumes that the files specified via urls are
+ * already present on the {@link FileSystem} at the path specified by the url
+ * and are accessible by every machine in the cluster.</p>
*
* <p>The framework will copy the necessary files on to the slave node before
* any tasks for the job are executed on that node. Its efficiency stems from
@@ -129,9 +129,7 @@
* previously cached (and valid) or copy it from the {@link FileSystem} now.
*
* @param cache the cache to be localized, this should be specified as
- * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
- * or hostname:port is provided the file is assumed to be in the filesystem
- * being used in the Configuration
+ * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
* @param conf The Confguration file which contains the filesystem
* @param baseDir The base cache Dir where you wnat to localize the files/archives
* @param fileStatus The file status on the dfs.
@@ -162,9 +160,7 @@
* previously cached (and valid) or copy it from the {@link FileSystem} now.
*
* @param cache the cache to be localized, this should be specified as
- * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
- * or hostname:port is provided the file is assumed to be in the filesystem
- * being used in the Configuration
+ * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
* @param conf The Confguration file which contains the filesystem
* @param baseDir The base cache Dir where you wnat to localize the files/archives
* @param fileStatus The file status on the dfs.
@@ -231,9 +227,7 @@
* previously cached (and valid) or copy it from the {@link FileSystem} now.
*
* @param cache the cache to be localized, this should be specified as
- * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema
- * or hostname:port is provided the file is assumed to be in the filesystem
- * being used in the Configuration
+ * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
* @param conf The Confguration file which contains the filesystem
* @param baseDir The base cache Dir where you wnat to localize the files/archives
* @param isArchive if the cache is an archive or a file. In case it is an
@@ -350,7 +344,7 @@
if(cache.getFragment() == null) {
doSymlink = false;
}
- FileSystem fs = getFileSystem(cache, conf);
+ FileSystem fs = FileSystem.get(cache, conf);
String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
File flink = new File(link);
if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
@@ -533,14 +527,6 @@
}
}
- private static FileSystem getFileSystem(URI cache, Configuration conf)
- throws IOException {
- if ("hdfs".equals(cache.getScheme()))
- return FileSystem.get(cache, conf);
- else
- return FileSystem.get(conf);
- }
-
/**
* Set the configuration with the given set of archives
* @param archives The list of archives that need to be localized
Modified: hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java?rev=779066&r1=779065&r2=779066&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java (original)
+++ hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java Wed May 27 08:55:34 2009
@@ -55,6 +55,15 @@
assertTrue("DistributedCache failed deleting old cache when the cache store is full.",
dirStatuses.length > 1);
}
+
+ public void testFileSystemOtherThanDefault() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
+ Path fileToCache = new Path("fakefile:///" + firstCacheFile.toUri().getPath());
+ Path result = DistributedCache.getLocalCache(fileToCache.toUri(), conf, new Path(TEST_CACHE_BASE_DIR),
+ false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
+ assertNotNull("DistributedCache cached file on non-default filesystem.", result);
+ }
private void createTempFile(FileSystem fs, Path p) throws IOException {
FSDataOutputStream out = fs.create(p);