You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/09/15 00:13:44 UTC

svn commit: r443497 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/filecache/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/mapred/

Author: cutting
Date: Thu Sep 14 15:13:43 2006
New Revision: 443497

URL: http://svn.apache.org/viewvc?view=rev&rev=443497
Log:
HADOOP-288.  Add a file caching system and use it in MapReduce to cache job jar files on slave nodes.  Contributed by Mahadev.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/
    lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.jar   (with props)
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.zip   (with props)
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/build.xml
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Sep 14 15:13:43 2006
@@ -14,6 +14,9 @@
 3. HADOOP-530.  Improve error messages in SequenceFile when keys or
    values are of the wrong type.  (Hairong Kuang via cutting)
 
+4. HADOOP-288.  Add a file caching system and use it in MapReduce to
+   cache job jar files on slave nodes.  (Mahadev Konar via cutting)
+
 
 Release 0.6.1 - 2006-08-13
 

Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Thu Sep 14 15:13:43 2006
@@ -36,6 +36,7 @@
   <property name="test.src.dir" value="${basedir}/src/test"/>
   <property name="test.build.dir" value="${build.dir}/test"/>
   <property name="test.build.data" value="${test.build.dir}/data"/>
+  <property name="test.cache.data" value="${test.build.dir}/cache"/>
   <property name="hadoop.log.dir" value="${test.build.dir}/logs"/>
   <property name="test.build.classes" value="${test.build.dir}/classes"/>
   <property name="test.build.javadoc" value="${test.build.dir}/docs/api"/>
@@ -277,6 +278,11 @@
                       value="org/apache/hadoop/test/AllTestDriver"/>
          </manifest>
     </jar>
+    <delete dir="${test.cache.data}"/>
+    <mkdir dir="${test.cache.data}"/>
+    <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.txt" todir="${test.cache.data}"/>
+    <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.jar" todir="${test.cache.data}"/>
+    <copy file="${test.src.dir}/org/apache/hadoop/mapred/test.zip" todir="${test.cache.data}"/>    
   </target>
 
   <!-- ================================================================== -->
@@ -288,7 +294,6 @@
     <mkdir dir="${test.build.data}"/>
     <delete dir="${hadoop.log.dir}"/>
     <mkdir dir="${hadoop.log.dir}"/>
-
     <junit printsummary="yes" haltonfailure="no" fork="yes" dir="${basedir}"
       errorProperty="tests.failed" failureProperty="tests.failed">
       <sysproperty key="test.build.data" value="${test.build.data}"/>

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Thu Sep 14 15:13:43 2006
@@ -271,6 +271,14 @@
 </property>
 
 <property>
+  <name>local.cache.size</name>
+  <value>10737418240</value>
+  <description>The limit on the size of cache you want to keep, set by default
+  to 10GB. This will act as a soft limit on the cache directory for out of band data.
+  </description>
+</property>
+            
+<property>
   <name>mapred.system.dir</name>
   <value>${hadoop.tmp.dir}/mapred/system</value>
   <description>The shared directory where MapReduce stores control files.

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java?view=auto&rev=443497
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/filecache/DistributedCache.java Thu Sep 14 15:13:43 2006
@@ -0,0 +1,466 @@
+/* Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.filecache;
+
+import org.apache.commons.logging.*;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.fs.*;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.net.URI;
+
+/*******************************************************************************
+ * The DistributedCache keeps track of the files and archives that have been
+ * localized on this node. It copies them to the local disk, unpacks .zip and
+ * .jar archives, and returns the local path of each localized cache.
+ * 
+ * @author Mahadev Konar
+ ******************************************************************************/
+public class DistributedCache {
+  // cacheID to cacheStatus mapping
+  private static TreeMap cachedArchives = new TreeMap();
+  // buffer size for reading checksum files
+  private static final int CRC_BUFFER_SIZE = 64 * 1024;
+  
+  /**
+   * Localizes a cache file or archive and returns its local path. The
+   * reference count of the cache entry is incremented before localization is
+   * attempted. After localization, if the disk usage of <code>baseDir</code>
+   * exceeds <code>local.cache.size</code>, entries with a reference count of
+   * zero are deleted.
+   * 
+   * @param cache the cache to be localized, this should be specified as 
+   * new URI(dfs://hostname:port/absolute_path_to_file). If no scheme 
+   * or hostname:port is provided the file is assumed to be in the filesystem
+   * being used in the Configuration
+   * @param conf The Configuration which contains the filesystem
+   * @param baseDir The base cache dir where you want to localize the files/archives
+   * @param isArchive if the cache is an archive or a file. In case it is an archive
+   *  with a .zip or .jar extension it will be unzipped/unjarred automatically 
+   *  and the directory where the archive is unjarred is returned as the Path.
+   *  In case of a file, the path to the file is returned
+   * @param md5 this is a mere checksum to verify if you are using the right cache. 
+   * You need to pass the md5 of the crc file in DFS. This is matched against the one
+   * calculated in this api and if it does not match, the cache is not localized.
+   * @return the path to directory where the archives are unjarred in case of archives,
+   * the path to the file where the file is copied locally 
+   * @throws IOException if the cache cannot be localized
+   */
+  public static Path getLocalCache(URI cache, Configuration conf, Path baseDir,
+      boolean isArchive, String md5) throws IOException {
+    // fallback used only when local.cache.size is not configured; kept equal
+    // to the 10GB value shipped in hadoop-default.xml
+    final long defaultCacheSize = 10737418240L;
+    String cacheId = makeRelative(cache, conf);
+    CacheStatus lcacheStatus;
+    Path localizedPath;
+    synchronized (cachedArchives) {
+      if (!cachedArchives.containsKey(cacheId)) {
+        // was never localized
+        lcacheStatus = new CacheStatus();
+        lcacheStatus.currentStatus = false;
+        lcacheStatus.refcount = 1;
+        lcacheStatus.localLoadPath = new Path(baseDir, new Path(cacheId));
+        cachedArchives.put(cacheId, lcacheStatus);
+      } else {
+        lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);
+        synchronized (lcacheStatus) {
+          lcacheStatus.refcount++;
+        }
+      }
+    }
+    // localize while holding only the per-entry lock
+    synchronized (lcacheStatus) {
+      localizedPath = localizeCache(cache, lcacheStatus, conf, isArchive, md5);
+    }
+    // try deleting stuff if you can
+    long size = FileUtil.getDU(new File(baseDir.toString()));
+    long allowedSize = conf.getLong("local.cache.size", defaultCacheSize);
+    if (allowedSize < size) {
+      // try some cache deletions
+      deleteCache(conf);
+    }
+    return localizedPath;
+  }
+  
+  /**
+   * This is the opposite of getLocalCache. When you are done with
+   * using the cache, you need to release the cache. Releasing a cache that
+   * is not tracked (never localized, or already deleted) is a no-op.
+   * @param cache The cache URI to be released
+   * @param conf configuration which contains the filesystem the cache 
+   * is contained in.
+   * @throws IOException
+   */
+  public static void releaseCache(URI cache, Configuration conf)
+      throws IOException {
+    String cacheId = makeRelative(cache, conf);
+    synchronized (cachedArchives) {
+      CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);
+      if (lcacheStatus == null) {
+        // nothing is tracked under this id, so there is nothing to release
+        return;
+      }
+      synchronized (lcacheStatus) {
+        lcacheStatus.refcount--;
+      }
+    }
+  }
+  
+  /**
+   * Deletes the local copies of all caches whose refcount is zero and stops
+   * tracking them.
+   * @param conf configuration used to look up the local filesystem
+   * @throws IOException if a local delete fails
+   */
+  private static void deleteCache(Configuration conf) throws IOException {
+    // try deleting cache Status with refcount of zero
+    synchronized (cachedArchives) {
+      for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
+        String cacheId = (String) it.next();
+        CacheStatus lcacheStatus = (CacheStatus) cachedArchives.get(cacheId);
+        if (lcacheStatus.refcount == 0) {
+          // delete this cache entry
+          FileSystem.getNamed("local", conf).delete(lcacheStatus.localLoadPath);
+          // remove through the iterator: removing from the map directly while
+          // iterating its key set raises ConcurrentModificationException
+          it.remove();
+        }
+      }
+    }
+  }
+
+  /*
+   * Builds the path, relative to the cache base dir, under which a cache is
+   * localized. For dfs://hostname:port/absolute_path the result is
+   * hostname/absolute_path. For any other URI the host part is taken from
+   * the name of the filesystem in the Configuration (the text before the
+   * first ':'), followed by the URI's path.
+   */
+  private static String makeRelative(URI cache, Configuration conf)
+      throws IOException {
+    // the configured filesystem is always looked up, even for dfs:// URIs
+    FileSystem configuredFs = FileSystem.get(conf);
+    String host;
+    if ("dfs".equals(cache.getScheme())) {
+      host = cache.getHost();
+    } else {
+      host = configuredFs.getName().split(":")[0];
+    }
+    return host + cache.getPath();
+  }
+
+  // Path of the cached file inside its localization dir: a child of p that
+  // has the same name as p itself.
+  private static Path cacheFilePath(Path p) {
+    String leafName = p.getName();
+    return new Path(p, leafName);
+  }
+
+  // Copies the cache (and its _md5 file) to the local disk when it is missing
+  // or stale, unjars/unzips it when it is an archive, and returns the local
+  // path: the localization dir for archives, the file inside it otherwise.
+  private static Path localizeCache(URI cache, CacheStatus cacheStatus,
+      Configuration conf, boolean isArchive, String md5) throws IOException {
+    FileSystem dfs = getFileSystem(cache, conf);
+    if (!ifExistsAndFresh(cacheStatus, cache, dfs, md5, conf)) {
+      // a loaded cache that another user still references cannot be replaced
+      if (cacheStatus.currentStatus && cacheStatus.refcount > 1) {
+        throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
+            + " is in use and cannot be refreshed");
+      }
+      byte[] checkSum = createMD5(cache, conf);
+      FileSystem localFs = FileSystem.getNamed("local", conf);
+      // remove the old copy and start from an empty directory
+      localFs.delete(cacheStatus.localLoadPath);
+      Path parchive = new Path(cacheStatus.localLoadPath,
+                               new Path(cacheStatus.localLoadPath.getName()));
+      localFs.mkdirs(cacheStatus.localLoadPath);
+      String remotePath = cache.getPath();
+      dfs.copyToLocalFile(new Path(remotePath), parchive);
+      Path localMd5 = new Path(parchive.toString() + "_md5");
+      dfs.copyToLocalFile(new Path(remotePath + "_md5"), localMd5);
+      if (isArchive) {
+        String lowerCaseName = parchive.toString().toLowerCase();
+        if (lowerCaseName.endsWith(".jar")) {
+          File unpackDir = new File(parchive.getParent().toString());
+          RunJar.unJar(new File(parchive.toString()), unpackDir);
+        } else if (lowerCaseName.endsWith(".zip")) {
+          File unpackDir = new File(parchive.getParent().toString());
+          FileUtil.unZip(new File(parchive.toString()), unpackDir);
+        }
+        // any other extension: the copied file is left in the dir as it is
+      }
+      cacheStatus.currentStatus = true;
+      cacheStatus.md5 = checkSum;
+    }
+    if (isArchive) {
+      return cacheStatus.localLoadPath;
+    }
+    return cacheFilePath(cacheStatus.localLoadPath);
+  }
+
+  // Returns true when the cache is already localized and its stored md5
+  // equals the md5 passed in the job configuration. Throws when the md5 in
+  // the configuration differs from the one currently in the filesystem.
+  private static boolean ifExistsAndFresh(CacheStatus lcacheStatus, URI cache,
+      FileSystem dfs, String confMD5, Configuration conf) throws IOException {
+    // both digests are computed up front, before the status is looked at
+    byte[] fsDigest = createMD5(cache, conf);
+    byte[] confDigest = StringUtils.hexStringToByte(confMD5);
+    if (!lcacheStatus.currentStatus) {
+      // not localized yet
+      return false;
+    }
+    byte[] localDigest = lcacheStatus.md5;
+    if (!MessageDigest.isEqual(confDigest, fsDigest)) {
+      throw new IOException("Inconsistencty in data caching, "
+          + "Cache archives have been changed");
+    }
+    // equal digests: no refresh needed; different digests: needs refreshing
+    return MessageDigest.isEqual(confDigest, localDigest);
+  }
+
+  /**
+   * Returns md5 of the checksum file for a given dfs file.
+   * If the file filename_md5 does not exist yet, the md5 is computed from the
+   * checksum file and stored in filename_md5; otherwise the stored value is
+   * read back. The existence of filename_md5 signifies a new cache has been
+   * loaded into dfs, so if you want to refresh the cache, you need to delete
+   * this md5 file as well.
+   * @param cache The cache to get the md5 checksum for
+   * @param conf configuration
+   * @return md5 of the crc of the cache parameter
+   * @throws IOException if the MD5 algorithm is unavailable, if reading or
+   * writing the filesystem fails, or if the stored md5 file is truncated
+   */
+  public static byte[] createMD5(URI cache, Configuration conf)
+      throws IOException {
+    FileSystem fileSystem = getFileSystem(cache, conf);
+    Path filePath = new Path(cache.getPath());
+    Path md5File = new Path(filePath.getParent().toString() + Path.SEPARATOR
+        + filePath.getName() + "_md5");
+    MessageDigest md5;
+    try {
+      md5 = MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException na) {
+      // fail here with a clear message instead of a later NullPointerException
+      IOException ioe = new IOException("MD5 digest algorithm is not available");
+      ioe.initCause(na);
+      throw ioe;
+    }
+    byte[] digest;
+    if (!fileSystem.exists(md5File)) {
+      // first use of this cache: digest the checksum file and store the result
+      byte[] b = new byte[CRC_BUFFER_SIZE];
+      FSInputStream fsStream = fileSystem.openRaw(FileSystem
+          .getChecksumFile(filePath));
+      try {
+        int read = fsStream.read(b);
+        while (read != -1) {
+          md5.update(b, 0, read);
+          read = fsStream.read(b);
+        }
+      } finally {
+        fsStream.close();
+      }
+      digest = md5.digest();
+
+      FSDataOutputStream out = fileSystem.create(md5File);
+      try {
+        out.write(digest);
+      } finally {
+        out.close();
+      }
+    } else {
+      digest = new byte[md5.getDigestLength()];
+      FSInputStream fsStream = fileSystem.openRaw(md5File);
+      try {
+        // a single read may return fewer bytes than asked for, so loop until
+        // the whole digest has been read
+        int off = 0;
+        while (off < digest.length) {
+          int read = fsStream.read(digest, off, digest.length - off);
+          if (read == -1) {
+            throw new IOException("md5 file " + md5File + " is truncated: read "
+                + off + " of " + digest.length + " bytes");
+          }
+          off += read;
+        }
+      } finally {
+        fsStream.close();
+      }
+    }
+
+    return digest;
+  }
+
+  // Returns "host" or "host:port" for a dfs:// URI, and null for a URI with
+  // any other scheme or with no scheme at all.
+  private static String getFileSysName(URI url) {
+    if (!"dfs".equals(url.getScheme())) {
+      return null;
+    }
+    String host = url.getHost();
+    int port = url.getPort();
+    if (port == -1) {
+      // no port given in the URI
+      return host;
+    }
+    return host + ":" + port;
+  }
+
+  // Returns the filesystem named by a dfs:// URI, or the filesystem of the
+  // Configuration when the URI does not name one.
+  private static FileSystem getFileSystem(URI cache, Configuration conf)
+      throws IOException {
+    String namedFs = getFileSysName(cache);
+    if (namedFs == null) {
+      return FileSystem.get(conf);
+    }
+    return FileSystem.getNamed(namedFs, conf);
+  }
+
+  /**
+   * Stores the given archives in the configuration under
+   * <code>mapred.cache.archives</code>, replacing any previous value.
+   * @param archives The list of archives that need to be localized
+   * @param conf Configuration which will be changed
+   */
+  public static void setCacheArchives(URI[] archives, Configuration conf) {
+    conf.set("mapred.cache.archives", StringUtils.uriToString(archives));
+  }
+
+  /**
+   * Stores the given files in the configuration under
+   * <code>mapred.cache.files</code>, replacing any previous value.
+   * @param files The list of files that need to be localized
+   * @param conf Configuration which will be changed
+   */
+  public static void setCacheFiles(URI[] files, Configuration conf) {
+    conf.set("mapred.cache.files", StringUtils.uriToString(files));
+  }
+
+  /**
+   * Get cache archives set in the Configuration
+   * @param conf The configuration which contains the archives
+   * @return A URI array of the caches set in the Configuration
+   * @throws IOException
+   */
+  public static URI[] getCacheArchives(Configuration conf) throws IOException {
+    return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives"));
+  }
+
+  /**
+   * Get cache files set in the Configuration
+   * @param conf The configuration which contains the files
+   * @return A URI array of the files set in the Configuration
+   * @throws IOException
+   */
+
+  public static URI[] getCacheFiles(Configuration conf) throws IOException {
+    return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
+  }
+
+  /**
+   * Return the path array of the localized caches
+   * @param conf Configuration that contains the localized archives
+   * @return A path array of localized caches
+   * @throws IOException
+   */
+  public static Path[] getLocalCacheArchives(Configuration conf)
+      throws IOException {
+    return StringUtils.stringToPath(conf
+        .getStrings("mapred.cache.localArchives"));
+  }
+
+  /**
+   * Return the path array of the localized files
+   * @param conf Configuration that contains the localized files
+   * @return A path array of localized files
+   * @throws IOException
+   */
+  public static Path[] getLocalCacheFiles(Configuration conf)
+      throws IOException {
+    return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles"));
+  }
+
+  /**
+   * Get the md5 checksums of the archives
+   * @param conf The configuration which stored the md5's
+   * @return a string array of md5 checksums 
+   * @throws IOException
+   */
+  public static String[] getArchiveMd5(Configuration conf) throws IOException {
+    return conf.getStrings("mapred.cache.archivemd5");
+  }
+
+
+  /**
+   * Get the md5 checksums of the files
+   * @param conf The configuration which stored the md5's
+   * @return a string array of md5 checksums 
+   * @throws IOException
+   */
+  public static String[] getFileMd5(Configuration conf) throws IOException {
+    return conf.getStrings("mapred.cache.filemd5");
+  }
+
+  /**
+   * Stores the md5 checksums of the archives to be localized under
+   * <code>mapred.cache.archivemd5</code>.
+   * @param conf Configuration which stores the md5's
+   * @param md5 comma separated list of md5 checksums of the .crc's of archives.
+   * The order should be the same as the order in which the archives are added
+   */
+  public static void setArchiveMd5(Configuration conf, String md5) {
+    final String key = "mapred.cache.archivemd5";
+    conf.set(key, md5);
+  }
+
+  /**
+   * Stores the md5 checksums of the files to be localized under
+   * <code>mapred.cache.filemd5</code>.
+   * @param conf Configuration which stores the md5's
+   * @param md5 comma separated list of md5 checksums of the .crc's of files.
+   * The order should be the same as the order in which the files are added
+   */
+  public static void setFileMd5(Configuration conf, String md5) {
+    final String key = "mapred.cache.filemd5";
+    conf.set(key, md5);
+  }
+  
+  /**
+   * Stores the locations of the localized archives under
+   * <code>mapred.cache.localArchives</code>.
+   * @param conf The conf to modify to contain the localized caches
+   * @param str a comma separated list of local archives
+   */
+  public static void setLocalArchives(Configuration conf, String str) {
+    final String key = "mapred.cache.localArchives";
+    conf.set(key, str);
+  }
+
+  /**
+   * Stores the locations of the localized files under
+   * <code>mapred.cache.localFiles</code>.
+   * @param conf The conf to modify to contain the localized caches
+   * @param str a comma separated list of local files
+   */
+  public static void setLocalFiles(Configuration conf, String str) {
+    final String key = "mapred.cache.localFiles";
+    conf.set(key, str);
+  }
+
+  /**
+   * Appends an archive to the comma separated list stored under
+   * <code>mapred.cache.archives</code>, or starts the list when the property
+   * is not set.
+   * @param uri The uri of the cache to be localized
+   * @param conf Configuration to add the cache to
+   */
+  public static void addCacheArchive(URI uri, Configuration conf) {
+    String existing = conf.get("mapred.cache.archives");
+    String updated;
+    if (existing == null) {
+      updated = uri.toString();
+    } else {
+      updated = existing + "," + uri.toString();
+    }
+    conf.set("mapred.cache.archives", updated);
+  }
+  
+  /**
+   * Appends a file to the comma separated list stored under
+   * <code>mapred.cache.files</code>, or starts the list when the property
+   * is not set.
+   * @param uri The uri of the cache to be localized
+   * @param conf Configuration to add the cache to
+   */
+  public static void addCacheFile(URI uri, Configuration conf) {
+    String existing = conf.get("mapred.cache.files");
+    String updated;
+    if (existing == null) {
+      updated = uri.toString();
+    } else {
+      updated = existing + "," + uri.toString();
+    }
+    conf.set("mapred.cache.files", updated);
+  }
+
+  // Bookkeeping record kept in cachedArchives for one cache id.
+  private static class CacheStatus {
+    // where this cache is (to be) placed on the local disk
+    public Path localLoadPath;
+
+    // true once the cache has been localized, false before that
+    public boolean currentStatus;
+
+    // md5 of the crc file, recorded when the cache was localized
+    public byte[] md5;
+
+    // how many users currently hold this cache
+    public int refcount;
+  }
+
+}
\ No newline at end of file

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FileUtil.java Thu Sep 14 15:13:43 2006
@@ -17,6 +17,9 @@
 package org.apache.hadoop.fs;
 
 import java.io.*;
+import java.util.Enumeration;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -230,5 +233,68 @@
     }
     return dst;
   }
-
+  
+  /**
+   * Takes an input dir and returns the du on that local directory. Very basic
+   * implementation: it adds up <code>File.length()</code> of the directory
+   * and, recursively, of everything below it. A directory whose contents
+   * cannot be listed contributes only its own length.
+   * 
+   * @param dir
+   *          The input dir to get the disk space of this local dir
+   * @return The total disk space of the input local directory, or 0 if it
+   *         does not exist
+   */
+  public static long getDU(File dir) {
+    if (!dir.exists()) {
+      return 0;
+    }
+    if (!dir.isDirectory()) {
+      return dir.length();
+    }
+    long size = dir.length();
+    File[] allFiles = dir.listFiles();
+    // listFiles() returns null when the directory cannot be read
+    if (allFiles != null) {
+      for (int i = 0; i < allFiles.length; i++) {
+        size = size + getDU(allFiles[i]);
+      }
+    }
+    return size;
+  }
+    
+  /**
+   * Given a File input it will unzip the file in the unzip directory
+   * passed as the second parameter. Directory entries are skipped; the
+   * directories needed for file entries are created on demand.
+   * @param inFile The zip file as input
+   * @param unzipDir The unzip directory where to unzip the zip file.
+   * @throws IOException if the archive cannot be read, if a target directory
+   * cannot be created, or if an entry would be written outside unzipDir
+   */
+  public static void unZip(File inFile, File unzipDir) throws IOException {
+    Enumeration entries;
+    ZipFile zipFile = new ZipFile(inFile);
+    try {
+      String targetDirPath = unzipDir.getCanonicalPath() + File.separator;
+      entries = zipFile.entries();
+      while (entries.hasMoreElements()) {
+        ZipEntry entry = (ZipEntry) entries.nextElement();
+        if (!entry.isDirectory()) {
+          File file = new File(unzipDir, entry.getName());
+          // reject entry names such as "../x" that resolve outside unzipDir
+          if (!file.getCanonicalPath().startsWith(targetDirPath)) {
+            throw new IOException("expanding " + entry.getName()
+                + " would create file outside of " + unzipDir);
+          }
+          File parent = file.getParentFile();
+          if (!parent.mkdirs() && !parent.isDirectory()) {
+            throw new IOException("Mkdirs failed to create " + parent);
+          }
+          InputStream in = zipFile.getInputStream(entry);
+          try {
+            OutputStream out = new FileOutputStream(file);
+            try {
+              byte[] buffer = new byte[8192];
+              int i;
+              while ((i = in.read(buffer)) != -1) {
+                out.write(buffer, 0, i);
+              }
+            } finally {
+              out.close();
+            }
+          } finally {
+            in.close();
+          }
+        }
+      }
+    } finally {
+      zipFile.close();
+    }
+  }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Thu Sep 14 15:13:43 2006
@@ -21,7 +21,7 @@
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
-
+import org.apache.hadoop.filecache.*;
 import java.io.*;
 import java.net.*;
 import java.util.*;
@@ -227,7 +227,8 @@
         JobConf job = new JobConf(jobFile);
         return submitJob(job);
     }
-
+    
+   
     /**
      * Submit a job to the MR system
      */
@@ -244,11 +245,39 @@
         Path submitJobDir = new Path(job.getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()), 36));
         Path submitJobFile = new Path(submitJobDir, "job.xml");
         Path submitJarFile = new Path(submitJobDir, "job.jar");
-
-        String originalJarPath = job.getJar();
-
         FileSystem fs = getFs();
-
+        // try getting the md5 of the archives
+        URI[] tarchives = DistributedCache.getCacheArchives(job);
+        URI[] tfiles = DistributedCache.getCacheFiles(job);
+        if ((tarchives != null) || (tfiles != null)) {
+          // prepare these archives for md5 checksums
+          if (tarchives != null) {
+            String md5Archives = StringUtils.byteToHexString(DistributedCache
+                .createMD5(tarchives[0], job));
+            for (int i = 1; i < tarchives.length; i++) {
+              md5Archives = md5Archives
+                  + ","
+                  + StringUtils.byteToHexString(DistributedCache
+                      .createMD5(tarchives[i], job));
+            }
+            DistributedCache.setArchiveMd5(job, md5Archives);
+            //job.set("mapred.cache.archivemd5", md5Archives);
+          }
+          if (tfiles != null) {
+            String md5Files = StringUtils.byteToHexString(DistributedCache
+                .createMD5(tfiles[0], job));
+            for (int i = 1; i < tfiles.length; i++) {
+              md5Files = md5Files
+                  + ","
+                  + StringUtils.byteToHexString(DistributedCache
+                      .createMD5(tfiles[i], job));
+            }
+            DistributedCache.setFileMd5(job, md5Files);
+            //"mapred.cache.filemd5", md5Files);
+          }
+        }
+       
+        String originalJarPath = job.getJar();
         short replication = (short)job.getInt("mapred.submit.replication", 10);
 
         if (originalJarPath != null) {           // copy jar to JobTracker's fs

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu Sep 14 15:13:43 2006
@@ -171,6 +171,7 @@
     String dirs = get("mapred.input.dir");
     set("mapred.input.dir", dirs == null ? dir.toString() : dirs + "," + dir);
   }
+
   public Path[] getInputPaths() {
     String dirs = get("mapred.input.dir", "");
     ArrayList list = Collections.list(new StringTokenizer(dirs, ","));
@@ -197,8 +198,10 @@
     set("user.name", user);
   }
 
+
+  
   /**
-   * Set whether the framework shoul keep the intermediate files for 
+   * Set whether the framework should keep the intermediate files for 
    * failed tasks.
    */
   public void setKeepFailedTaskFiles(boolean keep) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Thu Sep 14 15:13:43 2006
@@ -20,11 +20,12 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.util.*;
-
+import org.apache.hadoop.filecache.*;
 import java.io.*;
 import java.util.jar.*;
 import java.util.Vector;
 import java.util.Enumeration;
+import java.net.URI;
 
 /** Base class that runs a task in a separate process.  Tasks are run in a
  * separate process in order to isolate the map/reduce system code from bugs in
@@ -61,25 +62,71 @@
   */
   public void close() throws IOException {}
 
+  // Joins the paths into one comma-separated string. A null array yields
+  // null; an empty array yields "".
+  private String stringifyPathArray(Path[] p){
+    if (p == null) { return null; }
+    StringBuffer joined = new StringBuffer();
+    for (int i = 0; i < p.length; i++){
+      joined.append(i == 0 ? "" : ",").append(p[i].toString());
+    }
+    return joined.toString();
+  }
+  
   public final void run() {
     try {
-
+      
+      //before preparing the job localize 
+      //all the archives
+      
+      URI[] archives = DistributedCache.getCacheArchives(conf);
+      URI[] files = DistributedCache.getCacheFiles(conf);
+      if ((archives != null) || (files != null)) {
+        if (archives != null) {
+          String[] md5 = DistributedCache.getArchiveMd5(conf);
+          Path[] p = new Path[archives.length];
+          for (int i = 0; i < archives.length;i++){
+            p[i] = DistributedCache.getLocalCache(archives[i], conf, conf.getLocalPath(TaskTracker.getCacheSubdir()), true, md5[i]);
+          }
+          DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
+        }
+        if ((files != null)) {
+          String[] md5 = DistributedCache.getFileMd5(conf);
+          Path[] p = new Path[files.length];
+          for (int i = 0; i < files.length;i++){
+           p[i] = DistributedCache.getLocalCache(files[i], conf, conf.getLocalPath(TaskTracker
+              .getCacheSubdir()), false, md5[i]);
+          }
+          DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
+        }
+        
+        // sets the paths to local archives and paths
+        Path localTaskFile = new Path(t.getJobFile());
+        FileSystem localFs = FileSystem.getNamed("local", conf);
+        localFs.delete(localTaskFile);
+        OutputStream out = localFs.create(localTaskFile);
+        try {
+          conf.write(out);
+        } finally {
+          out.close();
+        }
+      }
+      
       if (! prepare()) {
         return;
       }
 
       String sep = System.getProperty("path.separator");
-      File workDir = new File(new File(t.getJobFile()).getParent(), "work");
-      workDir.mkdirs();
-               
       StringBuffer classPath = new StringBuffer();
       // start with same classpath as parent process
       classPath.append(System.getProperty("java.class.path"));
       classPath.append(sep);
-
+      File workDir = new File(new File(t.getJobFile()).getParentFile().getParent(), "work");
+      workDir.mkdirs();
+	  
       String jar = conf.getJar();
-      if (jar != null) {                      // if jar exists, it into workDir
-        RunJar.unJar(new File(jar), workDir);
+      if (jar != null) {       
+        // the jar is already unpacked into workDir by TaskTracker.localizeJob; only extend the classpath here
         File[] libs = new File(workDir, "lib").listFiles();
         if (libs != null) {
           for (int i = 0; i < libs.length; i++) {
@@ -160,10 +207,27 @@
         LOG.warn(t.getTaskId()+" Reporting Diagnostics", e);
       }
     } finally {
+      try{
+        URI[] archives = DistributedCache.getCacheArchives(conf);
+        URI[] files = DistributedCache.getCacheFiles(conf);
+        if (archives != null){
+          for (int i = 0; i < archives.length; i++){
+            DistributedCache.releaseCache(archives[i], conf);
+          }
+        }
+        if (files != null){
+          for(int i = 0; i < files.length; i++){
+            DistributedCache.releaseCache(files[i], conf);
+          }
+        }
+      }catch(IOException ie){   // log the cause too, not only the fixed message
+        LOG.warn("Error releasing caches : Cache files might not have been cleaned up", ie);
+      }
       tracker.reportTaskFinished(t.getTaskId());
     }
   }
 
+  
   /**
    * Handle deprecated mapred.child.heap.size.
    * If present, interpolate into mapred.child.java.opts value with
@@ -238,6 +302,7 @@
       logStream(process.getInputStream());        // normally empty
       
       int exit_code = process.waitFor();
+     
       if (!killed && exit_code != 0) {
         throw new IOException("Task process exit with nonzero status of " +
                               exit_code + ".");

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Sep 14 15:13:43 2006
@@ -71,6 +71,7 @@
      * Map from taskId -> TaskInProgress.
      */
     TreeMap runningTasks = null;
+    Map runningJobs = null;
     int mapTotal = 0;
     int reduceTotal = 0;
     boolean justStarted = true;
@@ -89,11 +90,11 @@
 
     static Random r = new Random();
     FileSystem fs = null;
-    static final String SUBDIR = "taskTracker";
-
+    private static final String SUBDIR = "taskTracker";
+    private static final String CACHEDIR = "archive";
+    private static final String JOBCACHE = "jobcache";
     private JobConf fConf;
     private MapOutputFile mapOutputFile;
-
     private int maxCurrentTasks;
     private int failures;
     private int finishedCount[] = new int[1];
@@ -145,6 +146,14 @@
       taskCleanupThread.start();
     }
     
+    static String getCacheSubdir() {   // base dir TaskRunner hands to DistributedCache.getLocalCache
+      return SUBDIR + Path.SEPARATOR + CACHEDIR;
+    }
+
+    static String getJobCacheSubdir() {   // localizeJob puts <jobid>/job.xml and job.jar under this
+      return SUBDIR + Path.SEPARATOR + JOBCACHE;
+    }
+    
     public long getProtocolVersion(String protocol, long clientVersion) {
       return TaskUmbilicalProtocol.versionID;
     }
@@ -167,6 +176,7 @@
         // Clear out state tables
         this.tasks = new TreeMap();
         this.runningTasks = new TreeMap();
+        this.runningJobs = new TreeMap();
         this.mapTotal = 0;
         this.reduceTotal = 0;
         this.acceptNewTasks = true;
@@ -207,8 +217,84 @@
         
         this.running = true;
     }
+        
+    // Initialize the job directory: register the task under its job, copy job.xml/job.jar once per job, then launch the task.
+    private void localizeJob(TaskInProgress tip) throws IOException {
+      Path localJarFile = null;
+      Task t = tip.getTask();
+      Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t
+          .getJobId()
+          + Path.SEPARATOR + "job.xml"));
+      RunningJob rjob = null;
+      synchronized (runningJobs) {   // find or create the single RunningJob entry for this job id
+        if (!runningJobs.containsKey(t.getJobId())) {
+          rjob = new RunningJob();
+          rjob.localized = false;
+          rjob.tasks = new ArrayList();
+          rjob.jobFile = localJobFile;
+          rjob.tasks.add(tip);
+          runningJobs.put(t.getJobId(), rjob);
+        } else {
+          rjob = (RunningJob) runningJobs.get(t.getJobId());
+          // keep this for later use when we just get a jobid to delete
+          // the data for
+          rjob.tasks.add(tip);
+        }
+      }
+      synchronized (rjob) {   // the localized flag is read and set only while holding the job's lock
+        if (!rjob.localized) {
+          localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t
+              .getJobId())
+              + Path.SEPARATOR + "job.jar");
+  
+          String jobFile = t.getJobFile();
+          fs.copyToLocalFile(new Path(jobFile), localJobFile);
+          JobConf localJobConf = new JobConf(localJobFile);
+          String jarFile = localJobConf.getJar();
+          if (jarFile != null) {   // a job without a jar skips the copy, the rewrite and the unjar
+            fs.copyToLocalFile(new Path(jarFile), localJarFile);
+            localJobConf.setJar(localJarFile.toString());
+            FileSystem localFs = FileSystem.getNamed("local", fConf);
+            OutputStream out = localFs.create(localJobFile);
+            try {
+              localJobConf.write(out);
+            } finally {
+              out.close();
+            }
 
-      public synchronized void shutdown() throws IOException {
+            // unpack job.jar into the "work" directory next to the local job.xml
+            File workDir = new File(
+                                    new File(localJobFile.toString()).getParent(),
+                                    "work");
+            workDir.mkdirs();
+            RunJar.unJar(new File(localJarFile.toString()), workDir);
+          }
+          rjob.localized = true;
+        }
+      }
+      launchTaskForJob(tip, new JobConf(rjob.jobFile)); // each task gets a JobConf loaded from the local job.xml
+    }
+    
+    // Hands the localized job conf to the task and launches it; any failure marks the task FAILED.
+    private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
+      synchronized (tip) {
+        try {
+          tip.setJobConf(jobConf);
+          tip.launchTask();
+        } catch (Throwable launchFailure) {
+          tip.runstate = TaskStatus.FAILED;
+          try {
+            tip.cleanup();
+          } catch (Throwable ignored) {   // best effort: we are only trying to clean up
+          }
+          String error = StringUtils.stringifyException(launchFailure);
+          tip.reportDiagnosticInfo(error);
+          LOG.info(error);
+        }
+      }
+    }
+    
+    public synchronized void shutdown() throws IOException {
           shuttingDown = true;
           close();
           if (this.server != null) {
@@ -312,6 +398,12 @@
         } catch (InterruptedException ie) {}
       }
     }
+    /** Returns the file system held in this tracker's {@code fs} field.
+     * @return the value of {@code fs}; NOTE(review): presumably the DFS that job files are copied from - confirm
+     */
+    public FileSystem getFileSystem(){
+      return fs;
+    }
     
     /**
      * Main service loop.  Will stay in this loop forever.
@@ -448,6 +540,10 @@
               synchronized (this) {
                 for (int i = 0; i < toCloseIds.length; i++) {
                   Object tip = tasks.get(toCloseIds[i]);
+                  if (tip != null) synchronized(runningJobs){
+                    // an id this tracker does not know (tip == null) has no job entry to drop
+                    runningJobs.remove(((TaskInProgress) tip).getTask().getJobId());
+                  }
                   if (tip != null) {
                     tasksToCleanup.put(tip);
                   } else {
@@ -551,8 +647,8 @@
 
       return true;
     }
-
-	/**
+    
+    /**
      * Start a new task.
      * All exceptions are handled locally, so that we don't mess up the
      * task tracker.
@@ -569,20 +665,10 @@
           reduceTotal++;
         }
       }
-      synchronized (tip) {
-        try {
-          tip.launchTask();
-        } catch (Throwable ie) {
-          tip.runstate = TaskStatus.FAILED;
-          try {
-            tip.cleanup();
-          } catch (Throwable ie2) {
-            // Ignore it, we are just trying to cleanup.
-          }
-          String error = StringUtils.stringifyException(ie);
-          tip.reportDiagnosticInfo(error);
-          LOG.info(error);
-        }
+      try{
+        localizeJob(tip);
+      }catch(IOException ie){   // pass ie along so the cause and stack trace are logged
+        LOG.warn("Error initializing Job " + tip.getTask().getJobId(), ie);
       }
     }
     
@@ -691,6 +777,7 @@
         private JobConf localJobConf;
         private boolean keepFailedTaskFiles;
         private boolean alwaysKeepTaskFiles;
+        private boolean keepJobFiles;
 
         /**
          */
@@ -702,60 +789,52 @@
             this.lastProgressReport = System.currentTimeMillis();
             this.defaultJobConf = conf;
             localJobConf = null;
+            keepJobFiles = false;
         }
-
-        /**
-         * Some fields in the Task object need to be made machine-specific.
-         * So here, edit the Task's fields appropriately.
-         */
-        private void localizeTask(Task t) throws IOException {
-            this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + 
-                                                 task.getTaskId());
-            Path localJobFile =
-              this.defaultJobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.xml");
-            Path localJarFile =
-              this.defaultJobConf.getLocalPath(SUBDIR+"/"+t.getTaskId()+"/"+"job.jar");
-
-            String jobFile = t.getJobFile();
-            fs.copyToLocalFile(new Path(jobFile), localJobFile);
-            t.setJobFile(localJobFile.toString());
-
-            localJobConf = new JobConf(localJobFile);
-            localJobConf.set("mapred.local.dir",
-                    this.defaultJobConf.get("mapred.local.dir"));
-            String jarFile = localJobConf.getJar();
-            if (jarFile != null) {
-              fs.copyToLocalFile(new Path(jarFile), localJarFile);
-              localJobConf.setJar(localJarFile.toString());
-            }
-            task.localizeConfiguration(localJobConf);
-
-            FileSystem localFs = FileSystem.getNamed("local", fConf);
-            OutputStream out = localFs.create(localJobFile);
-            try {
-              localJobConf.write(out);
-            } finally {
-              out.close();
-            }
-            // set the task's configuration to the local job conf
-            // rather than the default.
-            t.setConf(localJobConf);
-            keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+        
+        private void localizeTask(Task task) throws IOException{
+            Path localTaskDir =
+              new Path(this.defaultJobConf.getLocalPath(SUBDIR+ Path.SEPARATOR
+                    + JOBCACHE + Path.SEPARATOR
+                    + task.getJobId()), task.getTaskId());
+           FileSystem localFs = FileSystem.getNamed("local", fConf);
+           localFs.mkdirs(localTaskDir);
+           Path localTaskFile = new Path(localTaskDir, "job.xml");
+           task.setJobFile(localTaskFile.toString());
+           localJobConf.set("mapred.local.dir",
+                    fConf.get("mapred.local.dir"));
+            
+           localJobConf.set("mapred.task.id", task.getTaskId());
+           keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
+           task.localizeConfiguration(localJobConf);
+           OutputStream out = localFs.create(localTaskFile);
+           try {
+             localJobConf.write(out);
+           } finally {
+             out.close();
+           }
+            task.setConf(localJobConf);
             String keepPattern = localJobConf.getKeepTaskFilesPattern();
             if (keepPattern != null) {
-              alwaysKeepTaskFiles = 
+                keepJobFiles = true;
+                alwaysKeepTaskFiles = 
                 Pattern.matches(keepPattern, task.getTaskId());
             } else {
               alwaysKeepTaskFiles = false;
             }
         }
-
+        
         /**
          */
         public Task getTask() {
             return task;
         }
 
+        public void setJobConf(JobConf lconf){   // also refreshes keepFailedTaskFiles from the new conf
+            localJobConf = lconf;
+            this.keepFailedTaskFiles = lconf.getKeepFailedTaskFiles();
+        }
+        
         /**
          */
         public synchronized TaskStatus createStatus() {
@@ -876,11 +955,18 @@
          * finished.  If the task is still running, kill it (and clean up
          */
         public synchronized void jobHasFinished() throws IOException {
+        	 
             if (getRunState() == TaskStatus.RUNNING) {
                 killAndCleanup(false);
             } else {
                 cleanup();
             }
+            if (keepJobFiles)
+              return;
+            // delete the job directory for this task
+            // since the job is done/failed
+            this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
+                    JOBCACHE + Path.SEPARATOR +  task.getJobId());
         }
 
         /**
@@ -934,10 +1020,13 @@
                  }
                }
             }
-            this.defaultJobConf.deleteLocalFiles(SUBDIR + "/" + taskId);
-        }
+            this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
+                    JOBCACHE + Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
+                    taskId);
+            }
     }
 
+    
     // ///////////////////////////////////////////////////////////////
     // TaskUmbilicalProtocol
     /////////////////////////////////////////////////////////////////
@@ -1034,6 +1123,16 @@
         } else {
           LOG.warn("Unknown child with bad map output: "+taskid+". Ignored.");
         }
+    }
+    
+    /**
+     *  Per-job bookkeeping that localizeJob keeps in the runningJobs map.
+     */
+    static class RunningJob{
+      Path jobFile;       // local job.xml path of the job (set by localizeJob)
+      // tasks of this job that were passed to localizeJob; kept for later use
+      ArrayList tasks;
+      boolean localized;  // set to true by localizeJob once its copy step has run
     }
 
     /** 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/StringUtils.java Thu Sep 14 15:13:43 2006
@@ -18,7 +18,10 @@
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.text.DecimalFormat;
+import org.apache.hadoop.fs.*;
 
 /**
  * General string utils
@@ -104,5 +107,82 @@
       sbuf.append(strs[idx]);
     }
     return sbuf.toString();
+  }
+
+  /**
+   * Renders each byte as two lowercase hex digits (unsigned, so -1 gives "ff").
+   * @param bytes the bytes to render
+   * @return hex string, twice as long as the input array
+   */
+  public static String byteToHexString(byte bytes[]) {
+    StringBuffer hex = new StringBuffer(2 * bytes.length);
+    for (int i = 0; i < bytes.length; ++i) {
+      int unsigned = bytes[i] & 0xFF;
+      hex.append(Character.forDigit(unsigned >> 4, 16));
+      hex.append(Character.forDigit(unsigned & 0x0F, 16));
+    }
+    return hex.toString();
+  }
+
+  /**
+   * Parses a hex string back into the bytes it encodes, two hex digits per
+   * byte. A trailing unpaired character is ignored.
+   * @param hex the hex string to decode
+   * @return the decoded bytes; the array length is hex.length()/2
+   * @throws NumberFormatException if a pair cannot be parsed as base 16
+   */
+  public static byte[] hexStringToByte(String hex) {
+    byte[] decoded = new byte[hex.length() / 2];
+    for (int pos = 0, i = 0; i < decoded.length; i++, pos += 2) {
+      decoded[i] = (byte) Integer.parseInt(hex.substring(pos, pos + 2), 16);
+    }
+    return decoded;
+  }
+  /**
+   * Joins the string forms of the given URIs with commas.
+   * @param uris the URIs to join; may be null or empty
+   * @return the comma-separated list ("" if empty), or null if uris is null
+   */
+  public static String uriToString(URI[] uris){
+    if (uris == null) { return null; }
+    StringBuffer ret = new StringBuffer();
+    for(int i = 0; i < uris.length;i++){
+      ret.append(i == 0 ? "" : ",").append(uris[i].toString());
+    }
+    return ret.toString();
+  }
+  
+  /**
+   * Parses each string into a URI, keeping positions aligned with the input.
+   * A string that is not a valid URI is reported on stdout and leaves null
+   * in its slot.
+   * @param str the strings to parse; may be null
+   * @return the parsed URIs, or null if str is null
+   */
+  public static URI[] stringToURI(String[] str){
+    if (str == null) { return null; }
+    URI[] parsed = new URI[str.length];
+    for (int idx = 0; idx < str.length; idx++){
+      try{
+        parsed[idx] = new URI(str[idx]);
+      }catch(URISyntaxException badUri){
+        // the slot keeps the array's default null for the bad entry
+        System.out.println("Exception in specified URI's " + StringUtils.stringifyException(badUri));
+      }
+    }
+    return parsed;
+  }
+  
+  /** Wraps each string in a Path, preserving order.
+   * @param str the path strings; may be null
+   * @return one Path per string, or null if str is null (as stringToURI does)
+   */
+  public static Path[] stringToPath(String[] str){
+    if (str == null) { return null; }
+    Path[] p = new Path[str.length];
+    for (int i = 0; i < str.length;i++){
+      p[i] = new Path(str[i]);
+    }
+    return p;
   }
 }

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java?view=auto&rev=443497
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java Thu Sep 14 15:13:43 2006
@@ -0,0 +1,205 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.mapred.MapReduceBase;
+import java.io.*;
+import org.apache.hadoop.filecache.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+public class MRCaching {
+  static String testStr = "This is a test file " + "used for testing caching "
+      + "jars, zip and normal files.";
+
+  /**
+   * Using the wordcount example and adding caching to it. The cache
+   * archives/files are set and then are checked in the map if they have been
+   * localized or not.
+   */
+  public static class MapClass extends MapReduceBase implements Mapper {
+    JobConf conf;
+
+    private final static IntWritable one = new IntWritable(1);
+
+    private Text word = new Text();
+
+    // Writes the first line of every localized cache entry to /tmp/test.txt.
+    public void configure(JobConf jconf) {
+      conf = jconf;
+      try {
+        Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
+        Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
+        FileSystem fs = FileSystem.get(conf);
+        // read the first line of each cached file (unzipped, unjarred and
+        // text) and collect those lines in a single file /tmp/test.txt
+        Path file = new Path("/tmp");
+        fs.mkdirs(file);
+        Path fileOut = new Path(file, "test.txt");
+        // drop only a stale output file; deleting "file" itself would
+        // remove the whole /tmp directory that was just created
+        fs.delete(fileOut);
+        DataOutputStream out = fs.create(fileOut);
+        for (int i = 0; i < localArchives.length; i++) {
+          // a localized archive is a directory; test.txt sits inside it
+          File txt = new File(new File(localArchives[i].toString()), "test.txt");
+          DataInputStream din = new DataInputStream(new FileInputStream(txt));
+          String str = din.readLine();
+          din.close();
+          out.writeBytes(str);
+          out.writeBytes("\n");
+        }
+        for (int i = 0; i < localFiles.length; i++) {
+          // a localized plain file is read directly
+          File txt = new File(localFiles[i].toString());
+          DataInputStream din = new DataInputStream(new FileInputStream(txt));
+          String str = din.readLine();
+          din.close();   // close every reader, as the archive loop does
+          out.writeBytes(str);
+          out.writeBytes("\n");
+        }
+        out.close();
+      } catch (IOException ie) {
+        System.out.println(StringUtils.stringifyException(ie));
+      }
+    }
+
+    // Emits (token, 1) for every token StringTokenizer finds in the line.
+    public void map(WritableComparable key, Writable value,
+        OutputCollector output, Reporter reporter) throws IOException {
+      StringTokenizer tokens = new StringTokenizer(((Text) value).toString());
+      while (tokens.hasMoreTokens()) {
+        String token = tokens.nextToken();
+        word.set(token);
+        output.collect(word, one);
+      }
+    }
+  }
+
+  /**
+   * A reducer class that emits, per key, the sum of its IntWritable values.
+   */
+  public static class ReduceClass extends MapReduceBase implements Reducer {
+
+    public void reduce(WritableComparable key, Iterator values,
+        OutputCollector output, Reporter reporter) throws IOException {
+      int total = 0;
+      for (Iterator it = values; it.hasNext(); ) {
+        total += ((IntWritable) it.next()).get();
+      }
+      output.collect(key, new IntWritable(total));
+    }
+  }
+
+  /**
+   * Runs the caching word count job and then verifies /tmp/test.txt.
+   * @return true only if that file holds exactly 3 lines, each equal to testStr
+   * @throws IOException on file system errors or a malformed cache URI
+   */
+  public static boolean launchMRCache(String jobTracker, String indir,
+      String outdir, String fileSys, JobConf conf, String input)
+      throws IOException {
+    final Path inDir = new Path(indir);
+    final Path outDir = new Path(outdir);
+    FileSystem fs = FileSystem.getNamed(fileSys, conf);
+    fs.delete(outDir);
+    fs.mkdirs(inDir);
+
+    DataOutputStream inputFile = fs.create(new Path(inDir, "part-0"));
+    inputFile.writeBytes(input);
+    inputFile.close();
+    conf.set("fs.default.name", fileSys);
+    conf.set("mapred.job.tracker", jobTracker);
+    conf.setJobName("cachetest");
+
+    // the keys are words (strings)
+    conf.setOutputKeyClass(Text.class);
+    // the values are counts (ints)
+    conf.setOutputValueClass(IntWritable.class);
+
+    conf.setMapperClass(MRCaching.MapClass.class);
+    conf.setCombinerClass(MRCaching.ReduceClass.class);
+    conf.setReducerClass(MRCaching.ReduceClass.class);
+    conf.setInputPath(inDir);
+    conf.setOutputPath(outDir);
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    conf.setSpeculativeExecution(false);
+    Path localPath = new Path("build/test/cache");
+    Path txtPath = new Path(localPath, new Path("test.txt"));
+    Path jarPath = new Path(localPath, new Path("test.jar"));
+    Path zipPath = new Path(localPath, new Path("test.zip"));
+    Path cacheTest = new Path("/tmp/cachedir");
+    fs.delete(cacheTest);
+    fs.mkdirs(cacheTest);
+    fs.copyFromLocalFile(txtPath, cacheTest);
+    fs.copyFromLocalFile(jarPath, cacheTest);
+    fs.copyFromLocalFile(zipPath, cacheTest);
+    // setting the cached archives to zip, jar and simple text files
+    String archive1 = "dfs://" + fileSys + "/tmp/cachedir/test.jar";
+    String archive2 = "dfs://" + fileSys + "/tmp/cachedir/test.zip"; 
+    String file1 = "dfs://" + fileSys + "/tmp/cachedir/test.txt";
+    URI uri1, uri2, uri3;
+    try{
+      uri1 = new URI(archive1);
+      uri2 = new URI(archive2);
+      uri3 = new URI(file1);
+    } catch(URISyntaxException ur){
+      // a malformed cache URI must fail the setup instead of passing null on
+      throw (IOException) new IOException("Bad cache URI: " + ur.getMessage()).initCause(ur);
+    }
+    DistributedCache.addCacheArchive(uri1, conf);
+    DistributedCache.addCacheArchive(uri2, conf);
+    DistributedCache.addCacheFile(uri3, conf);
+    JobClient.runJob(conf);
+    int count = 0;
+    // after the job ran check to see if the input from the localized cache
+    // match the real string. check if there are 3 instances or not.
+    Path result = new Path("/tmp/test.txt");
+    BufferedReader file = new BufferedReader(new InputStreamReader(fs
+        .open(result)));
+    try {
+      String line = file.readLine();
+      while (line != null) {
+        if (!testStr.equals(line))
+          return false;
+        count++;
+        line = file.readLine();
+      }
+    } finally {
+      // also reached on the early "return false" above
+      file.close();
+    }
+    return count == 3;
+  }
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java?view=auto&rev=443497
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSCaching.java Thu Sep 14 15:13:43 2006
@@ -0,0 +1,72 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.*;
+
+/**
+ * JUnit test that runs the MRCaching job on a mini DFS and a mini
+ * MapReduce cluster.
+ * @author Mahadev Konar
+ */
+public class TestMiniMRDFSCaching extends TestCase {
+
+  /**
+   * Starts both mini clusters, runs the job and shuts everything down.
+   */
+  public void testWithDFS() throws IOException {
+    MiniDFSCluster dfs = null;
+    FileSystem fileSys = null;
+    MiniMRCluster mr = null;
+    try {
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(65314, conf, true);
+      fileSys = dfs.getFileSystem();
+      String namenode = fileSys.getName();
+      mr = new MiniMRCluster(50050, 50060, 2, namenode, true);
+      JobConf jconf = new JobConf();
+      // run the wordcount example with caching
+      String input = "The quick brown fox\nhas many silly\n" + "red fox sox\n";
+      boolean cachedOk = MRCaching.launchMRCache("localhost:50050",
+          "/testing/wc/input", "/testing/wc/output", namenode, jconf, input);
+      assertTrue("Archives not matching", cachedOk);
+    } finally {
+      if (fileSys != null) {
+        fileSys.close();
+      }
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+      if (mr != null) {
+        mr.shutdown();
+      }
+    }
+  }
+
+  /** Runs the test without a JUnit runner. */
+  public static void main(String[] argv) throws Exception {
+    TestMiniMRDFSCaching standalone = new TestMiniMRDFSCaching();
+    standalone.testWithDFS();
+  }
+}

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Thu Sep 14 15:13:43 2006
@@ -36,6 +36,14 @@
           double estimate = PiEstimator.launch(NUM_MAPS, NUM_SAMPLES, "localhost:60030", "local");
           double error = Math.abs(Math.PI - estimate);
           assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
+          JobConf jconf = new JobConf();
+          // run the wordcount example with caching
+          boolean ret = MRCaching.launchMRCache("localhost:60030", "/tmp/wc/input",
+                                                "/tmp/wc/output", "local", jconf,
+                                                "The quick brown fox\nhas many silly\n"
+                                                    + "red fox sox\n");
+          // assert the number of lines read during caching
+          assertTrue("Failed test archives not matching", ret);
       } finally {
           if (mr != null) { mr.shutdown(); }
       }

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?view=diff&rev=443497&r1=443496&r2=443497
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Thu Sep 14 15:13:43 2006
@@ -95,12 +95,14 @@
    * @param taskDirs the task ids that should be present
    */
   private static void checkTaskDirectories(MiniMRCluster mr,
+                                           String[] jobIds,
                                            String[] taskDirs) {
     mr.waitUntilIdle();
     int trackers = mr.getNumTaskTrackers();
     List neededDirs = new ArrayList(Arrays.asList(taskDirs));
     boolean[] found = new boolean[taskDirs.length];
     for(int i=0; i < trackers; ++i) {
+      int numNotDel = 0;
       File localDir = new File(mr.getTaskTrackerLocalDir(i));
       File trackerDir = new File(localDir, "taskTracker");
       assertTrue("local dir " + localDir + " does not exist.", 
@@ -113,7 +115,7 @@
         System.out.println("Local " + localDir + ": " + contents[j]);
       }
       for(int j=0; j < trackerContents.length; ++j) {
-        System.out.println("Local " + trackerDir + ": " + trackerContents[j]);
+        System.out.println("Local jobcache " + trackerDir + ": " + trackerContents[j]);
       }
       for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
         String name = contents[fileIdx];
@@ -123,13 +125,11 @@
                      localDir, idx != -1);
           assertTrue("Matching output directory not found " + name +
                      " in " + trackerDir, 
-                     new File(trackerDir, name).isDirectory());
+                     new File(new File(new File(trackerDir, "jobcache"), jobIds[idx]), name).isDirectory());
           found[idx] = true;
+          numNotDel++;
         }  
       }
-      assertTrue("The local directory had " + contents.length + 
-                 " and task tracker directory had " + trackerContents.length +
-                 " items.", contents.length == trackerContents.length + 1);
     }
     for(int i=0; i< found.length; i++) {
       assertTrue("Directory " + taskDirs[i] + " not found", found[i]);
@@ -155,7 +155,7 @@
                                                jobTrackerName, namenode);
           double error = Math.abs(Math.PI - estimate);
           assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
-          checkTaskDirectories(mr, new String[]{});
+          checkTaskDirectories(mr, new String[]{}, new String[]{});
           
           // Run a word count example
           JobConf jobConf = new JobConf();
@@ -168,7 +168,7 @@
                                    3, 1);
           assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
                        "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
-          checkTaskDirectories(mr, new String[]{"task_0002_m_000001_0"});
+          checkTaskDirectories(mr, new String[]{"job_0002"}, new String[]{"task_0002_m_000001_0"});
           
       } finally {
           if (fileSys != null) { fileSys.close(); }

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.jar
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.jar?view=auto&rev=443497
==============================================================================
Binary file - no diff available.

Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt?view=auto&rev=443497
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.txt Thu Sep 14 15:13:43 2006
@@ -0,0 +1 @@
+This is a test file used for testing caching jars, zip and normal files.

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.zip
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.zip?view=auto&rev=443497
==============================================================================
Binary file - no diff available.

Propchange: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/test.zip
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream