Posted to common-commits@hadoop.apache.org by to...@apache.org on 2009/05/27 10:55:34 UTC

svn commit: r779066 - in /hadoop/core/trunk: ./ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/core/org/apache/hadoop/filecache/ src/test/core/org/apache/hadoop/filecache/

Author: tomwhite
Date: Wed May 27 08:55:34 2009
New Revision: 779066

URL: http://svn.apache.org/viewvc?rev=779066&view=rev
Log:
HADOOP-5635. Change distributed cache to work with other distributed file systems. Contributed by Andrew Hitchcock.
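
In practice the change means a job can reference cache files on any filesystem with a registered implementation, not only HDFS. A minimal, illustrative sketch of the caller side (the s3:// bucket and paths below are made up; the scheme only needs a FileSystem implementation configured for it):

    import java.net.URI;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.mapred.JobConf;

    public class CacheOnOtherFs {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // Any scheme with a registered FileSystem implementation should now
        // be accepted, not just hdfs://. The #dictionary fragment is the name
        // of the symlink created in the task's working directory.
        DistributedCache.addCacheFile(
            new URI("s3://example-bucket/data/dictionary.txt#dictionary"), job);
        DistributedCache.createSymlink(job);
        // ... set mapper/reducer and submit the job as usual ...
      }
    }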

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java
    hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=779066&r1=779065&r2=779066&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed May 27 08:55:34 2009
@@ -689,6 +689,9 @@
     CombineFileInputFormat is used as job InputFormat.
     (Amareshwari Sriramadasu via dhruba)
 
+    HADOOP-5635. Change distributed cache to work with other distributed file
+    systems. (Andrew Hitchcock via tomwhite)
+
 Release 0.20.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=779066&r1=779065&r2=779066&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Wed May 27 08:55:34 2009
@@ -1131,7 +1131,7 @@
 
   protected RunningJob running_;
   protected JobID jobId_;
-  protected static final String LINK_URI = "You need to specify the uris as hdfs://host:port/#linkname," +
+  protected static final String LINK_URI = "You need to specify the uris as scheme://path#linkname," +
     "Please specify a different link name for all of your caching URIs";
 
 }
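
The reworded message only generalises the expected URI form: any scheme is acceptable, but each cached URI still needs its own #linkname fragment. A small sketch (URIs purely illustrative) of two cache entries with distinct link names:

    import java.net.URI;

    public class LinkNames {
      public static void main(String[] args) throws Exception {
        // The fragment after '#' becomes the link name in the task's working
        // directory, so no two cache URIs may share one.
        URI first  = new URI("hdfs://namenode:8020/shared/stopwords.txt#stopwords");
        URI second = new URI("s3://example-bucket/shared/synonyms.txt#synonyms");
        System.out.println(first.getFragment());   // stopwords
        System.out.println(second.getFragment());  // synonyms
      }
    }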

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=779066&r1=779065&r2=779066&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java Wed May 27 08:55:34 2009
@@ -35,10 +35,10 @@
  * </p>
  * 
  * <p>Applications specify the files, via urls (hdfs:// or http://) to be cached 
- * via the {@link org.apache.hadoop.mapred.JobConf}.
- * The <code>DistributedCache</code> assumes that the
- * files specified via hdfs:// urls are already present on the 
- * {@link FileSystem} at the path specified by the url.</p>
+ * via the {@link org.apache.hadoop.mapred.JobConf}. The
+ * <code>DistributedCache</code> assumes that the files specified via urls are
+ * already present on the {@link FileSystem} at the path specified by the url
+ * and are accessible by every machine in the cluster.</p>
  * 
  * <p>The framework will copy the necessary files on to the slave node before 
  * any tasks for the job are executed on that node. Its efficiency stems from 
@@ -129,9 +129,7 @@
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
    * @param conf The Configuration file which contains the filesystem
    * @param baseDir The base cache Dir where you want to localize the files/archives
    * @param fileStatus The file status on the dfs.
@@ -162,9 +160,7 @@
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
    * @param conf The Configuration file which contains the filesystem
    * @param baseDir The base cache Dir where you want to localize the files/archives
    * @param fileStatus The file status on the dfs.
@@ -231,9 +227,7 @@
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
    * @param conf The Configuration file which contains the filesystem
    * @param baseDir The base cache Dir where you want to localize the files/archives
    * @param isArchive if the cache is an archive or a file. In case it is an 
@@ -350,7 +344,7 @@
     if(cache.getFragment() == null) {
     	doSymlink = false;
     }
-    FileSystem fs = getFileSystem(cache, conf);
+    FileSystem fs = FileSystem.get(cache, conf);
     String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
     File flink = new File(link);
     if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
@@ -533,14 +527,6 @@
     }  
   }
   
-  private static FileSystem getFileSystem(URI cache, Configuration conf)
-    throws IOException {
-    if ("hdfs".equals(cache.getScheme()))
-      return FileSystem.get(cache, conf);
-    else
-      return FileSystem.get(conf);
-  }
-
   /**
    * Set the configuration with the given set of archives
    * @param archives The list of archives that need to be localized
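
The removed helper only special-cased hdfs:// and routed every other URI to the default filesystem; FileSystem.get(URI, Configuration) already performs the more general lookup, choosing the filesystem by the URI's scheme and falling back to the default filesystem when no scheme is given. A rough sketch of that behaviour, borrowing the fake-scheme trick from the new test (the fakefile scheme and paths are illustrative):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class ResolveByScheme {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Map a made-up scheme onto the local filesystem implementation so
        // the scheme-based lookup can be observed without a real cluster.
        conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
        // Scheme present: resolved via the fs.<scheme>.impl mapping.
        FileSystem byScheme = FileSystem.get(new URI("fakefile:///tmp/cached-file"), conf);
        // No scheme: falls back to the default filesystem, as FileSystem.get(conf) would.
        FileSystem byDefault = FileSystem.get(new URI("/tmp/cached-file"), conf);
        System.out.println(byScheme.getUri());
        System.out.println(byDefault.getUri());
      }
    }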

Modified: hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java?rev=779066&r1=779065&r2=779066&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java (original)
+++ hadoop/core/trunk/src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java Wed May 27 08:55:34 2009
@@ -55,6 +55,15 @@
     assertTrue("DistributedCache failed deleting old cache when the cache store is full.",
         dirStatuses.length > 1);
   }
+  
+  public void testFileSystemOtherThanDefault() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
+    Path fileToCache = new Path("fakefile:///" + firstCacheFile.toUri().getPath());
+    Path result = DistributedCache.getLocalCache(fileToCache.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), 
+        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
+    assertNotNull("DistributedCache cached file on non-default filesystem.", result);
+  }
 
   private void createTempFile(FileSystem fs, Path p) throws IOException {
     FSDataOutputStream out = fs.create(p);