You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/08/17 18:45:45 UTC

svn commit: r805033 - in /hadoop/mapreduce/trunk: ./ lib/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/examples/org/apache/hadoop/examples/ src/examples/org/apache/hadoop/examples/terasort/ src/java/org/apache/hadoop/filecache/ src/j...

Author: yhemanth
Date: Mon Aug 17 16:45:44 2009
New Revision: 805033

URL: http://svn.apache.org/viewvc?rev=805033&view=rev
Log:
MAPREDUCE-711. Moved Distributed Cache from Common to Map/Reduce project. Contributed by Vinod Kumar Vavilapalli.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/filecache/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/lib/hadoop-core-0.21.0-dev.jar
    hadoop/mapreduce/trunk/lib/hadoop-core-test-0.21.0-dev.jar
    hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Sort.java
    hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=805033&r1=805032&r2=805033&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Aug 17 16:45:44 2009
@@ -20,6 +20,9 @@
     MAPREDUCE-817. Add a cache for retired jobs with minimal job info and 
     provide a way to access history file url. (sharad)
 
+    MAPREDUCE-711. Moved Distributed Cache from Common to Map/Reduce
+    project. (Vinod Kumar Vavilapalli via yhemanth)
+
   NEW FEATURES
 
     MAPREDUCE-706. Support for FIFO pools in the fair scheduler.

Modified: hadoop/mapreduce/trunk/lib/hadoop-core-0.21.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/lib/hadoop-core-0.21.0-dev.jar?rev=805033&r1=805032&r2=805033&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/mapreduce/trunk/lib/hadoop-core-test-0.21.0-dev.jar
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/lib/hadoop-core-test-0.21.0-dev.jar?rev=805033&r1=805032&r2=805033&view=diff
==============================================================================
Binary files - no diff available.

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=805033&r1=805032&r2=805033&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Aug 17 16:45:44 2009
@@ -50,7 +50,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.FileInputFormat;

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Sort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Sort.java?rev=805033&r1=805032&r2=805033&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Sort.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/Sort.java Mon Aug 17 16:45:44 2009
@@ -24,7 +24,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;

Modified: hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java?rev=805033&r1=805032&r2=805033&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java (original)
+++ hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/terasort/TeraSort.java Mon Aug 17 16:45:44 2009
@@ -27,7 +27,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?rev=805033&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Mon Aug 17 16:45:44 2009
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.filecache;
+
+/**
+ * Class is moved to a new package org.apache.hadoop.mapreduce.filecache
+ * 
+ * @deprecated Instead use
+ *             {@link org.apache.hadoop.mapreduce.filecache.DistributedCache}
+ * 
+ */
+@Deprecated
+public class DistributedCache extends
+    org.apache.hadoop.mapreduce.filecache.DistributedCache {
+  //
+}

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=805033&r1=805032&r2=805033&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Mon Aug 17 16:45:44 2009
@@ -47,7 +47,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=805033&r1=805032&r2=805033&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Mon Aug 17 16:45:44 2009
@@ -32,7 +32,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=805033&r1=805032&r2=805033&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Aug 17 16:45:44 2009
@@ -51,7 +51,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=805033&r1=805032&r2=805033&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java Mon Aug 17 16:45:44 2009
@@ -29,7 +29,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.NullWritable;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java?rev=805033&r1=805032&r2=805033&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/pipes/Submitter.java Mon Aug 17 16:45:44 2009
@@ -37,7 +37,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=805033&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Mon Aug 17 16:45:44 2009
@@ -0,0 +1,862 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.filecache;
+
+import org.apache.commons.logging.*;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.fs.*;
+
+import java.net.URI;
+
+/**
+ * Distribute application-specific large, read-only files efficiently.
+ * 
+ * <p><code>DistributedCache</code> is a facility provided by the Map-Reduce
+ * framework to cache files (text, archives, jars etc.) needed by applications.
+ * </p>
+ * 
+ * <p>Applications specify the files, via urls (hdfs:// or http://) to be cached 
+ * via the org.apache.hadoop.mapred.JobConf. The
+ * <code>DistributedCache</code> assumes that the files specified via urls are
+ * already present on the {@link FileSystem} at the path specified by the url
+ * and are accessible by every machine in the cluster.</p>
+ * 
+ * <p>The framework will copy the necessary files on to the slave node before 
+ * any tasks for the job are executed on that node. Its efficiency stems from 
+ * the fact that the files are only copied once per job and the ability to 
+ * cache archives which are un-archived on the slaves.</p> 
+ *
+ * <p><code>DistributedCache</code> can be used to distribute simple, read-only
+ * data/text files and/or more complex types such as archives, jars etc. 
+ * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes. 
+ * Jars may be optionally added to the classpath of the tasks, a rudimentary 
+ * software distribution mechanism.  Files have execution permissions.
+ * Optionally users can also direct it to symlink the distributed cache file(s)
+ * into the working directory of the task.</p>
+ * 
+ * <p><code>DistributedCache</code> tracks modification timestamps of the cache 
+ * files. Clearly the cache files should not be modified by the application 
+ * or externally while the job is executing.</p>
+ * 
+ * <p>Here is an illustrative example on how to use the 
+ * <code>DistributedCache</code>:</p>
+ * <p><blockquote><pre>
+ *     // Setting up the cache for the application
+ *     
+ *     1. Copy the requisite files to the <code>FileSystem</code>:
+ *     
+ *     $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat  
+ *     $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip  
+ *     $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
+ *     $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
+ *     $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
+ *     $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
+ *     
+ *     2. Setup the application's <code>JobConf</code>:
+ *     
+ *     JobConf job = new JobConf();
+ *     DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), 
+ *                                   job);
+ *     DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
+ *     DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
+ *     DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
+ *     DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);
+ *     DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job);
+ *     
+ *     3. Use the cached files in the org.apache.hadoop.mapred.Mapper
+ *     or org.apache.hadoop.mapred.Reducer:
+ *     
+ *     public static class MapClass extends MapReduceBase  
+ *     implements Mapper&lt;K, V, K, V&gt; {
+ *     
+ *       private Path[] localArchives;
+ *       private Path[] localFiles;
+ *       
+ *       public void configure(JobConf job) {
+ *         // Get the cached archives/files
+ *         localArchives = DistributedCache.getLocalCacheArchives(job);
+ *         localFiles = DistributedCache.getLocalCacheFiles(job);
+ *       }
+ *       
+ *       public void map(K key, V value, 
+ *                       OutputCollector&lt;K, V&gt; output, Reporter reporter) 
+ *       throws IOException {
+ *         // Use data from the cached archives/files here
+ *         // ...
+ *         // ...
+ *         output.collect(k, v);
+ *       }
+ *     }
+ *     
+ * </pre></blockquote></p>
+ * 
+ */
+public class DistributedCache {
+  // cacheID to cacheStatus mapping
+  private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>();
+  
+  private static TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
+  
+  // default total cache size
+  private static final long DEFAULT_CACHE_SIZE = 10737418240L;
+
+  private static final Log LOG =
+    LogFactory.getLog(DistributedCache.class);
+  
+  /**
+   * Get the locally cached file or archive; it could either be 
+   * previously cached (and valid) or copy it from the {@link FileSystem} now.
+   * 
+   * @param cache the cache to be localized, this should be specified as 
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
+   * @param conf The Configuration file which contains the filesystem
+   * @param baseDir The base cache Dir where you want to localize the files/archives
+   * @param fileStatus The file status on the dfs.
+   * @param isArchive if the cache is an archive or a file. In case it is an
+   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
+   *  be unzipped/unjarred/untarred automatically 
+   *  and the directory where the archive is unzipped/unjarred/untarred is
+   *  returned as the Path.
+   *  In case of a file, the path to the file is returned
+   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
+   * file to be cached hasn't changed since the job started
+   * @param currentWorkDir this is the directory where you would want to create symlinks 
+   * for the locally cached files/archives
+   * @return the path to directory where the archives are unjarred in case of archives,
+   * the path to the file where the file is copied locally 
+   * @throws IOException
+   */
+  public static Path getLocalCache(URI cache, Configuration conf, 
+                                   Path baseDir, FileStatus fileStatus,
+                                   boolean isArchive, long confFileStamp,
+                                   Path currentWorkDir) 
+  throws IOException {
+    return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
+        confFileStamp, currentWorkDir, true);
+  }
+  /**
+   * Get the locally cached file or archive; it could either be 
+   * previously cached (and valid) or copy it from the {@link FileSystem} now.
+   * 
+   * @param cache the cache to be localized, this should be specified as 
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
+   * @param conf The Configuration file which contains the filesystem
+   * @param baseDir The base cache Dir where you want to localize the files/archives
+   * @param fileStatus The file status on the dfs.
+   * @param isArchive if the cache is an archive or a file. In case it is an
+   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
+   *  be unzipped/unjarred/untarred automatically 
+   *  and the directory where the archive is unzipped/unjarred/untarred is
+   *  returned as the Path.
+   *  In case of a file, the path to the file is returned
+   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
+   * file to be cached hasn't changed since the job started
+   * @param currentWorkDir this is the directory where you would want to create symlinks 
+   * for the locally cached files/archives
+   * @param honorSymLinkConf if this is false, then the symlinks are not
+   * created even if conf says so (this is required for an optimization in task
+   * launches)
+   * @return the path to directory where the archives are unjarred in case of archives,
+   * the path to the file where the file is copied locally 
+   * @throws IOException
+   */
+  public static Path getLocalCache(URI cache, Configuration conf, 
+      Path baseDir, FileStatus fileStatus,
+      boolean isArchive, long confFileStamp,
+      Path currentWorkDir, boolean honorSymLinkConf) 
+  throws IOException {
+    String cacheId = makeRelative(cache, conf);
+    CacheStatus lcacheStatus;
+    Path localizedPath;
+    synchronized (cachedArchives) {
+      lcacheStatus = cachedArchives.get(cacheId);
+      if (lcacheStatus == null) {
+        // was never localized
+        lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId)));
+        cachedArchives.put(cacheId, lcacheStatus);
+      }
+
+      synchronized (lcacheStatus) {
+        localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
+            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
+        lcacheStatus.refcount++;
+      }
+    }
+
+    // try deleting stuff if you can
+    long size = 0;
+    synchronized (baseDirSize) {
+      Long get = baseDirSize.get(baseDir);
+      if ( get != null ) {
+    	size = get.longValue();
+      }
+    }
+    // setting the cache size to a default of 10GB
+    long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
+    if (allowedSize < size) {
+      // try some cache deletions
+      deleteCache(conf);
+    }
+    return localizedPath;
+  }
+
+  
+  /**
+   * Get the locally cached file or archive; it could either be 
+   * previously cached (and valid) or copy it from the {@link FileSystem} now.
+   * 
+   * @param cache the cache to be localized, this should be specified as 
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
+   * @param conf The Configuration file which contains the filesystem
+   * @param baseDir The base cache Dir where you want to localize the files/archives
+   * @param isArchive if the cache is an archive or a file. In case it is an 
+   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will 
+   *  be unzipped/unjarred/untarred automatically 
+   *  and the directory where the archive is unzipped/unjarred/untarred 
+   *  is returned as the Path.
+   *  In case of a file, the path to the file is returned
+   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
+   * file to be cached hasn't changed since the job started
+   * @param currentWorkDir this is the directory where you would want to create symlinks 
+   * for the locally cached files/archives
+   * @return the path to directory where the archives are unjarred in case of archives,
+   * the path to the file where the file is copied locally 
+   * @throws IOException
+
+   */
+  public static Path getLocalCache(URI cache, Configuration conf, 
+                                   Path baseDir, boolean isArchive,
+                                   long confFileStamp, Path currentWorkDir) 
+  throws IOException {
+    return getLocalCache(cache, conf, 
+                         baseDir, null, isArchive,
+                         confFileStamp, currentWorkDir);
+  }
+  
+  /**
+   * This is the opposite of getLocalCache. When you are done with
+   * using the cache, you need to release the cache.
+   * @param cache The cache URI to be released
+   * @param conf configuration which contains the filesystem the cache 
+   * is contained in.
+   * @throws IOException
+   */
+  public static void releaseCache(URI cache, Configuration conf)
+    throws IOException {
+    String cacheId = makeRelative(cache, conf);
+    synchronized (cachedArchives) {
+      CacheStatus lcacheStatus = cachedArchives.get(cacheId);
+      if (lcacheStatus == null)
+        return;
+      synchronized (lcacheStatus) {
+        lcacheStatus.refcount--;
+      }
+    }
+  }
+  
+  // To delete the caches which have a refcount of zero
+  
+  private static void deleteCache(Configuration conf) throws IOException {
+    // try deleting cache Status with refcount of zero
+    synchronized (cachedArchives) {
+      for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
+        String cacheId = (String) it.next();
+        CacheStatus lcacheStatus = cachedArchives.get(cacheId);
+        synchronized (lcacheStatus) {
+          if (lcacheStatus.refcount == 0) {
+            // delete this cache entry
+            FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
+            synchronized (baseDirSize) {
+              Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
+              if ( dirSize != null ) {
+            	dirSize -= lcacheStatus.size;
+            	baseDirSize.put(lcacheStatus.baseDir, dirSize);
+              }
+            }
+            it.remove();
+          }
+        }
+      }
+    }
+  }
+
+  /*
+   * Returns the path, relative to the base cache directory, of the dir this
+   * cache will be localized in. For hdfs://hostname:port/absolute_path the
+   * relative path is hostname/absolute_path. If the URI is just
+   * /absolute_path, the relative path is <host of the default FileSystem
+   * this mapred cluster is running on>/absolute_path. When no host is
+   * available, the URI scheme is used in its place.
+   */
+  public static String makeRelative(URI cache, Configuration conf)
+    throws IOException {
+    String host = cache.getHost();
+    if (host == null) {
+      host = cache.getScheme();
+    }
+    if (host == null) {
+      URI defaultUri = FileSystem.get(conf).getUri();
+      host = defaultUri.getHost();
+      if (host == null) {
+        host = defaultUri.getScheme();
+      }
+    }
+    String path = host + cache.getPath();
+    path = path.replace(":/","/");                // remove windows device colon
+    return path;
+  }
+
+  private static Path cacheFilePath(Path p) {
+    return new Path(p, p.getName());
+  }
+
+  // the method which actually copies the caches locally and unjars/unzips them
+  // and does chmod for the files
+  private static Path localizeCache(Configuration conf, 
+                                    URI cache, long confFileStamp,
+                                    CacheStatus cacheStatus,
+                                    FileStatus fileStatus,
+                                    boolean isArchive, 
+                                    Path currentWorkDir,boolean honorSymLinkConf) 
+  throws IOException {
+    boolean doSymlink = honorSymLinkConf && getSymlink(conf);
+    if(cache.getFragment() == null) {
+    	doSymlink = false;
+    }
+    FileSystem fs = FileSystem.get(cache, conf);
+    String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
+    File flink = new File(link);
+    if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
+                           cacheStatus, fileStatus)) {
+      if (isArchive) {
+        if (doSymlink){
+          if (!flink.exists())
+            FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
+                             link);
+        }
+        return cacheStatus.localLoadPath;
+      }
+      else {
+        if (doSymlink){
+          if (!flink.exists())
+            FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
+                             link);
+        }
+        return cacheFilePath(cacheStatus.localLoadPath);
+      }
+    } else {
+      // remove the old archive
+      // if the old archive cannot be removed since it is being used by another
+      // job
+      // throw an IOException
+      if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
+        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
+                              + " is in use and cannot be refreshed");
+      
+      FileSystem localFs = FileSystem.getLocal(conf);
+      localFs.delete(cacheStatus.localLoadPath, true);
+      synchronized (baseDirSize) {
+    	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+    	if ( dirSize != null ) {
+    	  dirSize -= cacheStatus.size;
+    	  baseDirSize.put(cacheStatus.baseDir, dirSize);
+    	}
+      }
+      Path parchive = new Path(cacheStatus.localLoadPath,
+                               new Path(cacheStatus.localLoadPath.getName()));
+      
+      if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
+        throw new IOException("Mkdirs failed to create directory " + 
+                              cacheStatus.localLoadPath.toString());
+      }
+
+      String cacheId = cache.getPath();
+      fs.copyToLocalFile(new Path(cacheId), parchive);
+      if (isArchive) {
+        String tmpArchive = parchive.toString().toLowerCase();
+        File srcFile = new File(parchive.toString());
+        File destDir = new File(parchive.getParent().toString());
+        if (tmpArchive.endsWith(".jar")) {
+          RunJar.unJar(srcFile, destDir);
+        } else if (tmpArchive.endsWith(".zip")) {
+          FileUtil.unZip(srcFile, destDir);
+        } else if (isTarFile(tmpArchive)) {
+          FileUtil.unTar(srcFile, destDir);
+        }
+        // else will not do anything
+        // and copy the file into the dir as it is
+      }
+      
+      long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
+      cacheStatus.size = cacheSize;
+      synchronized (baseDirSize) {
+      	Long dirSize = baseDirSize.get(cacheStatus.baseDir);
+      	if( dirSize == null ) {
+      	  dirSize = Long.valueOf(cacheSize);
+      	} else {
+      	  dirSize += cacheSize;
+      	}
+      	baseDirSize.put(cacheStatus.baseDir, dirSize);
+      }
+      
+      // do chmod here 
+      try {
+        //Setting recursive permission to grant everyone read and execute
+        FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
+      } catch(InterruptedException e) {
+    	LOG.warn("Exception in chmod" + e.toString());
+      }
+
+      // update cacheStatus to reflect the newly cached file
+      cacheStatus.currentStatus = true;
+      cacheStatus.mtime = getTimestamp(conf, cache);
+    }
+    
+    if (isArchive){
+      if (doSymlink){
+        if (!flink.exists())
+          FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
+                           link);
+      }
+      return cacheStatus.localLoadPath;
+    }
+    else {
+      if (doSymlink){
+        if (!flink.exists())
+          FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
+                           link);
+      }
+      return cacheFilePath(cacheStatus.localLoadPath);
+    }
+  }
+
+  /**
+   * Tells whether a file name ends with one of the tar extensions
+   * ".tgz", ".tar.gz" or ".tar". The comparison is case-sensitive.
+   * @param filename the file name to inspect
+   * @return true if the name ends with a tar extension
+   */
+  private static boolean isTarFile(String filename) {
+    if (filename.endsWith(".tgz")) {
+      return true;
+    }
+    if (filename.endsWith(".tar.gz")) {
+      return true;
+    }
+    return filename.endsWith(".tar");
+  }
+  
+  /**
+   * Decides whether the local copy tracked by <code>lcacheStatus</code> can be
+   * reused without localizing the cache again.
+   * @param conf configuration used to look up the remote timestamp when
+   *        <code>fileStatus</code> is null
+   * @param fs not used by this method
+   * @param cache URI of the cache on the remote file system
+   * @param confFileStamp modification time the caller expects the cache to have
+   * @param lcacheStatus status of the local copy
+   * @param fileStatus status of the remote file, or null to have it looked up
+   * @return true if the cache is loaded and its recorded mtime matches the
+   *         remote modification time, false if it has to be (re)localized
+   * @throws IOException if the remote modification time differs from
+   *         <code>confFileStamp</code>, or the timestamp lookup fails
+   */
+  private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs, 
+                                          URI cache, long confFileStamp, 
+                                          CacheStatus lcacheStatus,
+                                          FileStatus fileStatus) 
+  throws IOException {
+    // nothing has been loaded yet, so there is nothing to reuse
+    if (!lcacheStatus.currentStatus) {
+      return false;
+    }
+
+    // use the supplied status when there is one, otherwise ask the file system
+    long dfsFileStamp = (fileStatus != null)
+      ? fileStatus.getModificationTime()
+      : getTimestamp(conf, cache);
+
+    // ensure that the file on hdfs hasn't been modified since the job started
+    if (dfsFileStamp != confFileStamp) {
+      LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+      throw new IOException("File: " + cache + 
+                            " has changed on HDFS since job started");
+    }
+
+    // a local copy recorded with another mtime needs refreshing
+    return dfsFileStamp == lcacheStatus.mtime;
+  }
+
+  /**
+   * Looks up the modification time of a cache file. The file system is chosen
+   * from the URI; only the path component of the URI is used to find the file.
+   * @param conf configuration used to obtain the file system
+   * @param cache URI of the cache file
+   * @return modification time reported by the file system for the cache file
+   * @throws IOException if the file system or the file status cannot be obtained
+   */
+  public static long getTimestamp(Configuration conf, URI cache)
+    throws IOException {
+    FileSystem remoteFs = FileSystem.get(cache, conf);
+    FileStatus status = remoteFs.getFileStatus(new Path(cache.getPath()));
+    return status.getModificationTime();
+  }
+  
+  /**
+   * Creates, inside <code>workDir</code>, one symlink per entry of
+   * <code>jobCacheDir</code>; each link has the same name as the entry it
+   * points to. Nothing is done when either argument is null or not a
+   * directory, when symlink creation is not enabled in the configuration
+   * (see {@link #getSymlink(Configuration)}), or when the entries of
+   * <code>jobCacheDir</code> cannot be listed.
+   * @param conf the configuration
+   * @param jobCacheDir the directory whose entries are the link targets
+   * @param workDir the directory in which the symlinks are created
+   * @throws IOException if creating a symlink fails
+   */
+  public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
+    throws IOException{
+    if (jobCacheDir == null || !jobCacheDir.isDirectory()
+        || workDir == null || !workDir.isDirectory()) {
+      return;
+    }
+    if (!getSymlink(conf)) {
+      return;
+    }
+    File[] entries = jobCacheDir.listFiles();
+    if (entries == null) {
+      // listFiles() returns null when the directory cannot be read, for
+      // example if it was removed after the isDirectory() check above
+      LOG.warn("Unable to list " + jobCacheDir + ", no symlinks created");
+      return;
+    }
+    for (File entry : entries) {
+      FileUtil.symLink(entry.getAbsolutePath(),
+                       new File(workDir, entry.getName()).toString());
+    }
+  }
+  
+  /**
+   * Stores the given archives under "mapred.cache.archives", replacing
+   * whatever value was set there before.
+   * @param archives The URIs of the archives that need to be localized
+   * @param conf Configuration which will be changed
+   */
+  public static void setCacheArchives(URI[] archives, Configuration conf) {
+    conf.set("mapred.cache.archives", StringUtils.uriToString(archives));
+  }
+
+  /**
+   * Stores the given files under "mapred.cache.files", replacing whatever
+   * value was set there before.
+   * @param files The URIs of the files that need to be localized
+   * @param conf Configuration which will be changed
+   */
+  public static void setCacheFiles(URI[] files, Configuration conf) {
+    conf.set("mapred.cache.files", StringUtils.uriToString(files));
+  }
+
+  /**
+   * Get cache archives set in the Configuration
+   * @param conf The configuration which contains the archives
+   * @return A URI array of the caches set in the Configuration
+   * @throws IOException
+   */
+  public static URI[] getCacheArchives(Configuration conf) throws IOException {
+    return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives"));
+  }
+
+  /**
+   * Get cache files set in the Configuration
+   * @param conf The configuration which contains the files
+   * @return A URI array of the files set in the Configuration
+   * @throws IOException
+   */
+
+  public static URI[] getCacheFiles(Configuration conf) throws IOException {
+    return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
+  }
+
+  /**
+   * Return the path array of the localized caches
+   * @param conf Configuration that contains the localized archives
+   * @return A path array of localized caches
+   * @throws IOException
+   */
+  public static Path[] getLocalCacheArchives(Configuration conf)
+    throws IOException {
+    return StringUtils.stringToPath(conf
+                                    .getStrings("mapred.cache.localArchives"));
+  }
+
+  /**
+   * Return the path array of the localized files
+   * @param conf Configuration that contains the localized files
+   * @return A path array of localized files
+   * @throws IOException
+   */
+  public static Path[] getLocalCacheFiles(Configuration conf)
+    throws IOException {
+    return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles"));
+  }
+
+  /**
+   * Get the timestamps of the archives
+   * @param conf The configuration which stored the timestamps
+   * @return a string array of timestamps 
+   * @throws IOException
+   */
+  public static String[] getArchiveTimestamps(Configuration conf) {
+    return conf.getStrings("mapred.cache.archives.timestamps");
+  }
+
+
+  /**
+   * Get the timestamps of the files
+   * @param conf The configuration which stored the timestamps
+   * @return a string array of timestamps 
+   * @throws IOException
+   */
+  public static String[] getFileTimestamps(Configuration conf) {
+    return conf.getStrings("mapred.cache.files.timestamps");
+  }
+
+  /**
+   * Stores the timestamps of the archives to be localized under
+   * "mapred.cache.archives.timestamps".
+   * @param conf Configuration which stores the timestamps
+   * @param timestamps comma separated list of timestamps of archives.
+   * The order should be the same as the order in which the archives are added.
+   */
+  public static void setArchiveTimestamps(Configuration conf, String timestamps) {
+    final String key = "mapred.cache.archives.timestamps";
+    conf.set(key, timestamps);
+  }
+
+  /**
+   * Stores the timestamps of the files to be localized under
+   * "mapred.cache.files.timestamps".
+   * @param conf Configuration which stores the timestamps
+   * @param timestamps comma separated list of timestamps of files.
+   * The order should be the same as the order in which the files are added.
+   */
+  public static void setFileTimestamps(Configuration conf, String timestamps) {
+    final String key = "mapred.cache.files.timestamps";
+    conf.set(key, timestamps);
+  }
+  
+  /**
+   * Stores the locations of the localized archives under
+   * "mapred.cache.localArchives".
+   * @param conf The conf to modify to contain the localized caches
+   * @param str a comma separated list of local archives
+   */
+  public static void setLocalArchives(Configuration conf, String str) {
+    final String key = "mapred.cache.localArchives";
+    conf.set(key, str);
+  }
+
+  /**
+   * Stores the locations of the localized files under
+   * "mapred.cache.localFiles".
+   * @param conf The conf to modify to contain the localized caches
+   * @param str a comma separated list of local files
+   */
+  public static void setLocalFiles(Configuration conf, String str) {
+    final String key = "mapred.cache.localFiles";
+    conf.set(key, str);
+  }
+
+  /**
+   * Appends an archive to the comma separated list stored under
+   * "mapred.cache.archives", starting the list if it is not set yet.
+   * @param uri The uri of the cache to be localized
+   * @param conf Configuration to add the cache to
+   */
+  public static void addCacheArchive(URI uri, Configuration conf) {
+    String existing = conf.get("mapred.cache.archives");
+    String updated;
+    if (existing == null) {
+      updated = uri.toString();
+    } else {
+      updated = existing + "," + uri.toString();
+    }
+    conf.set("mapred.cache.archives", updated);
+  }
+  
+  /**
+   * Appends a file to the comma separated list stored under
+   * "mapred.cache.files", starting the list if it is not set yet.
+   * @param uri The uri of the cache to be localized
+   * @param conf Configuration to add the cache to
+   */
+  public static void addCacheFile(URI uri, Configuration conf) {
+    String existing = conf.get("mapred.cache.files");
+    String updated;
+    if (existing == null) {
+      updated = uri.toString();
+    } else {
+      updated = existing + "," + uri.toString();
+    }
+    conf.set("mapred.cache.files", updated);
+  }
+
+  /**
+   * Adds a file path to the current set of classpath entries, stored under
+   * "mapred.job.classpath.files", and adds the file to the cache as well.
+   * The cache URI is the path qualified against the file system the path
+   * itself resolves to.
+   * 
+   * @param file Path of the file to be added
+   * @param conf Configuration that contains the classpath setting
+   * @throws IOException if the file system of the path cannot be obtained
+   */
+  public static void addFileToClassPath(Path file, Configuration conf)
+    throws IOException {
+    // Qualify against the file system that owns the path rather than the
+    // default one, and do it before touching conf so that a failure here
+    // does not leave a classpath entry without a matching cache file.
+    FileSystem fs = file.getFileSystem(conf);
+    URI uri = fs.makeQualified(file).toUri();
+
+    String classpath = conf.get("mapred.job.classpath.files");
+    conf.set("mapred.job.classpath.files", classpath == null ? file.toString()
+             : classpath + "," + file.toString());
+    addCacheFile(uri, conf);
+  }
+
+  /**
+   * Get the file entries in classpath as an array of Path
+   * 
+   * @param conf Configuration that contains the classpath setting
+   * @return the entries stored under "mapred.job.classpath.files", or null
+   *         when there are none
+   */
+  public static Path[] getFileClassPaths(Configuration conf) {
+    // Work through the Collection interface instead of downcasting to
+    // ArrayList, so the concrete collection type does not matter here.
+    java.util.Collection<String> entries =
+      conf.getStringCollection("mapred.job.classpath.files");
+    if (entries.isEmpty()) {
+      return null;
+    }
+    Path[] paths = new Path[entries.size()];
+    int next = 0;
+    for (String entry : entries) {
+      paths[next++] = new Path(entry);
+    }
+    return paths;
+  }
+
+  /**
+   * Adds an archive path to the current set of classpath entries, stored
+   * under "mapred.job.classpath.archives", and adds the archive to the cache
+   * as well. The cache URI is the path qualified against the file system the
+   * path itself resolves to.
+   * 
+   * @param archive Path of the archive to be added
+   * @param conf Configuration that contains the classpath setting
+   * @throws IOException if the file system of the path cannot be obtained
+   */
+  public static void addArchiveToClassPath(Path archive, Configuration conf)
+    throws IOException {
+    // Qualify against the file system that owns the path rather than the
+    // default one, and do it before touching conf so that a failure here
+    // does not leave a classpath entry without a matching cache archive.
+    FileSystem fs = archive.getFileSystem(conf);
+    URI uri = fs.makeQualified(archive).toUri();
+
+    String classpath = conf.get("mapred.job.classpath.archives");
+    conf.set("mapred.job.classpath.archives", classpath == null ? archive
+             .toString() : classpath + "," + archive.toString());
+    addCacheArchive(uri, conf);
+  }
+
+  /**
+   * Get the archive entries in classpath as an array of Path
+   * 
+   * @param conf Configuration that contains the classpath setting
+   * @return the entries stored under "mapred.job.classpath.archives", or
+   *         null when there are none
+   */
+  public static Path[] getArchiveClassPaths(Configuration conf) {
+    // Work through the Collection interface instead of downcasting to
+    // ArrayList, so the concrete collection type does not matter here.
+    java.util.Collection<String> entries =
+      conf.getStringCollection("mapred.job.classpath.archives");
+    if (entries.isEmpty()) {
+      return null;
+    }
+    Path[] paths = new Path[entries.size()];
+    int next = 0;
+    for (String entry : entries) {
+      paths[next++] = new Path(entry);
+    }
+    return paths;
+  }
+
+  /**
+   * Requests that symlinks to all the cache files/archives be created in the
+   * current working directory of the task, by setting
+   * "mapred.create.symlink" to "yes".
+   * @param conf the jobconf 
+   */
+  public static void createSymlink(Configuration conf){
+    final String enabled = "yes";
+    conf.set("mapred.create.symlink", enabled);
+  }
+  
+  /**
+   * Checks whether symlinks are to be created for the localized cache files
+   * in the current working directory. Only the exact value "yes" of
+   * "mapred.create.symlink" enables them.
+   * @param conf the jobconf
+   * @return true if symlinks are to be created, false otherwise
+   */
+  public static boolean getSymlink(Configuration conf){
+    return "yes".equals(conf.get("mapred.create.symlink"));
+  }
+
+  /**
+   * This method checks if there is a conflict in the fragment names 
+   * of the uris. Also makes sure that each uri has a fragment. It 
+   * is only to be called if you want to create symlinks for 
+   * the various archives and files.
+   * @param uriFiles The uri array of urifiles
+   * @param uriArchives the uri array of uri archives
+   */
+  public static boolean checkURIs(URI[]  uriFiles, URI[] uriArchives){
+    if ((uriFiles == null) && (uriArchives == null)){
+      return true;
+    }
+    if (uriFiles != null){
+      for (int i = 0; i < uriFiles.length; i++){
+        String frag1 = uriFiles[i].getFragment();
+        if (frag1 == null)
+          return false;
+        for (int j=i+1; j < uriFiles.length; j++){
+          String frag2 = uriFiles[j].getFragment();
+          if (frag2 == null)
+            return false;
+          if (frag1.equalsIgnoreCase(frag2))
+            return false;
+        }
+        if (uriArchives != null){
+          for (int j = 0; j < uriArchives.length; j++){
+            String frag2 = uriArchives[j].getFragment();
+            if (frag2 == null){
+              return false;
+            }
+            if (frag1.equalsIgnoreCase(frag2))
+              return false;
+            for (int k=j+1; k < uriArchives.length; k++){
+              String frag3 = uriArchives[k].getFragment();
+              if (frag3 == null)
+                return false;
+              if (frag2.equalsIgnoreCase(frag3))
+                return false;
+            }
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Book-keeping record for one locally cached file or archive.
+   */
+  private static class CacheStatus {
+    // true once the cache is loaded, false until then
+    boolean currentStatus = false;
+
+    // where this cache is loaded on the local file system
+    Path localLoadPath;
+    
+    // the base dir under which the cache lies
+    Path baseDir;
+    
+    // the size of this cache
+    long size = 0;
+
+    // number of instances using this cache
+    int refcount = 0;
+
+    // the cache-file modification time, -1 until it has been recorded
+    long mtime = -1;
+
+    /**
+     * Creates the status of a cache that is not loaded yet.
+     * @param baseDir the base dir where the cache lies
+     * @param localLoadPath the local load path of this cache
+     */
+    public CacheStatus(Path baseDir, Path localLoadPath) {
+      this.baseDir = baseDir;
+      this.localLoadPath = localLoadPath;
+    }
+  }
+
+  /**
+   * Clear the entire contents of the cache and delete the backing files. This
+   * should only be used when the server is reinitializing, because the users
+   * are going to lose their files.
+   * A failure to delete one cache is logged at debug level and does not stop
+   * the purge.
+   * @param conf configuration used to obtain the local file system
+   * @throws IOException if the local file system cannot be obtained
+   */
+  public static void purgeCache(Configuration conf) throws IOException {
+    synchronized (cachedArchives) {
+      FileSystem localFs = FileSystem.getLocal(conf);
+      for (CacheStatus status : cachedArchives.values()) {
+        try {
+          localFs.delete(status.localLoadPath, true);
+        } catch (IOException ie) {
+          LOG.debug("Error cleaning up cache", ie);
+        }
+      }
+      cachedArchives.clear();
+    }
+  }
+}

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java?rev=805033&r1=805032&r2=805033&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MRCaching.java Mon Aug 17 16:45:44 2009
@@ -33,7 +33,8 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.filecache.*;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+
 import java.net.URI;
 
 public class MRCaching {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java?rev=805033&r1=805032&r2=805033&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java Mon Aug 17 16:45:44 2009
@@ -28,7 +28,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java?rev=805033&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestDistributedCache.java Mon Aug 17 16:45:44 2009
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.filecache;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+
+/** Tests localizing and releasing caches through {@link DistributedCache}. */
+public class TestDistributedCache extends TestCase {
+  static final URI LOCAL_FS = URI.create("file:///");
+  private static String TEST_CACHE_BASE_DIR =
+    new Path(System.getProperty("test.build.data","/tmp/cachebasedir"))
+    .toString().replace(' ', '+');
+  private static String TEST_ROOT_DIR =
+    System.getProperty("test.build.data", "/tmp/distributedcache");
+  private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
+  private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
+  private Configuration conf;
+  private Path firstCacheFile;
+  private Path secondCacheFile;
+  private FileSystem localfs;
+  
+  /** Sets the local cache limit and creates the two files to be cached. */
+  @Override
+  protected void setUp() throws IOException {
+    conf = new Configuration();
+    conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
+    localfs = FileSystem.get(LOCAL_FS, conf);
+    firstCacheFile = new Path(TEST_ROOT_DIR+"/firstcachefile");
+    secondCacheFile = new Path(TEST_ROOT_DIR+"/secondcachefile");
+    createTempFile(localfs, firstCacheFile);
+    createTempFile(localfs, secondCacheFile);
+  }
+  
+  /** Localizes and releases one cache, then localizes a second one. */
+  public void testDeleteCache() throws Exception {
+    DistributedCache.getLocalCache(firstCacheFile.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), 
+        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
+    DistributedCache.releaseCache(firstCacheFile.toUri(), conf);
+    // The code above localized a 4K file and then released it, so that cache may be deleted
+    // once the 5K limit is exceeded. The code below localizes a second cache, which is
+    // designed to sweep away the first one.
+    DistributedCache.getLocalCache(secondCacheFile.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), 
+        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
+    FileStatus[] dirStatuses = localfs.listStatus(new Path(TEST_CACHE_BASE_DIR));
+    assertTrue("DistributedCache failed deleting old cache when the cache store is full.",
+        dirStatuses.length > 1);
+  }
+  
+  public void testFileSystemOtherThanDefault() throws Exception {
+    Configuration fakeFsConf = new Configuration();
+    fakeFsConf.set("fs.fakefile.impl", fakeFsConf.get("fs.file.impl"));
+    Path fileToCache = new Path("fakefile:///" + firstCacheFile.toUri().getPath());
+    Path result = DistributedCache.getLocalCache(fileToCache.toUri(), fakeFsConf, new Path(TEST_CACHE_BASE_DIR), 
+        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
+    assertNotNull("DistributedCache cached file on non-default filesystem.", result);
+  }
+
+  /** Writes TEST_FILE_SIZE random bytes to p; the stream is closed even if the write fails. */
+  private void createTempFile(FileSystem fs, Path p) throws IOException {
+    FSDataOutputStream out = fs.create(p);
+    try {
+      byte[] toWrite = new byte[TEST_FILE_SIZE];
+      new Random().nextBytes(toWrite);
+      out.write(toWrite);
+    } finally {
+      out.close();
+    }
+    FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE);
+  }
+  
+  /** Deletes the two source files and closes the local file system. */
+  @Override
+  protected void tearDown() throws IOException {
+    localfs.delete(firstCacheFile, true);
+    localfs.delete(secondCacheFile, true);
+    localfs.close();
+  }
+}