You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/07/11 10:16:29 UTC

svn commit: r420760 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/conf/ src/java/org/apache/hadoop/mapred/

Author: cutting
Date: Tue Jul 11 01:16:28 2006
New Revision: 420760

URL: http://svn.apache.org/viewvc?rev=420760&view=rev
Log:
HADOOP-313.  Permit task state to be saved so that single tasks may be manually re-executed when debugging.  Contributed by Owen.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Jul 11 01:16:28 2006
@@ -7,6 +7,9 @@
     /bin/bash, for better portability.
     (Jean-Baptiste Quenot via cutting)
 
+ 2. HADOOP-313.  Permit task state to be saved so that single tasks
+    may be manually re-executed when debugging.  (omalley via cutting)
+
 
 Release 0.4.0 - 2006-06-28
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/conf/Configuration.java Tue Jul 11 01:16:28 2006
@@ -494,6 +494,14 @@
     }
   }
 
+  /**
+   * Set the class loader that will be used to load the various objects.
+   * @param classLoader the new class loader
+   */
+  public void setClassLoader(ClassLoader classLoader) {
+    this.classLoader = classLoader;
+  }
+  
   public String toString() {
     StringBuffer sb = new StringBuffer();
     sb.append("Configuration: ");

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=420760&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Tue Jul 11 01:16:28 2006
@@ -0,0 +1,162 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.SequenceFile;
+
+public class IsolationRunner {
+  private static final Log LOG = 
+    LogFactory.getLog(IsolationRunner.class.getName());
+
+  private static class FakeUmbilical implements TaskUmbilicalProtocol {
+
+    public void done(String taskid) throws IOException {
+      LOG.info("Task " + taskid + " reporting done.");
+    }
+
+    public void fsError(String message) throws IOException {
+      LOG.info("Task reporting file system error: " + message);
+    }
+
+    public Task getTask(String taskid) throws IOException {
+      return null;
+    }
+
+    public boolean ping(String taskid) throws IOException {
+      return true;
+    }
+
+    public void progress(String taskid, float progress, String state
+                         ) throws IOException {
+      StringBuffer buf = new StringBuffer("Task ");
+      buf.append(taskid);
+      buf.append(" making progress to ");
+      buf.append(progress);
+      if (state != null) {
+        buf.append(" and state of ");
+        buf.append(state);
+      }
+      LOG.info(buf.toString());
+    }
+
+    public void reportDiagnosticInfo(String taskid, String trace) throws IOException {
+      LOG.info("Task " + taskid + " has problem " + trace);
+    }
+    
+  }
+  
+  /**
+   * Build a class loader over the job's files in the task working directory:
+   * every entry found under <code>lib</code>, the <code>classes</code>
+   * directory, and the working directory itself.  If the job has no jar the
+   * returned loader has an empty search path.
+   * @param conf the job configuration; only consulted for whether a jar is set
+   * @param workDir the task's working directory
+   * @return a class loader over the entries described above
+   * @throws IOException if a classpath entry cannot be converted to a URL
+   */
+  private static ClassLoader makeClassLoader(JobConf conf, 
+                                             File workDir) throws IOException {
+    List cp = new ArrayList();
+
+    String jar = conf.getJar();
+    if (jar != null) {                      // job has a jar: use its files under workDir
+      File[] libs = new File(workDir, "lib").listFiles();
+      if (libs != null) {
+        for (int i = 0; i < libs.length; i++) {
+          cp.add(new URL("file:" + libs[i].toString()));
+        }
+      }
+      // File.toString() drops a trailing separator, and URLClassLoader treats
+      // any URL that does not end in "/" as a jar file, so the slash must be
+      // appended explicitly for directory entries.
+      cp.add(new URL("file:" + new File(workDir, "classes").toString() + "/"));
+      cp.add(new URL("file:" + workDir.toString() + "/"));
+    }
+    return new URLClassLoader((URL[]) cp.toArray(new URL[cp.size()]));
+  }
+  
+  /**
+   * Create empty sequence files for any of the map outputs that we don't have.
+   * @param fs the filesystem to create the files in
+   * @param taskId the id of the task whose map input files are checked
+   * @param numMaps the number of map outputs to check for
+   * @param conf the jobconf
+   * @throws IOException if something goes wrong writing
+   */
+  private static void fillInMissingMapOutputs(FileSystem fs, 
+                                              String taskId,
+                                              int numMaps,
+                                              JobConf conf) throws IOException {
+    Class keyClass = conf.getMapOutputKeyClass();
+    Class valueClass = conf.getMapOutputValueClass();
+    MapOutputFile namer = new MapOutputFile();
+    namer.setConf(conf);
+    for(int i=0; i<numMaps; i++) {
+      Path f = namer.getInputFile(i, taskId);
+      if(! fs.exists(f)) {
+        LOG.info("Create missing input: " + f);
+        SequenceFile.Writer out = new SequenceFile.Writer(fs, f, keyClass,
+                                                          valueClass);
+        out.close();
+      }
+    }    
+  }
+  
+  /**
+   * Run a single task
+   * @param args the first argument is the path to the task's job.xml file
+   */
+  public static void main(String[] args) throws IOException {
+    if (args.length != 1) {
+      System.out.println("Usage: IsolationRunner <path>/job.xml");
+      System.exit(1);
+    }
+    File jobFilename = new File(args[0]);
+    if (!jobFilename.exists() || !jobFilename.isFile()) {
+      System.out.println(jobFilename + " is not a valid job file.");
+      System.exit(1);
+    }
+    JobConf conf = new JobConf(new Path(jobFilename.toString()));
+    String taskId = conf.get("mapred.task.id");
+    boolean isMap = conf.getBoolean("mapred.task.is.map", true);
+    String jobId = conf.get("mapred.job.id");
+    int partition = conf.getInt("mapred.task.partition", 0);
+    
+    // setup the local and user working directories
+    FileSystem local = FileSystem.getNamed("local", conf);
+    File workDirName = new File(jobFilename.getParent(), "work");
+    local.setWorkingDirectory(new Path(workDirName.toString()));
+    FileSystem.get(conf).setWorkingDirectory(conf.getWorkingDirectory());
+    
+    // set up a classloader with the right classpath
+    ClassLoader classLoader = makeClassLoader(conf, workDirName);
+    Thread.currentThread().setContextClassLoader(classLoader);
+    conf.setClassLoader(classLoader);
+    
+    Task task;
+    if (isMap) {
+      FileSplit split = new FileSplit(new Path(conf.get("map.input.file")),
+                                      conf.getLong("map.input.start", 0),
+                                      conf.getLong("map.input.length", 0));
+      task = new MapTask(jobId, jobFilename.toString(), taskId, partition, 
+                         split);
+    } else {
+      int numMaps = conf.getNumMapTasks();
+      fillInMissingMapOutputs(local, taskId, numMaps, conf);
+      task = new ReduceTask(jobId, jobFilename.toString(), taskId,
+                            partition, numMaps);
+    }
+    task.setConf(conf);
+    task.run(conf, new FakeUmbilical());
+  }
+
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Tue Jul 11 01:16:28 2006
@@ -194,6 +194,22 @@
   public void setUser(String user) {
     set("user.name", user);
   }
+
+  /**
+   * Set whether the framework should keep the intermediate files for 
+   * failed tasks.
+   */
+  public void setKeepFailedTaskFiles(boolean keep) {
+    setBoolean("keep.failed.task.files", keep);
+  }
+  
+  /**
+   * Should the temporary files for failed tasks be kept?
+   * @return should the files be kept?
+   */
+  public boolean getKeepFailedTaskFiles() {
+    return getBoolean("keep.failed.task.files", false);
+  }
   
   /**
    * Set the current working directory for the default file system

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Jul 11 01:16:28 2006
@@ -78,13 +78,14 @@
         // split input into minimum number of splits
         FileSplit[] splits;
         splits = job.getInputFormat().getSplits(fs, job, 1);
-
+        String jobId = profile.getJobId();
         
         // run a map task for each split
         job.setNumReduceTasks(1);                 // force a single reduce task
         for (int i = 0; i < splits.length; i++) {
           mapIds.add("map_" + newId());
-          MapTask map = new MapTask(file, (String)mapIds.get(i), splits[i]);
+          MapTask map = new MapTask(jobId, file, (String)mapIds.get(i), i,
+                                    splits[i]);
           map.setConf(job);
           map_tasks += 1;
           map.run(job, this);
@@ -104,8 +105,8 @@
         }
 
         // run a single reduce task
-        ReduceTask reduce = new ReduceTask(profile.getJobId(), file, 
-                                           reduceId, mapIds.size(),0);
+        ReduceTask reduce = new ReduceTask(jobId, file, 
+                                           reduceId, 0, mapIds.size());
         reduce.setConf(job);
         reduce_tasks += 1;
         reduce.run(job, this);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Tue Jul 11 01:16:28 2006
@@ -39,8 +39,9 @@
 
   public MapTask() {}
 
-  public MapTask(String jobFile, String taskId, FileSplit split) {
-    super(jobFile, taskId);
+  public MapTask(String jobId, String jobFile, String taskId, 
+                 int partition, FileSplit split) {
+    super(jobId, jobFile, taskId, partition);
     this.split = split;
   }
 
@@ -48,6 +49,13 @@
       return true;
   }
 
+  public void localizeConfiguration(JobConf conf) {
+    super.localizeConfiguration(conf);
+    conf.set("map.input.file", split.getPath().toString());
+    conf.setLong("map.input.start", split.getStart());
+    conf.setLong("map.input.length", split.getLength());
+  }
+  
   public TaskRunner createRunner(TaskTracker tracker) {
     return new MapTaskRunner(this, tracker, this.conf);
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Jul 11 01:16:28 2006
@@ -36,9 +36,7 @@
        });
   }
 
-  private UTF8 jobId = new UTF8();
   private int numMaps;
-  private int partition;
   private boolean sortComplete;
 
   { getProgress().setStatus("reduce"); }
@@ -52,11 +50,9 @@
   public ReduceTask() {}
 
   public ReduceTask(String jobId, String jobFile, String taskId,
-                    int numMaps, int partition) {
-    super(jobFile, taskId);
-    this.jobId.set(jobId);
+                    int partition, int numMaps) {
+    super(jobId, jobFile, taskId, partition);
     this.numMaps = numMaps;
-    this.partition = partition;
   }
 
   public TaskRunner createRunner(TaskTracker tracker) throws IOException {
@@ -67,31 +63,26 @@
       return false;
   }
 
+  public int getNumMaps() { return numMaps; }
+  
   /**
-   * Get the job name for this task.
-   * @return the job name
+   * Localize the given JobConf to be specific for this task.
    */
-  public UTF8 getJobId() {
-    return jobId;
+  public void localizeConfiguration(JobConf conf) {
+    super.localizeConfiguration(conf);
+    conf.setNumMapTasks(numMaps);
   }
-  
-  public int getNumMaps() { return numMaps; }
-  public int getPartition() { return partition; }
 
   public void write(DataOutput out) throws IOException {
     super.write(out);
 
-    jobId.write(out);
     out.writeInt(numMaps);                        // write the number of maps
-    out.writeInt(partition);                      // write partition
   }
 
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
 
-    jobId.readFields(in);
     numMaps = in.readInt();
-    this.partition = in.readInt();                // read partition
   }
 
   /** Iterates values while keys match in sorted input. */
@@ -215,7 +206,7 @@
       // sort the input file
       SequenceFile.Sorter sorter =
         new SequenceFile.Sorter(lfs, comparator, valueClass, job);
-      sorter.sort(mapFiles, sortedFile, true);              // sort
+      sorter.sort(mapFiles, sortedFile, !conf.getKeepFailedTaskFiles()); // sort
 
     } finally {
       sortComplete = true;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Tue Jul 11 01:16:28 2006
@@ -35,15 +35,20 @@
 
   private String jobFile;                         // job configuration file
   private String taskId;                          // unique, includes job id
+  private String jobId;                           // unique jobid
+  private int partition;                          // id within job
+  
   ////////////////////////////////////////////
   // Constructors
   ////////////////////////////////////////////
 
   public Task() {}
 
-  public Task(String jobFile, String taskId) {
+  public Task(String jobId, String jobFile, String taskId, int partition) {
     this.jobFile = jobFile;
     this.taskId = taskId;
+    this.jobId = jobId;
+    this.partition = partition;
   }
 
   ////////////////////////////////////////////
@@ -52,6 +57,22 @@
   public void setJobFile(String jobFile) { this.jobFile = jobFile; }
   public String getJobFile() { return jobFile; }
   public String getTaskId() { return taskId; }
+  
+  /**
+   * Get the id of the job this task belongs to.
+   * @return the job id
+   */
+  public String getJobId() {
+    return jobId;
+  }
+  
+  /**
+   * Get the index of this task within the job.
+   * @return the integer part of the task id
+   */
+  public int getPartition() {
+    return partition;
+  }
 
   ////////////////////////////////////////////
   // Writable methods
@@ -60,14 +81,28 @@
   public void write(DataOutput out) throws IOException {
     UTF8.writeString(out, jobFile);
     UTF8.writeString(out, taskId);
+    UTF8.writeString(out, jobId);
+    out.writeInt(partition);
   }
   public void readFields(DataInput in) throws IOException {
     jobFile = UTF8.readString(in);
     taskId = UTF8.readString(in);
+    jobId = UTF8.readString(in);
+    partition = in.readInt();
   }
 
   public String toString() { return taskId; }
 
+  /**
+   * Localize the given JobConf to be specific for this task.
+   */
+  public void localizeConfiguration(JobConf conf) {
+    conf.set("mapred.task.id", taskId);
+    conf.setBoolean("mapred.task.is.map",isMapTask());
+    conf.setInt("mapred.task.partition", partition);
+    conf.set("mapred.job.id", jobId);
+  }
+  
   /** Run this task as a part of the named job.  This method is executed in the
    * child process and is what invokes user-supplied map, reduce, etc. methods.
    * @param umbilical for progress reports

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Jul 11 01:16:28 2006
@@ -413,12 +413,12 @@
 
             String taskid = (String) usableTaskIds.first();
             usableTaskIds.remove(taskid);
+            String jobId = job.getProfile().getJobId();
 
             if (isMapTask()) {
-                t = new MapTask(jobFile, taskid, split);
+                t = new MapTask(jobId, jobFile, taskid, partition, split);
             } else {
-                t = new ReduceTask(job.getProfile().getJobId(), jobFile, taskid, 
-                                   numMaps, partition);
+                t = new ReduceTask(jobId, jobFile, taskid, partition, numMaps);
             }
             t.setConf(conf);
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=420760&r1=420759&r2=420760&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jul 11 01:16:28 2006
@@ -652,6 +652,7 @@
         boolean wasKilled = false;
         private JobConf defaultJobConf;
         private JobConf localJobConf;
+        private boolean keepFailedTaskFiles;
 
         /**
          */
@@ -682,7 +683,6 @@
             t.setJobFile(localJobFile.toString());
 
             localJobConf = new JobConf(localJobFile);
-            localJobConf.set("mapred.task.id", task.getTaskId());
             localJobConf.set("mapred.local.dir",
                     this.defaultJobConf.get("mapred.local.dir"));
             String jarFile = localJobConf.getJar();
@@ -690,6 +690,7 @@
               fs.copyToLocalFile(new Path(jarFile), localJarFile);
               localJobConf.setJar(localJarFile.toString());
             }
+            task.localizeConfiguration(localJobConf);
 
             FileSystem localFs = FileSystem.getNamed("local", fConf);
             OutputStream out = localFs.create(localJobFile);
@@ -701,6 +702,7 @@
             // set the task's configuration to the local job conf
             // rather than the default.
             t.setConf(localJobConf);
+            keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
         }
 
         /**
@@ -875,6 +877,9 @@
             LOG.debug("Cleaning up " + taskId);
             synchronized (TaskTracker.this) {
                tasks.remove(taskId);
+               if (runstate == TaskStatus.FAILED && keepFailedTaskFiles) {
+                 return;
+               }
                synchronized (this) {
                  try {
                     runner.close();