Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2011/03/08 06:56:31 UTC

svn commit: r1079211 [10/11] - in /hadoop/mapreduce/branches/yahoo-merge: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/contrib/fairscheduler/designdoc/ src/contrib/streaming/...

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/DistributedCache.java Tue Mar  8 05:56:27 2011
@@ -26,9 +26,6 @@ import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.DefaultTaskController;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 
 import java.net.URI;
@@ -134,175 +131,6 @@ import java.net.URI;
 @Deprecated
 @InterfaceAudience.Private
 public class DistributedCache {
-  /**
-   * Get the locally cached file or archive; it could either be 
-   * previously cached (and valid) or copy it from the {@link FileSystem} now.
-   * 
-   * @param cache the cache to be localized, this should be specified as 
-   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
-   * @param conf The Confguration file which contains the filesystem
-   * @param baseDir The base cache Dir where you wnat to localize the files/archives
-   * @param fileStatus The file status on the dfs.
-   * @param isArchive if the cache is an archive or a file. In case it is an
-   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
-   *  be unzipped/unjarred/untarred automatically 
-   *  and the directory where the archive is unzipped/unjarred/untarred is
-   *  returned as the Path.
-   *  In case of a file, the path to the file is returned
-   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
-   * file to be cached hasn't changed since the job started
-   * @param currentWorkDir this is the directory where you would want to create symlinks 
-   * for the locally cached files/archives
-   * @return the path to directory where the archives are unjarred in case of archives,
-   * the path to the file where the file is copied locally 
-   * @throws IOException
-   * @deprecated Internal to MapReduce framework. 
-   * Use TrackerDistributedCacheManager instead.
-   */
-  @Deprecated
-  public static Path getLocalCache(URI cache, Configuration conf, 
-                                   Path baseDir, FileStatus fileStatus,
-                                   boolean isArchive, long confFileStamp,
-                                   Path currentWorkDir) 
-      throws IOException {
-    return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
-        confFileStamp, currentWorkDir, true);
-  }
-
-  /**
-   * Get the locally cached file or archive; it could either be 
-   * previously cached (and valid) or copy it from the {@link FileSystem} now.
-   * 
-   * @param cache the cache to be localized, this should be specified as 
-   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
-   * @param conf The Confguration file which contains the filesystem
-   * @param baseDir The base cache Dir where you wnat to localize the files/archives
-   * @param fileStatus The file status on the dfs.
-   * @param isArchive if the cache is an archive or a file. In case it is an
-   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
-   *  be unzipped/unjarred/untarred automatically 
-   *  and the directory where the archive is unzipped/unjarred/untarred is
-   *  returned as the Path.
-   *  In case of a file, the path to the file is returned
-   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
-   * file to be cached hasn't changed since the job started
-   * @param currentWorkDir this is the directory where you would want to create symlinks 
-   * for the locally cached files/archives
-   * @param honorSymLinkConf if this is false, then the symlinks are not
-   * created even if conf says so (this is required for an optimization in task
-   * launches
-   * @return the path to directory where the archives are unjarred in case of archives,
-   * the path to the file where the file is copied locally 
-   * @throws IOException
-   * @deprecated Internal to MapReduce framework. 
-   * Use TrackerDistributedCacheManager instead.
-   */
-  @Deprecated
-  public static Path getLocalCache(URI cache, Configuration conf, 
-      Path baseDir, FileStatus fileStatus,
-      boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
-
-    return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
-        .getLocalCache(cache, conf, baseDir.toString(), fileStatus, isArchive,
-            confFileStamp, currentWorkDir, honorSymLinkConf, false);
-  }
-
-  /**
-   * Get the locally cached file or archive; it could either be 
-   * previously cached (and valid) or copy it from the {@link FileSystem} now.
-   * 
-   * @param cache the cache to be localized, this should be specified as 
-   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
-   * @param conf The Confguration file which contains the filesystem
-   * @param baseDir The base cache Dir where you wnat to localize the files/archives
-   * @param isArchive if the cache is an archive or a file. In case it is an 
-   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will 
-   *  be unzipped/unjarred/untarred automatically 
-   *  and the directory where the archive is unzipped/unjarred/untarred 
-   *  is returned as the Path.
-   *  In case of a file, the path to the file is returned
-   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
-   * file to be cached hasn't changed since the job started
-   * @param currentWorkDir this is the directory where you would want to create symlinks 
-   * for the locally cached files/archives
-   * @return the path to directory where the archives are unjarred in case of archives,
-   * the path to the file where the file is copied locally 
-   * @throws IOException
-   * @deprecated Internal to MapReduce framework.  
-   * Use TrackerDistributedCacheManager instead.
-   */
-  @Deprecated
-  public static Path getLocalCache(URI cache, Configuration conf, 
-                                   Path baseDir, boolean isArchive,
-                                   long confFileStamp, Path currentWorkDir) 
-      throws IOException {
-    return getLocalCache(cache, conf, 
-                         baseDir, null, isArchive,
-                         confFileStamp, currentWorkDir);
-  }
-
-  /**
-   * This is the opposite of getlocalcache. When you are done with
-   * using the cache, you need to release the cache
-   * @param cache The cache URI to be released
-   * @param conf configuration which contains the filesystem the cache 
-   * is contained in.
-   * @throws IOException
-   * @deprecated Internal to MapReduce framework. 
-   * Use TrackerDistributedCacheManager instead.
-   */
-  @Deprecated
-  public static void releaseCache(URI cache, Configuration conf)
-      throws IOException {
-	// find the timestamp of the uri
-    URI[] archives = DistributedCache.getCacheArchives(conf);
-    URI[] files = DistributedCache.getCacheFiles(conf);
-    String[] archivesTimestamps =
-          DistributedCache.getArchiveTimestamps(conf);
-    String[] filesTimestamps =
-          DistributedCache.getFileTimestamps(conf);
-    String timestamp = null;
-    if (archives != null) {
-      for (int i = 0; i < archives.length; i++) {
-        if (archives[i].equals(cache)) {
-          timestamp = archivesTimestamps[i];
-          break;
-        }
-      }
-    }
-    if (timestamp == null && files != null) {
-      for (int i = 0; i < files.length; i++) {
-        if (files[i].equals(cache)) {
-          timestamp = filesTimestamps[i];
-          break;
-        }
-      }
-    }
-    if (timestamp == null) {
-      throw new IOException("TimeStamp of the uri couldnot be found");
-    }
-    new TrackerDistributedCacheManager(conf, new DefaultTaskController())
-           .releaseCache(cache, conf, Long.parseLong(timestamp),
-            TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
-  }
-  
-  /**
-   * Returns the relative path of the dir this cache will be localized in
-   * relative path that this cache will be localized in. For
-   * hdfs://hostname:port/absolute_path -- the relative path is
-   * hostname/absolute path -- if it is just /absolute_path -- then the
-   * relative path is hostname of DFS this mapred cluster is running
-   * on/absolute_path
-   * @deprecated Internal to MapReduce framework.  Use DistributedCacheManager
-   * instead.
-   */
-  @Deprecated
-  public static String makeRelative(URI cache, Configuration conf)
-      throws IOException {
-    return new TrackerDistributedCacheManager(conf, new DefaultTaskController())
-        .makeRelative(cache, conf);
-  }
 
   /**
    * Returns mtime of a given cache file on hdfs.
@@ -361,11 +189,22 @@ public class DistributedCache {
     conf.set(MRJobConfig.CACHE_FILES, sfiles);
   }
 
+  private static Path[] parsePaths(String[] strs) {
+    if (strs == null) {
+      return null;
+    }
+    Path[] result = new Path[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = new Path(strs[i]);
+    }
+    return result;
+  }
+
   /**
    * Get cache archives set in the Configuration.  Used by
    * internal DistributedCache and MapReduce code.
    * @param conf The configuration which contains the archives
-   * @return A URI array of the caches set in the Configuration
+   * @return An array of the caches set in the Configuration
    * @throws IOException
    * @deprecated Use {@link JobContext#getCacheArchives()} instead
    */
@@ -378,7 +217,7 @@ public class DistributedCache {
    * Get cache files set in the Configuration.  Used by internal
    * DistributedCache and MapReduce code.
    * @param conf The configuration which contains the files
-   * @return A URI array of the files set in the Configuration
+   * @return An array of the files set in the Configuration
    * @throws IOException
    * @deprecated Use {@link JobContext#getCacheFiles()} instead
    */
@@ -417,30 +256,46 @@ public class DistributedCache {
   }
 
   /**
+   * Parse an array of strings into longs.
+   * @param strs the array of strings to parse
+   * @return an array of the parsed longs, the same length as strs
+   */
+  private static long[] parseTimestamps(String[] strs) {
+    if (strs == null) {
+      return null;
+    }
+    long[] result = new long[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = Long.parseLong(strs[i]);
+    }
+    return result;
+  }
+
+  /**
    * Get the timestamps of the archives.  Used by internal
    * DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
-   * @return a string array of timestamps 
+   * @return a long array of timestamps 
    * @throws IOException
    * @deprecated Use {@link JobContext#getArchiveTimestamps()} instead
    */
   @Deprecated
-  public static String[] getArchiveTimestamps(Configuration conf) {
-    return conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS);
+  public static long[] getArchiveTimestamps(Configuration conf) {
+    return
+      parseTimestamps(conf.getStrings(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS));
   }
 
-
   /**
    * Get the timestamps of the files.  Used by internal
    * DistributedCache and MapReduce code.
    * @param conf The configuration which stored the timestamps
-   * @return a string array of timestamps 
+   * @return a long array of timestamps 
    * @throws IOException
    * @deprecated Use {@link JobContext#getFileTimestamps()} instead
    */
   @Deprecated
-  public static String[] getFileTimestamps(Configuration conf) {
-    return conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+  public static long[] getFileTimestamps(Configuration conf) {
+    return parseTimestamps(conf.getStrings(MRJobConfig.CACHE_FILE_TIMESTAMPS));
   }
 
   /**
@@ -511,8 +366,8 @@ public class DistributedCache {
   @Deprecated
   public static void addCacheArchive(URI uri, Configuration conf) {
     String archives = conf.get(MRJobConfig.CACHE_ARCHIVES);
-    conf.set(MRJobConfig.CACHE_ARCHIVES, archives == null ? uri.toString()
-             : archives + "," + uri.toString());
+    conf.set(MRJobConfig.CACHE_ARCHIVES,
+        archives == null ? uri.toString() : archives + "," + uri.toString());
   }
   
   /**
@@ -525,8 +380,32 @@ public class DistributedCache {
   @Deprecated
   public static void addCacheFile(URI uri, Configuration conf) {
     String files = conf.get(MRJobConfig.CACHE_FILES);
-    conf.set(MRJobConfig.CACHE_FILES, files == null ? uri.toString() : files + ","
-             + uri.toString());
+    conf.set(MRJobConfig.CACHE_FILES,
+        files == null ? uri.toString() : files + "," + uri.toString());
+  }
+  
+  /**
+   * Add an archive that has been localized to the conf.  Used
+   * by internal DistributedCache code.
+   * @param conf The conf to modify to contain the localized caches
+   * @param str a comma-separated list of local archives
+   */
+  public static void addLocalArchives(Configuration conf, String str) {
+    String archives = conf.get(MRJobConfig.CACHE_LOCALARCHIVES);
+    conf.set(MRJobConfig.CACHE_LOCALARCHIVES,
+        archives == null ? str : archives + "," + str);
+  }
+
+  /**
+   * Add a file that has been localized to the conf.  Used
+   * by internal DistributedCache code.
+   * @param conf The conf to modify to contain the localized caches
+   * @param str a comma-separated list of local files
+   */
+  public static void addLocalFiles(Configuration conf, String str) {
+    String files = conf.get(MRJobConfig.CACHE_LOCALFILES);
+    conf.set(MRJobConfig.CACHE_LOCALFILES,
+        files == null ? str : files + "," + str);
   }
 
   /**
@@ -541,8 +420,8 @@ public class DistributedCache {
   public static void addFileToClassPath(Path file, Configuration conf)
     throws IOException {
     String classpath = conf.get(MRJobConfig.CLASSPATH_FILES);
-    conf.set(MRJobConfig.CLASSPATH_FILES, classpath == null ? file.toString()
-             : classpath + "," + file.toString());
+    conf.set(MRJobConfig.CLASSPATH_FILES,
+        classpath == null ? file.toString() : classpath + "," + file);
     FileSystem fs = FileSystem.get(conf);
     URI uri = fs.makeQualified(file).toUri();
 
@@ -654,17 +533,4 @@ public class DistributedCache {
   public static boolean checkURIs(URI[]  uriFiles, URI[] uriArchives){
     return TrackerDistributedCacheManager.checkURIs(uriFiles, uriArchives);
   }
-
-  /**
-   * Clear the entire contents of the cache and delete the backing files. This
-   * should only be used when the server is reinitializing, because the users
-   * are going to lose their files.
-   * @deprecated Internal to MapReduce framework. 
-   * Use TrackerDistributedCacheManager instead.
-   */
-  @Deprecated
-  public static void purgeCache(Configuration conf) throws IOException {
-    new TrackerDistributedCacheManager(conf, new DefaultTaskController())
-        .purgeCache();
-  }
 }
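
For context on the DistributedCache hunks above: the timestamp getters now
return long[] instead of String[] (parsing moves into the new parseTimestamps
helper), and the new addLocalArchives/addLocalFiles methods append
comma-separated lists instead of overwriting. A minimal caller sketch under
those assumptions; the local paths are illustrative and getLocalCacheFiles is
the pre-existing accessor, not shown in this hunk:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.filecache.DistributedCache;

    public class LocalCacheConfSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Successive calls append with a comma rather than overwrite.
        DistributedCache.addLocalFiles(conf, "/local/a.txt");
        DistributedCache.addLocalFiles(conf, "/local/b.txt,/local/c.txt");
        // Read back as Path[] (split and wrapped by the new parsePaths);
        // three entries are expected here.
        for (Path p : DistributedCache.getLocalCacheFiles(conf)) {
          System.out.println(p);
        }
        // Timestamps now come back already parsed as longs.
        long[] stamps = DistributedCache.getFileTimestamps(conf);
        System.out.println(stamps == null ? "no timestamps"
                                          : stamps.length + " stamps");
      }
    }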

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TaskDistributedCacheManager.java Tue Mar  8 05:56:27 2011
@@ -30,8 +30,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager.CacheStatus;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -48,10 +51,9 @@ import org.apache.hadoop.classification.
 @InterfaceAudience.Private
 public class TaskDistributedCacheManager {
   private final TrackerDistributedCacheManager distributedCacheManager;
-  private final Configuration taskConf;
   private final List<CacheFile> cacheFiles = new ArrayList<CacheFile>();
   private final List<String> classPaths = new ArrayList<String>();
- 
+  
   private boolean setupCalled = false;
 
   /**
@@ -75,9 +77,9 @@ public class TaskDistributedCacheManager
     boolean localized = false;
     /** The owner of the localized file. Relevant only on the tasktrackers */
     final String owner;
-
-    private CacheFile(URI uri, FileType type, boolean isPublic, long timestamp, 
-        boolean classPath) throws IOException {
+    private CacheStatus status;
+    CacheFile(URI uri, FileType type, boolean isPublic, long timestamp, 
+                      boolean classPath) throws IOException {
       this.uri = uri;
       this.type = type;
       this.isPublic = isPublic;
@@ -88,12 +90,28 @@ public class TaskDistributedCacheManager
     }
 
     /**
+     * Set the status for this cache file.
+     * @param status the status to associate with this cache file
+     */
+    public void setStatus(CacheStatus status) {
+      this.status = status;
+    }
+    
+    /**
+     * Get the status for this cache file.
+     * @return the status object
+     */
+    public CacheStatus getStatus() {
+      return status;
+    }
+
+    /**
      * Converts the scheme used by DistributedCache to serialize what files to
      * cache in the configuration into CacheFile objects that represent those 
      * files.
      */
     private static List<CacheFile> makeCacheFiles(URI[] uris, 
-        String[] timestamps, String cacheVisibilities[], Path[] paths,
+        long[] timestamps, boolean cacheVisibilities[], Path[] paths, 
         FileType type) throws IOException {
       List<CacheFile> ret = new ArrayList<CacheFile>();
       if (uris != null) {
@@ -109,9 +127,8 @@ public class TaskDistributedCacheManager
         for (int i = 0; i < uris.length; ++i) {
           URI u = uris[i];
           boolean isClassPath = (null != classPaths.get(u.getPath()));
-          long t = Long.parseLong(timestamps[i]);
-          ret.add(new CacheFile(u, type, Boolean.valueOf(cacheVisibilities[i]),
-              t, isClassPath));
+          ret.add(new CacheFile(u, type, cacheVisibilities[i],
+              timestamps[i], isClassPath));
         }
       }
       return ret;
@@ -130,7 +147,6 @@ public class TaskDistributedCacheManager
       TrackerDistributedCacheManager distributedCacheManager,
       Configuration taskConf) throws IOException {
     this.distributedCacheManager = distributedCacheManager;
-    this.taskConf = taskConf;
     
     this.cacheFiles.addAll(
         CacheFile.makeCacheFiles(DistributedCache.getCacheFiles(taskConf),
@@ -147,36 +163,42 @@ public class TaskDistributedCacheManager
   }
 
   /**
-   * Retrieve files into the local cache and updates the task configuration 
-   * (which has been passed in via the constructor).
+   * Retrieve public distributed cache files into the local cache and update
+   * the task configuration (which has been passed in via the constructor).
+   * For the private distributed cache, this only decides the paths where
+   * the files/archives should go; the actual localization is done by
+   * {@link JobLocalizer}.
    * 
    * It is the caller's responsibility to re-write the task configuration XML
    * file, if necessary.
    */
-  public void setup(LocalDirAllocator lDirAlloc, File workDir, 
-      String privateCacheSubdir, String publicCacheSubDir) throws IOException {
+  public void setupCache(Configuration taskConf, String publicCacheSubdir, 
+      String privateCacheSubdir) throws IOException {
     setupCalled = true;
       
-    if (cacheFiles.isEmpty()) {
-      return;
-    }
-
     ArrayList<Path> localArchives = new ArrayList<Path>();
     ArrayList<Path> localFiles = new ArrayList<Path>();
-    Path workdirPath = new Path(workDir.getAbsolutePath());
 
     for (CacheFile cacheFile : cacheFiles) {
       URI uri = cacheFile.uri;
       FileSystem fileSystem = FileSystem.get(uri, taskConf);
       FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
-      String cacheSubdir = publicCacheSubDir;
-      if (!cacheFile.isPublic) {
-        cacheSubdir = privateCacheSubdir;
-      }
-      Path p = distributedCacheManager.getLocalCache(uri, taskConf,
-          cacheSubdir, fileStatus, 
-          cacheFile.type == CacheFile.FileType.ARCHIVE,
-          cacheFile.timestamp, workdirPath, false, cacheFile.isPublic);
+      Path p;
+      try {
+        if (cacheFile.isPublic) {
+          p = distributedCacheManager.getLocalCache(uri, taskConf,
+              publicCacheSubdir, fileStatus, 
+              cacheFile.type == CacheFile.FileType.ARCHIVE,
+              cacheFile.timestamp, cacheFile.isPublic, cacheFile);
+        } else {
+          p = distributedCacheManager.getLocalCache(uri, taskConf,
+              privateCacheSubdir, fileStatus, 
+              cacheFile.type == CacheFile.FileType.ARCHIVE,
+              cacheFile.timestamp, cacheFile.isPublic, cacheFile);
+        }
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted localizing cache file", e);
+      }
       cacheFile.setLocalized(true);
 
       if (cacheFile.type == CacheFile.FileType.ARCHIVE) {
@@ -191,10 +213,14 @@ public class TaskDistributedCacheManager
 
     // Update the configuration object with localized data.
     if (!localArchives.isEmpty()) {
+      // TODO verify
+//      DistributedCache.addLocalArchives(taskConf, 
       TrackerDistributedCacheManager.setLocalArchives(taskConf, 
         stringifyPathList(localArchives));
     }
     if (!localFiles.isEmpty()) {
+      // TODO verify
+//      DistributedCache.addLocalFiles(taskConf, stringifyPathList(localFiles));
       TrackerDistributedCacheManager.setLocalFiles(taskConf,
         stringifyPathList(localFiles));
     }
@@ -239,9 +265,18 @@ public class TaskDistributedCacheManager
    */
   public void release() throws IOException {
     for (CacheFile c : cacheFiles) {
-      if (c.getLocalized()) {
-        distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp,
-            c.owner);
+      if (c.getLocalized() && c.status != null) {
+        distributedCacheManager.releaseCache(c.status);
+      }
+    }
+  }
+
+  public void setSizes(long[] sizes) throws IOException {
+    int i = 0;
+    for (CacheFile c: cacheFiles) {
+      if (!c.isPublic && c.type == CacheFile.FileType.ARCHIVE &&
+          c.status != null) {
+        distributedCacheManager.setSize(c.status, sizes[i++]);
       }
     }
   }
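
The TaskDistributedCacheManager changes above attach a CacheStatus to each
CacheFile, so release() and setSizes() operate on the status object instead
of re-deriving cache keys. A rough lifecycle sketch under the new API; the
JobID and subdirectory names are illustrative and error handling is omitted:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
    import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;

    public class CacheLifecycleSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        JobID jobId = new JobID("201103080556", 1);
        // The tracker-side manager no longer needs a TaskController.
        TrackerDistributedCacheManager tracker =
            new TrackerDistributedCacheManager(conf);
        // Per-job managers are registered under their JobID so archive
        // sizes can later be reported back via setArchiveSizes(jobId, ...).
        TaskDistributedCacheManager taskCache =
            tracker.newTaskDistributedCacheManager(jobId, conf);
        taskCache.setupCache(conf, "public-cache", "private-cache");
        // ... task runs ...
        taskCache.release();                              // drops refcounts via CacheStatus
        tracker.deleteTaskDistributedCacheManager(jobId); // unregister the job
      }
    }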

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java Tue Mar  8 05:56:27 2011
@@ -20,7 +20,11 @@ package org.apache.hadoop.mapreduce.file
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.text.DateFormat;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -31,13 +35,9 @@ import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.TaskController;
-import org.apache.hadoop.mapred.TaskController.DistributedCacheFileContext;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
-import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -46,10 +46,16 @@ import org.apache.hadoop.fs.LocalFileSys
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager.CacheFile;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.RunJar;
-import org.apache.hadoop.classification.InterfaceAudience;
 
 /**
  * Manages a single machine's instance of a cross-job
@@ -63,6 +69,11 @@ public class TrackerDistributedCacheMana
   // cacheID to cacheStatus mapping
   private TreeMap<String, CacheStatus> cachedArchives = 
     new TreeMap<String, CacheStatus>();
+  private Map<JobID, TaskDistributedCacheManager> jobArchives =
+    Collections.synchronizedMap(
+        new HashMap<JobID, TaskDistributedCacheManager>());
+  private static final FsPermission PUBLIC_CACHE_OBJECT_PERM =
+    FsPermission.createImmutable((short) 0755);
 
   // default total cache size (10GB)
   private static final long DEFAULT_CACHE_SIZE = 10737418240L;
@@ -76,24 +87,20 @@ public class TrackerDistributedCacheMana
   private final LocalFileSystem localFs;
   
   private LocalDirAllocator lDirAllocator;
-  
-  private TaskController taskController;
-  
+
   private Configuration trackerConf;
   
-  private Random random = new Random();
+  private static final Random random = new Random();
 
   private MRAsyncDiskService asyncDiskService;
 
   BaseDirManager baseDirManager = new BaseDirManager();
   CleanupThread cleanupThread;
 
-  public TrackerDistributedCacheManager(Configuration conf,
-      TaskController taskController) throws IOException {
+  public TrackerDistributedCacheManager(Configuration conf) throws IOException {
     this.localFs = FileSystem.getLocal(conf);
     this.trackerConf = conf;
     this.lDirAllocator = new LocalDirAllocator(TTConfig.LOCAL_DIR);
-    this.taskController = taskController;
       // setting the cache size to a default of 10GB
     this.allowedCacheSize = conf.getLong(TTConfig.TT_LOCAL_CACHE_SIZE,
           DEFAULT_CACHE_SIZE);
@@ -111,7 +118,7 @@ public class TrackerDistributedCacheMana
   public TrackerDistributedCacheManager(Configuration conf,
       TaskController taskController, MRAsyncDiskService asyncDiskService)
       throws IOException {
-    this(conf, taskController);
+    this(conf);
     this.asyncDiskService = asyncDiskService;
   }
 
@@ -145,15 +152,15 @@ public class TrackerDistributedCacheMana
    * archives, the path to the file where the file is copied locally
    * @throws IOException
    */
-  Path getLocalCache(URI cache, Configuration conf,
-      String subDir, FileStatus fileStatus,
-      boolean isArchive, long confFileStamp,
-      Path currentWorkDir, boolean honorSymLinkConf, boolean isPublic)
-      throws IOException {
-    String key;
-    key = getKey(cache, conf, confFileStamp, getLocalizedCacheOwner(isPublic));
+  Path getLocalCache(URI cache, Configuration conf, String subDir,
+      FileStatus fileStatus, boolean isArchive, long confFileStamp,
+      boolean isPublic, CacheFile file)
+      throws IOException, InterruptedException {
+    String user = getLocalizedCacheOwner(isPublic);
+    String key = getKey(cache, conf, confFileStamp, user);
     CacheStatus lcacheStatus;
     Path localizedPath = null;
+    Path localPath = null;
     synchronized (cachedArchives) {
       lcacheStatus = cachedArchives.get(key);
       if (lcacheStatus == null) {
@@ -161,44 +168,59 @@ public class TrackerDistributedCacheMana
         String uniqueString = String.valueOf(random.nextLong());
         String cachePath = new Path (subDir, 
           new Path(uniqueString, makeRelative(cache, conf))).toString();
-        Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
-          fileStatus.getLen(), trackerConf);
-        lcacheStatus = new CacheStatus(new Path(localPath.toString().replace(
-          cachePath, "")), localPath, new Path(subDir), uniqueString);
+        localPath = lDirAllocator.getLocalPathForWrite(cachePath,
+          fileStatus.getLen(), trackerConf, isPublic);
+        lcacheStatus = 
+          new CacheStatus(new Path(localPath.toString().replace(cachePath, "")),
+                          localPath, new Path(subDir), uniqueString, 
+                          isPublic ? null : user);
         cachedArchives.put(key, lcacheStatus);
       }
 
-      //mark the cache for use. 
-      lcacheStatus.refcount++;
+      //mark the cache for use.
+      file.setStatus(lcacheStatus);
+      synchronized (lcacheStatus) {
+        lcacheStatus.refcount++;
+      }
     }
     
-    boolean initSuccessful = false;
     try {
       // do the localization, after releasing the global lock
       synchronized (lcacheStatus) {
         if (!lcacheStatus.isInited()) {
-          FileSystem fs = FileSystem.get(cache, conf);
-          checkStampSinceJobStarted(conf, fs, cache, confFileStamp,
-              lcacheStatus, fileStatus);
-          localizedPath = localizeCache(conf, cache, confFileStamp,
-              lcacheStatus, isArchive, isPublic);
+          if (isPublic) {
+            // TODO verify covered
+            //checkStampSinceJobStarted(conf, fs, cache, confFileStamp,
+            //    lcacheStatus, fileStatus);
+            localizedPath = localizePublicCacheObject(conf, cache,
+                confFileStamp, lcacheStatus, fileStatus, isArchive);
+          } else {
+            localizedPath = localPath;
+            if (!isArchive) {
+              //for private archives, the lengths come over RPC from the 
+              //JobLocalizer since the JobLocalizer is the one who expands
+              //archives and gets the total length
+              lcacheStatus.size = fileStatus.getLen();
+
+              // Increase the size and sub directory count of the cache
+              // from baseDirSize and baseDirNumberSubDir.
+              baseDirManager.addCacheUpdate(lcacheStatus);
+            }
+          }
           lcacheStatus.initComplete();
         } else {
           localizedPath = checkCacheStatusValidity(conf, cache, confFileStamp,
               lcacheStatus, fileStatus, isArchive);
         }
-        createSymlink(conf, cache, lcacheStatus, isArchive, currentWorkDir,
-            honorSymLinkConf);
       }
-      initSuccessful = true;
-      return localizedPath;
-    } finally {
-      if (!initSuccessful) {
-        synchronized (cachedArchives) {
-          lcacheStatus.refcount--;
-        }
+    } catch (IOException ie) {
+      synchronized (lcacheStatus) {
+        // release this cache
+        lcacheStatus.refcount -= 1;
+        throw ie;
       }
     }
+    return localizedPath;
   }
 
   /**
@@ -211,37 +233,30 @@ public class TrackerDistributedCacheMana
    * is contained in.
    * @throws IOException
    */
-  void releaseCache(URI cache, Configuration conf, long timeStamp,
-      String owner) throws IOException {
-    String key = getKey(cache, conf, timeStamp, owner);
-    synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(key);
-      if (lcacheStatus == null) {
-        LOG.warn("Cannot find localized cache: " + cache + 
-                 " (key: " + key + ") in releaseCache!");
-        return;
+  void releaseCache(CacheStatus status) throws IOException {
+    synchronized (status) {
+      status.refcount--;
+    }
+  }
+
+  void setSize(CacheStatus status, long size) throws IOException {
+    if (size != 0) {
+      synchronized (status) {
+        status.size = size;
+        baseDirManager.addCacheUpdate(status);
       }
-      
-      // decrement ref count 
-      lcacheStatus.refcount--;
     }
   }
 
   /*
    * This method is called from unit tests. 
    */
-  int getReferenceCount(URI cache, Configuration conf, long timeStamp,
-      String owner) throws IOException {
-    String key = getKey(cache, conf, timeStamp, owner);
-    synchronized (cachedArchives) {
-      CacheStatus lcacheStatus = cachedArchives.get(key);
-      if (lcacheStatus == null) {
-        throw new IOException("Cannot find localized cache: " + cache);
-      }
-      return lcacheStatus.refcount;
+  int getReferenceCount(CacheStatus status) throws IOException {
+    synchronized (status) {
+      return status.refcount;
     }
   }
-  
+
   /**
    * Get the user who should "own" the localized distributed cache file.
    * If the cache is public, the tasktracker user is the owner. If private,
@@ -266,6 +281,7 @@ public class TrackerDistributedCacheMana
    */
   private static void deleteLocalPath(MRAsyncDiskService asyncDiskService,
       LocalFileSystem fs, Path path) throws IOException {
+    // TODO need to make asyncDiskService use taskController
     boolean deleted = false;
     if (asyncDiskService != null) {
       // Try to delete using asyncDiskService
@@ -408,53 +424,72 @@ public class TrackerDistributedCacheMana
     return cacheStatus.localizedLoadPath;
   }
   
-  private void createSymlink(Configuration conf, URI cache,
-      CacheStatus cacheStatus, boolean isArchive,
-      Path currentWorkDir, boolean honorSymLinkConf) throws IOException {
-    boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
-    if(cache.getFragment() == null) {
-      doSymlink = false;
-    }
-    String link = 
-      currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
-    File flink = new File(link);
-    if (doSymlink){
-      if (!flink.exists()) {
-        FileUtil.symLink(cacheStatus.localizedLoadPath.toString(), link);
-      }
-    }
+  private static Path createRandomPath(Path base) throws IOException {
+    return new Path(base.toString() + "-work-" + random.nextLong());
   }
-  
-  // the method which actually copies the caches locally and unjars/unzips them
-  // and does chmod for the files
-  Path localizeCache(Configuration conf,
-                                    URI cache, long confFileStamp,
-                                    CacheStatus cacheStatus,
-                                    boolean isArchive, boolean isPublic)
-  throws IOException {
-    FileSystem fs = FileSystem.get(cache, conf);
+
+  /**
+   * Download a given path to the local file system.
+   * @param conf the job's configuration
+   * @param source the source to copy from
+   * @param destination where to copy the file. must be local fs
+   * @param desiredTimestamp the required modification timestamp of the source
+   * @param isArchive is this an archive that should be expanded
+   * @param permission the desired permissions of the file.
+   * @return for archives, the number of bytes in the unpacked directory
+   * @throws IOException
+   */
+  public static long downloadCacheObject(Configuration conf,
+                                         URI source,
+                                         Path destination,
+                                         long desiredTimestamp,
+                                         boolean isArchive,
+                                         FsPermission permission
+                                         ) throws IOException,
+                                                  InterruptedException {
+    FileSystem sourceFs = FileSystem.get(source, conf);
     FileSystem localFs = FileSystem.getLocal(conf);
+    
+    Path sourcePath = new Path(source.getPath());
+    long modifiedTime = 
+      sourceFs.getFileStatus(sourcePath).getModificationTime();
+    if (modifiedTime != desiredTimestamp) {
+      DateFormat df = DateFormat.getDateTimeInstance(DateFormat.SHORT, 
+                                                     DateFormat.SHORT);
+      throw new IOException("The distributed cache object " + source + 
+                            " changed during the job from " + 
+                            df.format(new Date(desiredTimestamp)) + " to " +
+                            df.format(new Date(modifiedTime)));
+    }
+    
     Path parchive = null;
     if (isArchive) {
-      parchive = new Path(cacheStatus.localizedLoadPath,
-        new Path(cacheStatus.localizedLoadPath.getName()));
+      parchive = new Path(destination, destination.getName());
     } else {
-      parchive = cacheStatus.localizedLoadPath;
+      parchive = destination;
     }
-
-    if (!localFs.mkdirs(parchive.getParent())) {
-      throw new IOException("Mkdirs failed to create directory " +
-          cacheStatus.localizedLoadPath.toString());
-    }
-
-    String cacheId = cache.getPath();
-    fs.copyToLocalFile(new Path(cacheId), parchive);
+    // if the file already exists, we are done
+    if (localFs.exists(parchive)) {
+      return 0;
+    }
+    // the final directory for the object
+    Path finalDir = parchive.getParent();
+    // the work directory for the object
+    Path workDir = createRandomPath(finalDir);
+    LOG.info("Creating " + destination.getName() + " in " + workDir + " with " + 
+            permission);
+    if (!localFs.mkdirs(workDir, permission)) {
+      throw new IOException("Mkdirs failed to create directory " + workDir);
+    }
+    Path workFile = new Path(workDir, parchive.getName());
+    sourceFs.copyToLocalFile(sourcePath, workFile);
+    localFs.setPermission(workFile, permission);
     if (isArchive) {
-      String tmpArchive = parchive.toString().toLowerCase();
-      File srcFile = new File(parchive.toString());
-      File destDir = new File(parchive.getParent().toString());
+      String tmpArchive = workFile.getName().toLowerCase();
+      File srcFile = new File(workFile.toString());
+      File destDir = new File(workDir.toString());
       LOG.info(String.format("Extracting %s to %s",
-          srcFile.toString(), destDir.toString()));
+               srcFile.toString(), destDir.toString()));
       if (tmpArchive.endsWith(".jar")) {
         RunJar.unJar(srcFile, destDir);
       } else if (tmpArchive.endsWith(".zip")) {
@@ -468,47 +503,48 @@ public class TrackerDistributedCacheMana
         // else will not do anyhting
         // and copy the file into the dir as it is
       }
+      FileUtil.chmod(destDir.toString(), "ugo+rx", true);
+    }
+    // promote the output to the final location
+    if (!localFs.rename(workDir, finalDir)) {
+      localFs.delete(workDir, true);
+      if (!localFs.exists(finalDir)) {
+        throw new IOException("Failed to promote distributed cache object " +
+                              workDir + " to " + finalDir);
+      }
+      // someone else promoted first
+      return 0;
     }
 
+    LOG.info(String.format("Cached %s as %s",
+             source.toString(), destination.toString()));
     long cacheSize = 
       FileUtil.getDU(new File(parchive.getParent().toString()));
-    cacheStatus.size = cacheSize;
+    return cacheSize;
+  }
+
+  //the method which actually copies the caches locally and unjars/unzips them
+  // and does chmod for the files
+  Path localizePublicCacheObject(Configuration conf,
+                                 URI cache, long confFileStamp,
+                                 CacheStatus cacheStatus,
+                                 FileStatus fileStatus,
+                                 boolean isArchive
+                                 ) throws IOException, InterruptedException {
+    long size = downloadCacheObject(conf, cache, cacheStatus.localizedLoadPath,
+                                    confFileStamp, isArchive, 
+                                    PUBLIC_CACHE_OBJECT_PERM);
+    cacheStatus.size = size;
+
     // Increase the size and sub directory count of the cache
     // from baseDirSize and baseDirNumberSubDir.
     baseDirManager.addCacheUpdate(cacheStatus);
 
-    // set proper permissions for the localized directory
-    setPermissions(conf, cacheStatus, isPublic);
-
-    // update cacheStatus to reflect the newly cached file
-    cacheStatus.mtime = getTimestamp(conf, cache);
-
     LOG.info(String.format("Cached %s as %s",
              cache.toString(), cacheStatus.localizedLoadPath));
     return cacheStatus.localizedLoadPath;
   }
 
-  private void setPermissions(Configuration conf, CacheStatus cacheStatus,
-      boolean isPublic) throws IOException {
-    if (isPublic) {
-      Path localizedUniqueDir = cacheStatus.getLocalizedUniqueDir();
-      LOG.info("Doing chmod on localdir :" + localizedUniqueDir);
-      try {
-        FileUtil.chmod(localizedUniqueDir.toString(), "ugo+rx", true);
-      } catch (InterruptedException e) {
-        LOG.warn("Exception in chmod" + e.toString());
-        throw new IOException(e);
-      }
-    } else {
-      // invoke taskcontroller to set permissions
-      DistributedCacheFileContext context = new DistributedCacheFileContext(
-          conf.get(MRJobConfig.USER_NAME), new File(cacheStatus.localizedBaseDir
-              .toString()), cacheStatus.localizedBaseDir,
-          cacheStatus.uniqueString);
-      taskController.initializeDistributedCacheFile(context);
-    }
-  }
-
   private static boolean isTarFile(String filename) {
     return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
            filename.endsWith(".tar"));
@@ -542,12 +578,18 @@ public class TrackerDistributedCacheMana
                                           CacheStatus lcacheStatus,
                                           FileStatus fileStatus)
   throws IOException {
-    long dfsFileStamp = checkStampSinceJobStarted(conf, fs, cache,
-        confFileStamp, lcacheStatus, fileStatus);
-    if (dfsFileStamp != lcacheStatus.mtime) {
-      return false;
+    long dfsFileStamp;
+    if (fileStatus != null) {
+      dfsFileStamp = fileStatus.getModificationTime();
+    } else {
+      dfsFileStamp = getTimestamp(conf, cache);
     }
 
+    if (dfsFileStamp != confFileStamp) {
+      LOG.fatal("File: " + cache + " has changed on HDFS since job started");
+      throw new IOException("File: " + cache +
+                            " has changed on HDFS since job started");
+    }
     return true;
   }
 
@@ -596,7 +638,6 @@ public class TrackerDistributedCacheMana
     // individual cacheStatus lock.
     //
     long size;              //the size of this cache.
-    long mtime;             // the cache-file modification time
     boolean inited = false; // is it initialized ?
 
     //
@@ -607,19 +648,21 @@ public class TrackerDistributedCacheMana
     final Path subDir;
     // unique string used in the construction of local load path
     final String uniqueString;
+    // The user that owns the cache entry or null if it is public
+    final String user;
     // the local load path of this cache
     final Path localizedLoadPath;
     //the base dir where the cache lies
     final Path localizedBaseDir;
 
     public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
-        String uniqueString) {
+        String uniqueString, String user) {
       super();
       this.localizedLoadPath = localLoadPath;
       this.refcount = 0;
-      this.mtime = -1;
       this.localizedBaseDir = baseDir;
       this.size = 0;
+      this.user = user;
       this.subDir = subDir;
       this.uniqueString = uniqueString;
     }
@@ -662,8 +705,22 @@ public class TrackerDistributedCacheMana
   }
 
   public TaskDistributedCacheManager newTaskDistributedCacheManager(
-      Configuration taskConf) throws IOException {
-    return new TaskDistributedCacheManager(this, taskConf);
+      JobID jobId, Configuration taskConf) throws IOException {
+    TaskDistributedCacheManager result =
+      new TaskDistributedCacheManager(this, taskConf);
+    jobArchives.put(jobId, result);
+    return result;
+  }
+ 
+  public void deleteTaskDistributedCacheManager(JobID jobId) {
+    jobArchives.remove(jobId);
+  }
+  
+  public void setArchiveSizes(JobID jobId, long[] sizes) throws IOException {
+    TaskDistributedCacheManager mgr = jobArchives.get(jobId);
+    if (mgr != null) {
+      mgr.setSizes(sizes);
+    }
   }
 
   /**
@@ -776,6 +833,17 @@ public class TrackerDistributedCacheMana
     }
   }
   
+  private static boolean[] parseBooleans(String[] strs) {
+    if (null == strs) {
+      return null;
+    }
+    boolean[] result = new boolean[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = Boolean.parseBoolean(strs[i]);
+    }
+    return result;
+  }
+
   /**
    * Get the booleans on whether the files are public or not.  Used by 
    * internal DistributedCache and MapReduce code.
@@ -783,8 +851,8 @@ public class TrackerDistributedCacheMana
    * @return a string array of booleans 
    * @throws IOException
    */
-  static String[] getFileVisibilities(Configuration conf) {
-    return conf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES);
+  public static boolean[] getFileVisibilities(Configuration conf) {
+    return parseBooleans(conf.getStrings(MRJobConfig.CACHE_FILE_VISIBILITIES));
   }
 
   /**
@@ -793,8 +861,8 @@ public class TrackerDistributedCacheMana
    * @param conf The configuration which stored the timestamps
    * @return a string array of booleans 
    */
-  static String[] getArchiveVisibilities(Configuration conf) {
-    return conf.getStrings(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES);
+  public static boolean[] getArchiveVisibilities(Configuration conf) {
+    return parseBooleans(conf.getStrings(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES));
   }
 
   /**
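
A note on the downloadCacheObject method introduced above: it makes
public-cache localization safe against concurrent localizers by downloading
into a randomized "-work-" directory and promoting it to the final location
with a single rename; a loser of that race deletes its work directory and
returns 0. A stripped-down sketch of the same idiom (illustrative names, not
the commit's exact code):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AtomicPromoteSketch {
      static void fetchAndPromote(FileSystem sourceFs, Path remote,
                                  FileSystem localFs, Path finalDir)
          throws IOException {
        if (localFs.exists(finalDir)) {
          return;                                // already localized, done
        }
        // Download into a randomized work directory, never into finalDir.
        Path workDir =
            new Path(finalDir.toString() + "-work-" + System.nanoTime());
        if (!localFs.mkdirs(workDir)) {
          throw new IOException("Mkdirs failed to create " + workDir);
        }
        sourceFs.copyToLocalFile(remote, new Path(workDir, remote.getName()));
        // One rename promotes the finished copy; concurrent localizers race
        // on this step and exactly one wins.
        if (!localFs.rename(workDir, finalDir)) {
          localFs.delete(workDir, true);         // lost the race: discard our copy
          if (!localFs.exists(finalDir)) {
            throw new IOException("Failed to promote " + workDir +
                                  " to " + finalDir);
          }
        }
      }
    }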

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Tue Mar  8 05:56:27 2011
@@ -131,7 +131,7 @@ class ChainMapContextImpl<KEYIN, VALUEIN
   }
 
   @Override
-  public String[] getArchiveTimestamps() {
+  public long[] getArchiveTimestamps() {
     return base.getArchiveTimestamps();
   }
 
@@ -162,7 +162,7 @@ class ChainMapContextImpl<KEYIN, VALUEIN
   }
 
   @Override
-  public String[] getFileTimestamps() {
+  public long[] getFileTimestamps() {
     return base.getFileTimestamps();
   }
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Tue Mar  8 05:56:27 2011
@@ -124,7 +124,7 @@ class ChainReduceContextImpl<KEYIN, VALU
   }
 
   @Override
-  public String[] getArchiveTimestamps() {
+  public long[] getArchiveTimestamps() {
     return base.getArchiveTimestamps();
   }
 
@@ -155,7 +155,7 @@ class ChainReduceContextImpl<KEYIN, VALU
   }
 
   @Override
-  public String[] getFileTimestamps() {
+  public long[] getFileTimestamps() {
     return base.getFileTimestamps();
   }
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Tue Mar  8 05:56:27 2011
@@ -133,7 +133,7 @@ public class WrappedMapper<KEYIN, VALUEI
     }
 
     @Override
-    public String[] getArchiveTimestamps() {
+    public long[] getArchiveTimestamps() {
       return mapContext.getArchiveTimestamps();
     }
 
@@ -164,7 +164,7 @@ public class WrappedMapper<KEYIN, VALUEI
     }
 
     @Override
-    public String[] getFileTimestamps() {
+    public long[] getFileTimestamps() {
       return mapContext.getFileTimestamps();
     }
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Tue Mar  8 05:56:27 2011
@@ -126,7 +126,7 @@ public class WrappedReducer<KEYIN, VALUE
     }
 
     @Override
-    public String[] getArchiveTimestamps() {
+    public long[] getArchiveTimestamps() {
       return reduceContext.getArchiveTimestamps();
     }
 
@@ -157,7 +157,7 @@ public class WrappedReducer<KEYIN, VALUE
     }
 
     @Override
-    public String[] getFileTimestamps() {
+    public long[] getFileTimestamps() {
       return reduceContext.getFileTimestamps();
     }
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Tue Mar  8 05:56:27 2011
@@ -182,7 +182,7 @@ public class TokenCache {
    * @throws IOException
    */
   @InterfaceAudience.Private
-  public static Credentials loadTokens(String jobTokenFile, JobConf conf) 
+  public static Credentials loadTokens(String jobTokenFile, Configuration conf) 
   throws IOException {
     Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
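
Since loadTokens now takes a plain Configuration, callers no longer have to
wrap their conf in a JobConf. A hedged usage sketch; the token file path is
illustrative:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.security.TokenCache;
    import org.apache.hadoop.security.Credentials;

    public class LoadTokensSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // loadTokens prepends "file:///" itself, so a bare local path is fine.
        Credentials creds = TokenCache.loadTokens("/tmp/jobToken", conf);
        System.out.println(creds.numberOfTokens() + " tokens loaded");
      }
    }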
 

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Tue Mar  8 05:56:27 2011
@@ -33,7 +33,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskTracker;
-import org.apache.hadoop.mapred.TaskController.InitializationContext;
 import org.apache.hadoop.mapreduce.JobID;
 
 @InterfaceAudience.Private
@@ -44,19 +43,16 @@ public class Localizer {
 
   private FileSystem fs;
   private String[] localDirs;
-  private TaskController taskController;
 
   /**
    * Create a Localizer instance
    * 
    * @param fileSys
    * @param lDirs
-   * @param tc
    */
-  public Localizer(FileSystem fileSys, String[] lDirs, TaskController tc) {
+  public Localizer(FileSystem fileSys, String[] lDirs) {
     fs = fileSys;
     localDirs = lDirs;
-    taskController = tc;
   }
 
   @InterfaceAudience.Private
@@ -264,13 +260,6 @@ public class Localizer {
                 + user);
       }
 
-      // Now, run the task-controller specific code to initialize the
-      // user-directories.
-      InitializationContext context = new InitializationContext();
-      context.user = user;
-      context.workDir = null;
-      taskController.initializeUser(context);
-
       // Localization of the user is done
       localizedUser.set(true);
     }
@@ -283,7 +272,7 @@ public class Localizer {
    * <br>
    * Here, we set 700 permissions on the job directories created on all disks.
    * This we do so as to avoid any misuse by other users till the time
-   * {@link TaskController#initializeJob(JobInitializationContext)} is run at a
+   * {@link TaskController#initializeJob} is run at a
    * later time to set proper private permissions on the job directories. <br>
    * 
    * @param user
@@ -331,16 +320,15 @@ public class Localizer {
    * @param user
    * @param jobId
    * @param attemptId
-   * @param isCleanupAttempt
    * @throws IOException
    */
   public void initializeAttemptDirs(String user, String jobId,
-      String attemptId, boolean isCleanupAttempt)
+      String attemptId)
       throws IOException {
 
     boolean initStatus = false;
     String attemptDirPath =
-        TaskTracker.getLocalTaskDir(user, jobId, attemptId, isCleanupAttempt);
+        TaskTracker.getLocalTaskDir(user, jobId, attemptId);
 
     for (String localDir : localDirs) {
       Path localAttemptDir = new Path(localDir, attemptDirPath);
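
Localizer now works without a TaskController, and cleanup attempts no longer
get a separate directory flag. A small sketch of the updated calls; the
local-dir key, user, job, and attempt IDs are illustrative:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;

    public class LocalizerSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem localFs = FileSystem.getLocal(conf);
        String[] localDirs =
            conf.getStrings("mapred.local.dir", "/tmp/mapred/local");
        // The TaskController argument is gone from the constructor.
        Localizer localizer = new Localizer(localFs, localDirs);
        // No isCleanupAttempt boolean any more.
        localizer.initializeAttemptDirs("alice", "job_201103080556_0001",
            "attempt_201103080556_0001_m_000000_0");
      }
    }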

Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/task/JobContextImpl.java Tue Mar  8 05:56:27 2011
@@ -334,7 +334,8 @@ public class JobContextImpl implements J
    * @return a string array of timestamps 
    * @throws IOException
    */
-  public String[] getArchiveTimestamps() {
+  @Override
+  public long[] getArchiveTimestamps() {
     return DistributedCache.getArchiveTimestamps(conf);
   }
 
@@ -344,7 +345,8 @@ public class JobContextImpl implements J
    * @return a string array of timestamps 
    * @throws IOException
    */
-  public String[] getFileTimestamps() {
+  @Override
+  public long[] getFileTimestamps() {
     return DistributedCache.getFileTimestamps(conf);
   }
 

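The type change means new-API callers read the cache timestamps as longs
directly instead of parsing strings. A short sketch (not part of this patch):

    import org.apache.hadoop.mapreduce.JobContext;

    public class TimestampSketch {
      // Before: String[] ts = context.getArchiveTimestamps();
      //         long t = Long.parseLong(ts[0]);
      // After: modification times arrive as long[].
      static long newestArchiveTimestamp(JobContext context) {
        long newest = 0L;
        for (long t : context.getArchiveTimestamps()) {
          newest = Math.max(newest, t);
        }
        return newest;
      }
    }
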
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/MRAsyncDiskService.java Tue Mar  8 05:56:27 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskController;
 import org.apache.hadoop.util.AsyncDiskService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -57,6 +58,8 @@ public class MRAsyncDiskService {
   
   AsyncDiskService asyncDiskService;
   
+  TaskController taskController;
+  
   public static final String TOBEDELETED = "toBeDeleted";
   
   /**
@@ -64,14 +67,18 @@ public class MRAsyncDiskService {
    * root directories).
    * 
   * The AsyncDiskService uses one ThreadPool per volume to do the async disk
-   * operations.
+   * operations. A {@link TaskController} is passed in and is used to
+   * perform the deletes.
    * 
    * @param localFileSystem The localFileSystem used for deletions.
+   * @param taskController The taskController that should be used for the 
+   * delete operations
    * @param nonCanonicalVols The roots of the file system volumes, which may
   * be absolute paths, or paths relative to the ${user.dir} system property
    * ("cwd").
    */
-  public MRAsyncDiskService(FileSystem localFileSystem,
+  public MRAsyncDiskService(FileSystem localFileSystem, 
+      TaskController taskController,
       String... nonCanonicalVols) throws IOException {
     
     this.localFileSystem = localFileSystem;
@@ -84,6 +91,8 @@ public class MRAsyncDiskService {
     
     asyncDiskService = new AsyncDiskService(this.volumes);
     
+    this.taskController = taskController;
+    
     // Create one ThreadPool per volume
     for (int v = 0 ; v < volumes.length; v++) {
       // Create the root for file deletion
@@ -109,13 +118,31 @@ public class MRAsyncDiskService {
               + " because it's outside of " + volumes[v]);
         }
         DeleteTask task = new DeleteTask(volumes[v], absoluteFilename,
-            relative);
+            relative, files[f].getOwner());
         execute(volumes[v], task);
       }
     }
   }
   
   /**
+   * Create an AsyncDiskService with a set of volumes (specified by their
+   * root directories).
+   * 
+   * The AsyncDiskService uses one ThreadPool per volume to do the async disk
+   * operations.
+   * 
+   * @param localFileSystem The localFileSystem used for deletions.
+   * @param nonCanonicalVols The roots of the file system volumes, which may
+   * be absolute paths, or paths relative to the ${user.dir} system property
+   * ("cwd").
+   */
+  public MRAsyncDiskService(FileSystem localFileSystem, 
+      String... nonCanonicalVols) throws IOException {
+    this(localFileSystem, null, nonCanonicalVols);
+  }
+  
+  
+  /**
    * Initialize MRAsyncDiskService based on conf.
    * @param conf  local file system and local dirs will be read from conf 
    */
@@ -174,6 +201,8 @@ public class MRAsyncDiskService {
     String originalPath;
     /** The file name after the move */
     String pathToBeDeleted;
+    /** The owner of the file */
+    String owner;
     
     /**
      * Delete a file/directory (recursively if needed).
@@ -181,11 +210,14 @@ public class MRAsyncDiskService {
      * @param originalPath  The original name, relative to volume root.
      * @param pathToBeDeleted  The name after the move, relative to volume root,
      *                         containing TOBEDELETED.
+     * @param owner         The owner of the file
      */
-    DeleteTask(String volume, String originalPath, String pathToBeDeleted) {
+    DeleteTask(String volume, String originalPath, String pathToBeDeleted, 
+        String owner) {
       this.volume = volume;
       this.originalPath = originalPath;
       this.pathToBeDeleted = pathToBeDeleted;
+      this.owner = owner;
     }
     
     @Override
@@ -201,7 +233,12 @@ public class MRAsyncDiskService {
       Exception e = null;
       try {
         Path absolutePathToBeDeleted = new Path(volume, pathToBeDeleted);
-        success = localFileSystem.delete(absolutePathToBeDeleted, true);
+        if (taskController != null && owner != null) {
+          taskController.deleteAsUser(owner, 
+                                      absolutePathToBeDeleted.toString());
+          // deleteAsUser throws on failure; reaching here means the
+          // delete succeeded.
+          success = true;
+        } else {
+          success = localFileSystem.delete(absolutePathToBeDeleted, true);
+        }
       } catch (Exception ex) {
         e = ex;
       }
@@ -262,8 +299,9 @@ public class MRAsyncDiskService {
       // Return false in case that the file is not found.  
       return false;
     }
-
-    DeleteTask task = new DeleteTask(volume, pathName, newPathName);
+    FileStatus status = localFileSystem.getFileStatus(target);
+    DeleteTask task = new DeleteTask(volume, pathName, newPathName, 
+                                     status.getOwner());
     execute(volume, task);
     return true;
   }
@@ -371,5 +409,31 @@ public class MRAsyncDiskService {
     throw new IOException("Cannot delete " + absolutePathName
         + " because it's outside of all volumes.");
   }
-  
+  /**
+   * Move the path name to a temporary location and then delete it.
+   * 
+   * Note that if no volume contains the path, an IOException is thrown and
+   * the path is left where it is.
+   *  
+   * This function returns when the move is done, but not necessarily when
+   * all deletions are done. This is usually good enough because applications 
+   * won't see the path under the old name anyway after the move. 
+   * 
+   * @param volume              The disk volume expected to contain the path
+   * @param absolutePathName    The path name from root "/"
+   * @throws IOException        If the path is outside the given volume
+   * @return   false if we are unable to move the path name (e.g. it no
+   *           longer exists)
+   */
+  public boolean moveAndDeleteAbsolutePath(String volume, 
+                                           String absolutePathName)
+  throws IOException {
+    String relative = getRelativePathName(absolutePathName, volume);
+    if (relative == null) {
+      // This should never happen
+      throw new IOException("Cannot delete " + absolutePathName
+          + " because it's outside of " + volume);
+    }
+    return moveAndDeleteRelativePath(volume, relative);
+  }
+
 }

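A wiring sketch for the owner-aware deletes (not part of this patch; the volume
paths are illustrative and the no-arg DefaultTaskController construction is an
assumption):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapred.DefaultTaskController;
    import org.apache.hadoop.mapred.TaskController;
    import org.apache.hadoop.mapreduce.util.MRAsyncDiskService;

    public class AsyncDeleteSketch {
      public static void main(String[] args) throws Exception {
        FileSystem localFs = FileSystem.getLocal(new Configuration());
        TaskController controller = new DefaultTaskController();

        // With a TaskController, each DeleteTask records the file owner and
        // routes the delete through deleteAsUser(owner, path). The
        // two-argument constructor passes null and falls back to
        // localFileSystem.delete(), as before.
        MRAsyncDiskService service = new MRAsyncDiskService(localFs,
            controller, "/grid/0/mapred/local", "/grid/1/mapred/local");

        // Moves the path under <volume>/toBeDeleted, then deletes it
        // asynchronously on that volume's thread pool.
        service.moveAndDeleteAbsolutePath("/grid/0/mapred/local",
            "/grid/0/mapred/local/taskTracker/alice/jobcache/job_0001");
      }
    }
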
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/util/ProcfsBasedProcessTree.java Tue Mar  8 05:56:27 2011
@@ -35,15 +35,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.DefaultTaskController;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.mapred.TaskController.Signal;
 
 /**
  * A Proc file-system based ProcessTree. Works only on Linux.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public class ProcfsBasedProcessTree extends ProcessTree {
+public class ProcfsBasedProcessTree {
 
   static final Log LOG = LogFactory
       .getLog(ProcfsBasedProcessTree.class);
@@ -91,20 +94,19 @@ public class ProcfsBasedProcessTree exte
   // to a test directory.
   private String procfsDir;
   
-  private Integer pid = -1;
+  private final Integer pid;
   private Long cpuTime = 0L;
   private boolean setsidUsed = false;
-  private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
 
-  private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
+  private Map<Integer, ProcessInfo> processTree =
+    new HashMap<Integer, ProcessInfo>();
 
   public ProcfsBasedProcessTree(String pid) {
-    this(pid, false, DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+    this(pid, false);
   }
 
-  public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
-                                long sigkillInterval) {
-    this(pid, setsidUsed, sigkillInterval, PROCFS);
+  public ProcfsBasedProcessTree(String pid, boolean setsidUsed) {
+    this(pid, setsidUsed, PROCFS);
   }
 
   /**
@@ -115,29 +117,14 @@ public class ProcfsBasedProcessTree exte
    * 
    * @param pid root of the process tree
    * @param setsidUsed true, if setsid was used for the root pid
-   * @param sigkillInterval how long to wait between a SIGTERM and SIGKILL 
-   *                        when killing a process tree
    * @param procfsDir the root of a proc file system - only used for testing. 
    */
   public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
-                                long sigkillInterval, String procfsDir) {
+      String procfsDir) {
     this.pid = getValidPID(pid);
     this.setsidUsed = setsidUsed;
-    sleeptimeBeforeSigkill = sigkillInterval;
     this.procfsDir = procfsDir;
   }
-  
-  /**
-   * Sets SIGKILL interval
-   * @deprecated Use {@link ProcfsBasedProcessTree#ProcfsBasedProcessTree(
-   *                  String, boolean, long)} instead
-   * @param interval The time to wait before sending SIGKILL
-   *                 after sending SIGTERM
-   */
-  @Deprecated
-  public void setSigKillInterval(long interval) {
-    sleeptimeBeforeSigkill = interval;
-  }
 
   /**
    * Checks if the ProcfsBasedProcessTree is available on this system.
@@ -238,112 +225,49 @@ public class ProcfsBasedProcessTree exte
 
  /**
   * Is the given process alive?
-   * 
   * @return true if the process is alive, false otherwise.
   */
-  public boolean isAlive() {
-    if (pid == -1) {
-      return false;
-    } else {
-      return isAlive(pid.toString());
-    }
+  boolean isAlive(int pid, TaskController taskController) {
+    try {
+      return taskController.signalTask(null, pid, Signal.NULL);
+    } catch (IOException ignored) { }
+    return false;
+  }
+
+  boolean isAlive(TaskController taskController) {
+    return isAlive(pid, taskController);
   }
 
   /**
    * Is any of the subprocesses in the process-tree alive?
-   * 
    * @return true if any of the processes in the process-tree is
    *           alive, false otherwise.
    */
-  public boolean isAnyProcessInTreeAlive() {
+  boolean isAnyProcessInTreeAlive(TaskController taskController) {
     for (Integer pId : processTree.keySet()) {
-      if (isAlive(pId.toString())) {
+      if (isAlive(pId, taskController)) {
         return true;
       }
     }
     return false;
   }
 
+
  /** Verify that this process id is the same as its process group id.
   * @return true if the pid matches its process group id, or if the
   *         process group leader has already exited
   */
-  static boolean checkPidPgrpidForMatch(String pidStr, String procfsDir) {
-    Integer pId = Integer.parseInt(pidStr);
-    // Get information for this process
-    ProcessInfo pInfo = new ProcessInfo(pId);
-    pInfo = constructProcessInfo(pInfo, procfsDir);
-    if (pInfo == null) {
-      // process group leader may have finished execution, but we still need to
-      // kill the subProcesses in the process group.
-      return true;
-    }
-
-    //make sure that pId and its pgrpId match
-    if (!pInfo.getPgrpId().equals(pId)) {
-      LOG.warn("Unexpected: Process with PID " + pId +
-               " is not a process group leader.");
-      return false;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(pId + " is a process group leader, as expected.");
-    }
-    return true;
+  public boolean checkPidPgrpidForMatch() {
+    return checkPidPgrpidForMatch(pid, PROCFS);
   }
 
-  /** Make sure that the given pid is a process group leader and then
-   * destroy the process group.
-   * @param pgrpId   Process group id of to-be-killed-processes
-   * @param interval The time to wait before sending SIGKILL
-   *                 after sending SIGTERM
-   * @param inBackground Process is to be killed in the back ground with
-   *                     a separate thread
-   */
-  public static void assertAndDestroyProcessGroup(String pgrpId, long interval,
-                       boolean inBackground)
-         throws IOException {
-    // Make sure that the pid given is a process group leader
-    if (!checkPidPgrpidForMatch(pgrpId, PROCFS)) {
-      throw new IOException("Process with PID " + pgrpId  +
-                          " is not a process group leader.");
-    }
-    destroyProcessGroup(pgrpId, interval, inBackground);
-  }
-
-  /**
-   * Destroy the process-tree.
-   */
-  public void destroy() {
-    destroy(true);
-  }
-  
-  /**
-   * Destroy the process-tree.
-   * @param inBackground Process is to be killed in the back ground with
-   *                     a separate thread
-   */
-  public void destroy(boolean inBackground) {
-    LOG.debug("Killing ProcfsBasedProcessTree of " + pid);
-    if (pid == -1) {
-      return;
-    }
-    if (isAlive(pid.toString())) {
-      if (isSetsidAvailable && setsidUsed) {
-        // In this case, we know that pid got created using setsid. So kill the
-        // whole processGroup.
-        try {
-          assertAndDestroyProcessGroup(pid.toString(), sleeptimeBeforeSigkill,
-                              inBackground);
-        } catch (IOException e) {
-          LOG.warn(StringUtils.stringifyException(e));
-        }
-      }
-      else {
-        //TODO: Destroy all the processes in the subtree in this case also.
-        // For the time being, killing only the root process.
-        destroyProcess(pid.toString(), sleeptimeBeforeSigkill, inBackground);
-      }
-    }
+  static boolean checkPidPgrpidForMatch(int _pid, String procfs) {
+    // Get information for this process
+    ProcessInfo pInfo = new ProcessInfo(_pid);
+    pInfo = constructProcessInfo(pInfo, procfs);
+    // null if process group leader finished execution; issue no warning
+    // make sure that pid and its pgrpId match
+    return pInfo == null || pInfo.getPgrpId().equals(_pid);
   }
 
   private static final String PROCESSTREE_DUMP_FORMAT =

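Liveness checks now go through TaskController.signalTask with Signal.NULL (the
moral equivalent of kill -0) rather than signalling from inside the JVM. A
sketch of the call pattern (not part of this patch; it sits in the same package
because the new isAlive variants are package-private, and the no-arg
DefaultTaskController construction is an assumption):

    package org.apache.hadoop.mapreduce.util;

    import org.apache.hadoop.mapred.DefaultTaskController;
    import org.apache.hadoop.mapred.TaskController;

    public class LivenessSketch {
      public static void main(String[] args) {
        TaskController controller = new DefaultTaskController();
        ProcfsBasedProcessTree tree =
            new ProcfsBasedProcessTree("12345", true);

        // Root liveness: a NULL signal that succeeds means the pid exists.
        boolean rootAlive = tree.isAlive(controller);

        // Subprocess liveness walks the pid map built from /proc; refresh
        // the tree first (getProcessTree() in prior versions) or the map
        // is empty.
        boolean anyAlive = tree.isAnyProcessInTreeAlive(controller);

        System.out.println("root=" + rootAlive + ", any=" + anyAlive);
      }
    }
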
Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java Tue Mar  8 05:56:27 2011
@@ -38,14 +38,20 @@ public class ProcfsBasedProcessTree exte
     super(pid);
   }
 
+  /**
+   * @param sigkillInterval Has no effect
+   */
   public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
       long sigkillInterval) {
-    super(pid, setsidUsed, sigkillInterval);
+    super(pid, setsidUsed);
   }
 
+  /**
+   * @param sigkillInterval Has no effect
+   */
   public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
       long sigkillInterval, String procfsDir) {
-    super(pid, setsidUsed, sigkillInterval, procfsDir);
+    super(pid, setsidUsed, procfsDir);
   }
 
   public ProcfsBasedProcessTree getProcessTree() {

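A compatibility note for the shim above (not part of this patch; the pid is
illustrative): the old signatures still compile, but the interval argument is
dead, since destroying trees moved out of this class and into the
TaskController.

    import org.apache.hadoop.util.ProcfsBasedProcessTree;

    public class ShimSketch {
      public static void main(String[] args) {
        // The 5000ms SIGTERM-to-SIGKILL interval is silently ignored.
        ProcfsBasedProcessTree tree =
            new ProcfsBasedProcessTree("12345", true, 5000L);
        // isAvailable() is the class's pre-existing Linux/procfs check.
        System.out.println("procfs available: "
            + ProcfsBasedProcessTree.isAvailable());
      }
    }
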
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/ClusterWithLinuxTaskController.java Tue Mar  8 05:56:27 2011
@@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -80,41 +81,23 @@ public class ClusterWithLinuxTaskControl
         + "/task-controller";
     
     @Override
-    public void setup() throws IOException {
+    public void setup(LocalDirAllocator allocator) throws IOException {
       getConf().set(TTConfig.TT_GROUP, taskTrackerSpecialGroup);
 
       // write configuration file
       configurationFile = createTaskControllerConf(System
           .getProperty(TASKCONTROLLER_PATH), getConf());
-      super.setup();
+      super.setup(allocator);
     }
 
     @Override
-    protected String getTaskControllerExecutablePath() {
+    protected String getTaskControllerExecutablePath(Configuration conf) {
       return taskControllerExePath;
     }
 
     void setTaskControllerExe(String execPath) {
       this.taskControllerExePath = execPath;
     }
-
-    volatile static int attemptedSigQuits = 0;
-    volatile static int failedSigQuits = 0;
-
-    /** Work like LinuxTaskController, but also count the number of
-      * attempted and failed SIGQUIT sends via the task-controller
-      * executable.
-      */
-    @Override
-    void dumpTaskStack(TaskControllerContext context) {
-      attemptedSigQuits++;
-      try {
-        signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
-      } catch (Exception e) {
-        LOG.warn("Execution sending SIGQUIT: " + StringUtils.stringifyException(e));
-        failedSigQuits++;
-      }
-    }
   }
 
   // cluster instances which sub classes can use
@@ -275,7 +258,7 @@ public class ClusterWithLinuxTaskControl
       if (ugi.indexOf(",") > 1) {
         return true;
       }
-      LOG.info("Invalid taskcontroller-ugi : " + ugi); 
+      LOG.info("Invalid taskcontroller-ugi (requires \"user,group\"): " + ugi); 
       return false;
     }
     LOG.info("Invalid taskcontroller-ugi : " + ugi);

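For custom controllers tracking the TaskController API change, a sketch of the
new override points (not part of this patch; it lives in
org.apache.hadoop.mapred for visibility, and the executable path is
illustrative):

    package org.apache.hadoop.mapred;

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.LocalDirAllocator;

    public class CustomLinuxTaskController extends LinuxTaskController {
      @Override
      public void setup(LocalDirAllocator allocator) throws IOException {
        // setup() now receives the tracker's LocalDirAllocator instead of
        // building its own; do controller-specific prep, then delegate.
        super.setup(allocator);
      }

      @Override
      protected String getTaskControllerExecutablePath(Configuration conf) {
        // Path resolution now sees the live Configuration.
        return "/usr/local/bin/task-controller";
      }
    }
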
Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestDebugScript.java Tue Mar  8 05:56:27 2011
@@ -35,7 +35,9 @@ import static org.junit.Assert.*;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.Ignore;
 
+@Ignore("The debug script is broken in the current build.")
 public class TestDebugScript {
 
   // base directory which is used by the debug script

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobExecutionAsDifferentUser.java Tue Mar  8 05:56:27 2011
@@ -23,7 +23,6 @@ import java.security.PrivilegedException
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.SleepJob;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -108,33 +107,4 @@ public class TestJobExecutionAsDifferent
     });
   }
 
-  /** Ensure that SIGQUIT can be properly sent by the LinuxTaskController
-   * if a task times out.
-   */
-  public void testTimeoutStackTrace() throws Exception {
-    if (!shouldRun()) {
-      return;
-    }
-
-    // Run a job that should timeout and trigger a SIGQUIT.
-    startCluster();
-    jobOwner.doAs(new PrivilegedExceptionAction<Object>() {
-      public Object run() throws Exception {
-        JobConf conf = getClusterConf();
-        conf.setInt(JobContext.TASK_TIMEOUT, 10000);
-        conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
-        SleepJob sleepJob = new SleepJob();
-        sleepJob.setConf(conf);
-        Job job = sleepJob.createJob(1, 0, 30000, 1, 0, 0);
-        job.setMaxMapAttempts(1);
-        int prevNumSigQuits = MyLinuxTaskController.attemptedSigQuits;
-        job.waitForCompletion(true);
-        assertTrue("Did not detect a new SIGQUIT!",
-            prevNumSigQuits < MyLinuxTaskController.attemptedSigQuits);
-        assertEquals("A SIGQUIT attempt failed!", 0,
-            MyLinuxTaskController.failedSigQuits);
-        return null;
-      }
-    });
-  }
 }

Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java?rev=1079211&r1=1079210&r2=1079211&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobKillAndFail.java Tue Mar  8 05:56:27 2011
@@ -18,20 +18,12 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.BufferedReader;
-import java.io.FileInputStream;
 import java.io.File;
-import java.io.InputStreamReader;
 import java.io.IOException;
 
 import junit.framework.TestCase;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.SleepJob;
 
 /**
  * A JUnit test to test Kill Job & Fail Job functionality with local file
@@ -39,96 +31,38 @@ import org.apache.hadoop.mapreduce.Sleep
  */
 public class TestJobKillAndFail extends TestCase {
 
-  static final Log LOG = LogFactory.getLog(TestJobKillAndFail.class);
-
   private static String TEST_ROOT_DIR = new File(System.getProperty(
       "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
 
-  /**
-   * TaskController instance that just sets a flag when a stack dump
-   * is performed in a child thread.
-   */
-  static class MockStackDumpTaskController extends DefaultTaskController {
-
-    static volatile int numStackDumps = 0;
-
-    static final Log LOG = LogFactory.getLog(TestJobKillAndFail.class);
-
-    public MockStackDumpTaskController() {
-      LOG.info("Instantiated MockStackDumpTC");
-    }
-
-    @Override
-    void dumpTaskStack(TaskControllerContext context) {
-      LOG.info("Got stack-dump request in TaskController");
-      MockStackDumpTaskController.numStackDumps++;
-      super.dumpTaskStack(context);
-    }
-
-  }
-
-  /** If a task was killed, then dumpTaskStack() should have been
-    * called. Test whether or not the counter was incremented
-    * and succeed/fail based on this. */
-  private void checkForStackDump(boolean expectDump, int lastNumDumps) {
-    int curNumDumps = MockStackDumpTaskController.numStackDumps;
-
-    LOG.info("curNumDumps=" + curNumDumps + "; lastNumDumps=" + lastNumDumps
-        + "; expect=" + expectDump);
-
-    if (expectDump) {
-      assertTrue("No stack dump recorded!", lastNumDumps < curNumDumps);
-    } else {
-      assertTrue("Stack dump happened anyway!", lastNumDumps == curNumDumps);
-    }
-  }
-
-  public void testJobFailAndKill() throws Exception {
+  public void testJobFailAndKill() throws IOException {
     MiniMRCluster mr = null;
     try {
       JobConf jtConf = new JobConf();
       jtConf.set("mapred.jobtracker.instrumentation", 
           JTInstrumentation.class.getName());
-      jtConf.set("mapreduce.tasktracker.taskcontroller",
-          MockStackDumpTaskController.class.getName());
       mr = new MiniMRCluster(2, "file:///", 3, null, null, jtConf);
       JTInstrumentation instr = (JTInstrumentation) 
         mr.getJobTrackerRunner().getJobTracker().getInstrumentation();
 
       // run the TCs
       JobConf conf = mr.createJobConf();
-      conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
       
       Path inDir = new Path(TEST_ROOT_DIR + "/failkilljob/input");
       Path outDir = new Path(TEST_ROOT_DIR + "/failkilljob/output");
-      RunningJob runningJob = UtilsForTests.runJobFail(conf, inDir, outDir);
+      RunningJob job = UtilsForTests.runJobFail(conf, inDir, outDir);
       // Checking that the Job got failed
-      assertEquals(runningJob.getJobState(), JobStatus.FAILED);
+      assertEquals(JobStatus.FAILED, job.getJobState());
       assertTrue(instr.verifyJob());
       assertEquals(1, instr.failed);
       instr.reset();
 
-      int prevNumDumps = MockStackDumpTaskController.numStackDumps;
-      runningJob = UtilsForTests.runJobKill(conf, inDir, outDir);
+      job = UtilsForTests.runJobKill(conf, inDir, outDir);
       // Checking that the Job got killed
-      assertTrue(runningJob.isComplete());
-      assertEquals(runningJob.getJobState(), JobStatus.KILLED);
+      assertTrue(job.isComplete());
+      assertEquals(JobStatus.KILLED, job.getJobState());
       assertTrue(instr.verifyJob());
       assertEquals(1, instr.killed);
-      // check that job kill does not put a stacktrace in task logs.
-      checkForStackDump(false, prevNumDumps);
-
-      // Test that a task that times out does have a stack trace
-      conf = mr.createJobConf();
-      conf.setInt(JobContext.TASK_TIMEOUT, 10000);
-      conf.setInt(Job.COMPLETION_POLL_INTERVAL_KEY, 50);
-      SleepJob sleepJob = new SleepJob();
-      sleepJob.setConf(conf);
-      Job job = sleepJob.createJob(1, 0, 30000, 1,0, 0);
-      job.setMaxMapAttempts(1);
-      prevNumDumps = MockStackDumpTaskController.numStackDumps;
-      job.waitForCompletion(true);
-      checkForStackDump(true, prevNumDumps);
     } finally {
       if (mr != null) {
         mr.shutdown();