You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/09/19 09:31:42 UTC

svn commit: r696957 [1/2] - in /hadoop/core/trunk: ./ conf/ src/core/org/apache/hadoop/filecache/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: acmurthy
Date: Fri Sep 19 00:31:41 2008
New Revision: 696957

URL: http://svn.apache.org/viewvc?rev=696957&view=rev
Log:
HADOOP-249. Reuse JVMs across Map-Reduce Tasks. Contributed by Devaraj Das.

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmTask.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml
    hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRDFSSort.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 19 00:31:41 2008
@@ -180,6 +180,11 @@
 
     HADOOP-4176. Implement getFileChecksum(Path) in HftpFileSystem. (szetszwo)
 
+    HADOOP-249. Reuse JVMs across Map-Reduce Tasks. 
+    Configuration changes to hadoop-default.xml:
+      add mapred.job.reuse.jvm.num.tasks
+    (Devaraj Das via acmurthy) 
+
   IMPROVEMENTS
 
     HADOOP-4106. libhdfs: add time, permission and user attribute support (part 2).

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Fri Sep 19 00:31:41 2008
@@ -957,6 +957,14 @@
 </property>
 
 <property>
+  <name>mapred.job.reuse.jvm.num.tasks</name>
+  <value>1</value>
+  <description>How many tasks to run per jvm. If set to -1, there is
+  no limit. 
+  </description>
+</property>
+
+<property>
   <name>mapred.min.split.size</name>
   <value>0</value>
   <description>The minimum size chunk that map input should be split

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/filecache/DistributedCache.java Fri Sep 19 00:31:41 2008
@@ -152,6 +152,42 @@
                                    boolean isArchive, long confFileStamp,
                                    Path currentWorkDir) 
   throws IOException {
+    return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
+        confFileStamp, currentWorkDir, true);
+  }
+  /**
+   * Get the locally cached file or archive; it could either be 
+   * previously cached (and valid) or copy it from the {@link FileSystem} now.
+   * 
+   * @param cache the cache to be localized, this should be specified as 
+   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no scheme 
+   * or hostname:port is provided the file is assumed to be in the filesystem
+   * being used in the Configuration
+   * @param conf The Configuration file which contains the filesystem
+   * @param baseDir The base cache Dir where you want to localize the files/archives
+   * @param fileStatus The file status on the dfs.
+   * @param isArchive if the cache is an archive or a file. In case it is an
+   *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
+   *  be unzipped/unjarred/untarred automatically 
+   *  and the directory where the archive is unzipped/unjarred/untarred is
+   *  returned as the Path.
+   *  In case of a file, the path to the file is returned
+   * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
+   * file to be cached hasn't changed since the job started
+   * @param currentWorkDir this is the directory where you would want to create symlinks 
+   * for the locally cached files/archives
+   * @param honorSymLinkConf if this is false, then the symlinks are not
+   * created even if conf says so (this is required for an optimization in task
+   * launches)
+   * @return the path to directory where the archives are unjarred in case of archives,
+   * the path to the file where the file is copied locally 
+   * @throws IOException
+   */
+  public static Path getLocalCache(URI cache, Configuration conf, 
+      Path baseDir, FileStatus fileStatus,
+      boolean isArchive, long confFileStamp,
+      Path currentWorkDir, boolean honorSymLinkConf) 
+  throws IOException {
     String cacheId = makeRelative(cache, conf);
     CacheStatus lcacheStatus;
     Path localizedPath;
@@ -162,10 +198,10 @@
         lcacheStatus = new CacheStatus(new Path(baseDir, new Path(cacheId)));
         cachedArchives.put(cacheId, lcacheStatus);
       }
-      
+
       synchronized (lcacheStatus) {
         localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
-                                      fileStatus, isArchive, currentWorkDir);
+            fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
         lcacheStatus.refcount++;
       }
     }
@@ -180,6 +216,7 @@
     }
     return localizedPath;
   }
+
   
   /**
    * Get the locally cached file or archive; it could either be 
@@ -292,9 +329,9 @@
                                     CacheStatus cacheStatus,
                                     FileStatus fileStatus,
                                     boolean isArchive, 
-                                    Path currentWorkDir) 
+                                    Path currentWorkDir,boolean honorSymLinkConf) 
   throws IOException {
-    boolean doSymlink = getSymlink(conf);
+    boolean doSymlink = honorSymLinkConf && getSymlink(conf);
     if(cache.getFragment() == null) {
     	doSymlink = false;
     }

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java?rev=696957&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java Fri Sep 19 00:31:41 2008
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSError;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+import org.apache.log4j.LogManager;
+
+/** 
+ * The main() for child processes. 
+ */
+
+public class Child {
+
+  public static final Log LOG =
+    LogFactory.getLog(TaskTracker.class);
+
+  static volatile TaskAttemptID taskid;
+
+  public static void main(String[] args) throws Throwable {
+    LOG.debug("Child starting");
+
+    JobConf defaultConf = new JobConf();
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    InetSocketAddress address = new InetSocketAddress(host, port);
+    final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
+    taskid = firstTaskid;
+    int jvmIdInt = Integer.parseInt(args[3]);
+    JVMId jvmId = new JVMId(taskid.getJobID(),taskid.isMap(),jvmIdInt);
+    final int MAX_SLEEP_COUNT = 600; //max idle time of 5 minutes
+    int sleepCount = 0;
+    TaskUmbilicalProtocol umbilical =
+      (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
+          TaskUmbilicalProtocol.versionID,
+          address,
+          defaultConf);
+    int numTasksToExecute = -1; //-1 signifies "no limit"
+    int numTasksExecuted = 0;
+    Thread t = new Thread() {
+      public void run() {
+        //every so often wake up and syncLogs so that we can track
+        //logs of the currently running task
+        while (true) {
+          try {
+            Thread.sleep(5000);
+            TaskLog.syncLogs(firstTaskid, taskid);
+          } catch (InterruptedException ie) {
+          } catch (IOException iee) {
+            LOG.error("Error in syncLogs: " + iee);
+            System.exit(-1);
+          }
+        }
+      }
+    };
+    t.setName("Thread for syncLogs");
+    t.setDaemon(true);
+    t.start();
+    //for the memory management, a PID file is written only once per JVM.
+    //We simply symlink that file on a per task basis later (see below).
+    //Long term, we should change the Memory manager to use JVMId instead
+    //of TaskAttemptId
+    Path srcPidPath = null;
+    Path dstPidPath = null;
+    try {
+      while (true) {
+        JvmTask myTask = umbilical.getTask(jvmId, firstTaskid);
+        if (myTask.shouldDie()) {
+          break;
+        } else {
+          if (myTask.getTask() == null) {
+            if (sleepCount == MAX_SLEEP_COUNT) {
+              System.exit(0);
+            }
+            sleepCount++;
+            Thread.sleep(500);
+            continue;
+          }
+          sleepCount = 0; //got a task. reset the sleepCount
+        }
+        Task task = myTask.getTask();
+        taskid = task.getTaskID();
+        
+        //create the index file so that the log files 
+        //are viewable immediately
+        TaskLog.syncLogs(firstTaskid, taskid);
+        JobConf job = new JobConf(task.getJobFile());
+        if (job.getBoolean("task.memory.mgmt.enabled", false)) {
+          if (srcPidPath == null) {
+            srcPidPath = TaskMemoryManagerThread.getPidFilePath(firstTaskid,
+                                                              job);
+          }
+          //since the JVM is running multiple tasks potentially, we need
+          //to do symlink stuff only for the subsequent tasks
+          if (!taskid.equals(firstTaskid)) {
+            dstPidPath = new Path(srcPidPath.getParent(), taskid.toString());
+            FileUtil.symLink(srcPidPath.toUri().getPath(), 
+                dstPidPath.toUri().getPath());
+          }
+        }
+        //setupWorkDir actually sets up the symlinks for the distributed
+        //cache. After a task exits we wipe the workdir clean, and hence
+        //the symlinks have to be rebuilt.
+        TaskRunner.setupWorkDir(job);
+
+        numTasksToExecute = job.getNumTasksToExecutePerJvm();
+        assert(numTasksToExecute != 0);
+        TaskLog.cleanup(job.getInt("mapred.userlog.retain.hours", 24));
+
+        task.setConf(job);
+
+        defaultConf.addResource(new Path(task.getJobFile()));
+
+        // Initiate Java VM metrics
+        JvmMetrics.init(task.getPhase().toString(), job.getSessionId());
+        // use job-specified working directory
+        FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
+        try {
+          task.run(job, umbilical);             // run the task
+        } finally {
+          TaskLog.syncLogs(firstTaskid, taskid);
+          if (!taskid.equals(firstTaskid) && 
+              job.getBoolean("task.memory.mgmt.enabled", false)) {
+            new File(dstPidPath.toUri().getPath()).delete();
+          }
+        }
+        if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
+          break;
+        }
+      }
+    } catch (FSError e) {
+      LOG.fatal("FSError from child", e);
+      umbilical.fsError(taskid, e.getMessage());
+    } catch (Throwable throwable) {
+      LOG.warn("Error running child", throwable);
+      // Report back any failures, for diagnostic purposes
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      throwable.printStackTrace(new PrintStream(baos));
+      umbilical.reportDiagnosticInfo(taskid, baos.toString());
+    } finally {
+      RPC.stopProxy(umbilical);
+      MetricsContext metricsContext = MetricsUtil.getContext("mapred");
+      metricsContext.close();
+      // Shutting down log4j of the child-vm... 
+      // This assumes that on return from Task.run() 
+      // there is no more logging done.
+      LogManager.shutdown();
+    }
+  }
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Sep 19 00:31:41 2008
@@ -47,8 +47,9 @@
    * Version 15: Changed format of Task and TaskStatus for HADOOP-153
    * Version 16: adds ResourceStatus to TaskTrackerStatus for HADOOP-3759
    * Version 17: Changed format of Task and TaskStatus for HADOOP-3150
+   * Version 18: Changed status message due to changes in TaskStatus
    */
-  public static final long versionID = 17L;
+  public static final long versionID = 18L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Fri Sep 19 00:31:41 2008
@@ -35,6 +35,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JvmTask;
 
 public class IsolationRunner {
   private static final Log LOG = 
@@ -58,7 +59,7 @@
       LOG.info("Task " + taskId + " reporting shuffle error: " + message);
     }
 
-    public Task getTask(TaskAttemptID taskid) throws IOException {
+    public JvmTask getTask(JVMId jvmId, TaskAttemptID taskId) throws IOException {
       return null;
     }
 

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java?rev=696957&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JVMId.java Fri Sep 19 00:31:41 2008
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+public class JVMId extends ID {
+  boolean isMap;
+  JobID jobId;
+  private static final String JVM = "jvm";
+  private static char UNDERSCORE = '_';  
+  private static NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(6);
+  }
+  
+  public JVMId(JobID jobId, boolean isMap, int id) {
+    super(id);
+    this.isMap = isMap;
+    this.jobId = jobId;
+  }
+  
+  public JVMId (String jtIdentifier, int jobId, boolean isMap, int id) {
+    this(new JobID(jtIdentifier, jobId), isMap, id);
+  }
+    
+  private JVMId() { }
+  
+  public boolean isMapJVM() {
+    return isMap;
+  }
+  public JobID getJobId() {
+    return jobId;
+  }
+  public boolean equals(Object o) {
+    if(o == null)
+      return false;
+    if(o.getClass().equals(JVMId.class)) {
+      JVMId that = (JVMId)o;
+      return this.id==that.id
+        && this.isMap == that.isMap
+        && this.jobId.equals(that.jobId);
+    }
+    else return false;
+  }
+
+  /**Compare JVMIds by first jobIds, then by JVM numbers. Reduce JVMs are 
+   * defined as greater than map JVMs.*/
+  @Override
+  public int compareTo(ID o) {
+    JVMId that = (JVMId)o;
+    int jobComp = this.jobId.compareTo(that.jobId);
+    if(jobComp == 0) {
+      if(this.isMap == that.isMap) {
+        return this.id - that.id;
+      }
+      else return this.isMap ? -1 : 1;
+    }
+    else return jobComp;
+  }
+  
+  @Override
+  public String toString() { 
+    StringBuilder builder = new StringBuilder();
+    return builder.append(JVM).append(UNDERSCORE)
+      .append(toStringWOPrefix()).toString();
+  }
+
+  StringBuilder toStringWOPrefix() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(jobId.toStringWOPrefix())
+      .append(isMap ? "_m_" : "_r_");
+    return builder.append(idFormat.format(id));
+  }
+  
+  @Override
+  public int hashCode() {
+    return toStringWOPrefix().toString().hashCode();
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.jobId = JobID.read(in);
+    this.isMap = in.readBoolean();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    jobId.write(out);
+    out.writeBoolean(isMap);
+  }
+  
+  public static JVMId read(DataInput in) throws IOException {
+    JVMId jvmId = new JVMId();
+    jvmId.readFields(in);
+    return jvmId;
+  }
+  
+  /** Construct a JVMId object from given string 
+   * @return constructed JVMId object or null if the given String is null
+   * @throws IllegalArgumentException if the given string is malformed
+   */
+  public static JVMId forName(String str) 
+    throws IllegalArgumentException {
+    if(str == null)
+      return null;
+    try {
+      String[] parts = str.split("_");
+      if(parts.length == 5) {
+        if(parts[0].equals(JVM)) {
+          boolean isMap = false;
+          if(parts[3].equals("m")) isMap = true;
+          else if(parts[3].equals("r")) isMap = false;
+          else throw new Exception();
+          return new JVMId(parts[1], Integer.parseInt(parts[2]),
+              isMap, Integer.parseInt(parts[4]));
+        }
+      }
+    }catch (Exception ex) {//fall below
+    }
+    throw new IllegalArgumentException("TaskId string : " + str 
+        + " is not properly formed");
+  }
+
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobConf.java Fri Sep 19 00:31:41 2008
@@ -326,6 +326,23 @@
   }
   
   /**
+   * Sets the number of tasks that a spawned task JVM should run
+   * before it exits
+   * @param numTasks the number of tasks to execute; defaults to 1;
+   * -1 signifies no limit
+   */
+  public void setNumTasksToExecutePerJvm(int numTasks) {
+    setInt("mapred.job.reuse.jvm.num.tasks", numTasks);
+  }
+  
+  /**
+   * Get the number of tasks that a spawned JVM should execute
+   */
+  public int getNumTasksToExecutePerJvm() {
+    return getInt("mapred.job.reuse.jvm.num.tasks", 1);
+  }
+  
+  /**
    * Get the {@link InputFormat} implementation for the map-reduce job,
    * defaults to {@link TextInputFormat} if not specified explicity.
    * 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Sep 19 00:31:41 2008
@@ -1415,7 +1415,9 @@
     for (TaskInProgress tip : job.getMapTasks()) {
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING && 
-            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
+            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
+            taskStatus.getRunState() != TaskStatus.State.INITIALIZED) {
           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
                                    taskStatus.getTaskID());
         }
@@ -1424,7 +1426,9 @@
     for (TaskInProgress tip : job.getReduceTasks()) {
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
-            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
+            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
+            taskStatus.getRunState() != TaskStatus.State.INITIALIZED) {
           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
                                    taskStatus.getTaskID());
         }

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=696957&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java Fri Sep 19 00:31:41 2008
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+class JvmManager {
+
+  public static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.mapred.JvmManager");
+
+  JvmManagerForType mapJvmManager;
+
+  JvmManagerForType reduceJvmManager;
+  
+  TaskTracker tracker;
+
+  public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
+      File stdout,File stderr,long logSize, File workDir, 
+      Map<String,String> env, String pidFile, JobConf conf) {
+    return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,pidFile,conf);
+  }
+  
+  public JvmManager(TaskTracker tracker) {
+    mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), 
+        true, tracker);
+    reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
+        false, tracker);
+    this.tracker = tracker;
+  }
+  
+  public void stop() {
+    mapJvmManager.stop();
+    reduceJvmManager.stop();
+  }
+
+  public boolean isJvmKnown(JVMId jvmId) {
+    if (jvmId.isMapJVM()) {
+      return mapJvmManager.isJvmknown(jvmId);
+    } else {
+      return reduceJvmManager.isJvmknown(jvmId);
+    }
+  }
+
+  public void launchJvm(JobID jobId, boolean isMap, JvmEnv env) {
+    if (isMap) {
+      mapJvmManager.reapJvm(env, jobId, tracker);
+    } else {
+      reduceJvmManager.reapJvm(env, jobId, tracker);
+    }
+  }
+
+  public void setRunningTaskForJvm(JVMId jvmId, TaskRunner t) {
+    if (jvmId.isMapJVM()) {
+      mapJvmManager.setRunningTaskForJvm(jvmId, t);
+    } else {
+      reduceJvmManager.setRunningTaskForJvm(jvmId, t);
+    }
+  }
+
+  public void taskFinished(TaskRunner tr) {
+    if (tr.getTask().isMapTask()) {
+      mapJvmManager.taskFinished(tr);
+    } else {
+      reduceJvmManager.taskFinished(tr);
+    }
+  }
+
+  public void taskKilled(TaskRunner tr) {
+    if (tr.getTask().isMapTask()) {
+      mapJvmManager.taskKilled(tr);
+    } else {
+      reduceJvmManager.taskKilled(tr);
+    }
+  }
+
+  public void killJvm(JVMId jvmId) {
+    if (jvmId.isMap) {
+      mapJvmManager.killJvm(jvmId);
+    } else {
+      reduceJvmManager.killJvm(jvmId);
+    }
+  }  
+
+  private static class JvmManagerForType {
+    //Mapping from the JVM IDs to running Tasks
+    Map <JVMId,TaskRunner> jvmToRunningTask = 
+      new HashMap<JVMId, TaskRunner>();
+    //Mapping from the tasks to JVM IDs
+    Map <TaskRunner,JVMId> runningTaskToJvm = 
+      new HashMap<TaskRunner, JVMId>();
+    //Mapping from the JVM IDs to JVM processes
+    Map <JVMId, JvmRunner> jvmIdToRunner = 
+      new HashMap<JVMId, JvmRunner>();
+    int maxJvms;
+    boolean isMap;
+    
+    Random rand = new Random(System.currentTimeMillis());
+    TaskTracker tracker;
+
+    public JvmManagerForType(int maxJvms, boolean isMap, TaskTracker tracker) {
+      this.maxJvms = maxJvms;
+      this.isMap = isMap;
+      this.tracker = tracker;
+    }
+
+    synchronized public void setRunningTaskForJvm(JVMId jvmId, 
+        TaskRunner t) {
+      if (t == null) { 
+        //signifies the JVM asked for a task and it 
+        //was not given anything.
+        jvmIdToRunner.get(jvmId).setBusy(false);
+        return;
+      }
+      jvmToRunningTask.put(jvmId, t);
+      runningTaskToJvm.put(t,jvmId);
+      jvmIdToRunner.get(jvmId).setBusy(true);
+    }
+    
+    synchronized public boolean isJvmknown(JVMId jvmId) {
+      return jvmIdToRunner.containsKey(jvmId);
+    }
+
+    synchronized public void taskFinished(TaskRunner tr) {
+      JVMId jvmId = runningTaskToJvm.remove(tr);
+      if (jvmId != null) {
+        jvmToRunningTask.remove(jvmId);
+        JvmRunner jvmRunner;
+        if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
+          jvmRunner.taskRan();
+        }
+      }
+    }
+
+    synchronized public void taskKilled(TaskRunner tr) {
+      JVMId jvmId = runningTaskToJvm.remove(tr);
+      if (jvmId != null) {
+        jvmToRunningTask.remove(jvmId);
+        killJvm(jvmId);
+      }
+    }
+
+    synchronized public void killJvm(JVMId jvmId) {
+      JvmRunner jvmRunner;
+      if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
+        jvmRunner.kill();
+      }
+    }
+    
+    synchronized public void stop() {
+      for (JvmRunner jvm : jvmIdToRunner.values()) {
+        jvm.kill();
+      }
+    }
+    
+    synchronized private void removeJvm(JVMId jvmId) {
+      jvmIdToRunner.remove(jvmId);
+    }
+    private synchronized void reapJvm( 
+        JvmEnv env,
+        JobID jobId, TaskTracker tracker) {
+      boolean spawnNewJvm = false;
+      //Check whether there is a free slot to start a new JVM;
+      //otherwise, kill an (idle) JVM and launch a new one
+      int numJvmsSpawned = jvmIdToRunner.size();
+
+      if (numJvmsSpawned >= maxJvms) {
+        //go through the list of JVMs for all jobs.
+        //for each JVM see whether it is currently running something and
+        //if not, then kill the JVM
+        Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter = 
+          jvmIdToRunner.entrySet().iterator();
+        
+        while (jvmIter.hasNext()) {
+          JvmRunner jvmRunner = jvmIter.next().getValue();
+          JobID jId = jvmRunner.jvmId.getJobId();
+          //Cases when a JVM is killed: 
+          // (1) the JVM under consideration belongs to the same job 
+          //     (passed in the argument). In this case, kill only when
+          //     the JVM ran all the tasks it was scheduled to run (in terms
+          //     of count).
+          // (2) the JVM under consideration belongs to a different job and is
+          //     currently not busy
+          //             
+          if ((jId.equals(jobId) && jvmRunner.ranAll()) ||
+              (!jId.equals(jobId) && !jvmRunner.isBusy())) {
+            jvmIter.remove();
+            jvmRunner.kill();
+            spawnNewJvm = true;
+            break;
+          }
+        }
+      } else {
+        spawnNewJvm = true;
+      }
+
+      if (spawnNewJvm) {
+        spawnNewJvm(jobId, env, tracker);
+      } else {
+        LOG.info("No new JVM spawned for jobId: " + jobId);
+      }
+    }
+
+    private void spawnNewJvm(JobID jobId, JvmEnv env, TaskTracker tracker) {
+      JvmRunner jvmRunner = new JvmRunner(env,jobId);
+      jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
+      //spawn the JVM in a new thread. Note that there will be very little
+      //extra overhead of launching the new thread for a new JVM since
+      //most of the cost is involved in launching the process. Moreover,
+      //since we are going to be using the JVM for running many tasks,
+      //the thread launch cost becomes trivial when amortized over all
+      //tasks. Doing it this way also keeps code simple.
+      jvmRunner.setDaemon(true);
+      jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
+      if (tracker.isTaskMemoryManagerEnabled()) {
+        tracker.getTaskMemoryManager().addTask(
+            TaskAttemptID.forName(env.conf.get("mapred.task.id")),
+            tracker.getMemoryForTask(env.conf));
+      }
+      LOG.info(jvmRunner.getName());
+      jvmRunner.start();
+    }
+    synchronized private void updateOnJvmExit(JVMId jvmId, 
+        int exitCode, boolean killed) {
+      removeJvm(jvmId);
+      TaskRunner t = jvmToRunningTask.remove(jvmId);
+
+      if (t != null) {
+        runningTaskToJvm.remove(t);
+        if (!killed && exitCode != 0) {
+          t.setExitCode(exitCode);
+        }
+        t.signalDone();
+      }
+    }
+
+    /**
+     * Thread that launches one child JVM as an external process, blocks
+     * until that process exits and then reports the exit to the enclosing
+     * manager. It also holds the per-JVM bookkeeping (busy flag, number of
+     * tasks run) that reapJvm consults when looking for a JVM to reclaim.
+     */
+    private class JvmRunner extends Thread {
+      JvmEnv env;
+      // NOTE(review): nothing inside this class sets this flag to true, so
+      // runChild() reports killed == false unless some other code assigns
+      // it - confirm that this is intended.
+      volatile boolean killed = false;
+      // number of tasks that have finished in this JVM so far
+      volatile int numTasksRan;
+      // number of tasks this JVM is scheduled to run, taken from the conf
+      final int numTasksToRun;
+      JVMId jvmId;
+      // a new runner counts as busy until taskRan() is called
+      volatile boolean busy = true;
+      private ShellCommandExecutor shexec; // shell terminal for running the task
+
+      /**
+       * Creates the runner and assigns the JVM a randomly chosen id.
+       *
+       * @param env environment to launch the JVM with
+       * @param jobId job the JVM belongs to
+       */
+      public JvmRunner(JvmEnv env, JobID jobId) {
+        this.env = env;
+        this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
+        this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm();
+        LOG.info("In JvmRunner constructed JVM ID: " + jvmId);
+      }
+
+      /** Launches the JVM and returns once its exit has been handled. */
+      public void run() {
+        runChild(env);
+      }
+
+      /**
+       * Runs the child JVM to completion and then performs the exit
+       * handling: updates the manager's bookkeeping, cleans up the work
+       * directory when the JVM is configured to run more than one task, and
+       * removes the pid-file if the task memory manager is enabled.
+       *
+       * @param env environment to launch the JVM with
+       */
+      public void runChild(JvmEnv env) {
+        try {
+          // the JVM's own id is passed to the child as an extra argument
+          env.vargs.add(Integer.toString(jvmId.getId()));
+          List<String> wrappedCommand = 
+            TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
+                env.logSize, env.pidFile);
+          shexec = new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), 
+              env.workDir, env.env);
+          shexec.execute();
+        } catch (IOException ioe) {
+          // do nothing
+          // error and output are appropriately redirected
+        } finally { // handle the exit code
+          // NOTE(review): when the command could not even be built, this
+          // returns without calling updateOnJvmExit, so the JVM stays
+          // registered - confirm whether that is intended.
+          if (shexec == null) {
+            return;
+          }
+          int exitCode = shexec.getExitCode();
+          updateOnJvmExit(jvmId, exitCode, killed);
+          LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " + 
+              numTasksRan);
+          try {
+            //the task jvm cleans up the common workdir for every 
+            //task at the beginning of each task in the task JVM.
+            //For the last task, we do it here.
+            if (env.conf.getNumTasksToExecutePerJvm() != 1) {
+              FileUtil.fullyDelete(env.workDir);
+            }
+          } catch (IOException ie) {
+            // Cleanup is best effort and must not stop the rest of the exit
+            // handling, but a failure should not go unnoticed either.
+            LOG.warn("Could not delete the work dir " + env.workDir +
+                " of JVM " + jvmId, ie);
+          }
+          if (tracker.isTaskMemoryManagerEnabled()) {
+          // Remove the associated pid-file, if any
+            tracker.getTaskMemoryManager().
+               removePidFile(TaskAttemptID.forName(
+                   env.conf.get("mapred.task.id")));
+          }
+        }
+      }
+
+      /**
+       * Destroys the JVM process, if one has been started, and unregisters
+       * this JVM from the manager.
+       */
+      public void kill() {
+        if (shexec != null) {
+          Process process = shexec.getProcess();
+          if (process != null) {
+            process.destroy();
+          }
+        }
+        removeJvm(jvmId);
+      }
+      
+      /** Marks the JVM as idle and counts one more finished task. */
+      public void taskRan() {
+        busy = false;
+        // not atomic on its own; the visible caller (taskFinished) is
+        // synchronized on the manager
+        numTasksRan++;
+      }
+      
+      /** @return true once the JVM has run exactly numTasksToRun tasks */
+      public boolean ranAll() {
+        return(numTasksRan == numTasksToRun);
+      }
+      public void setBusy(boolean busy) {
+        this.busy = busy;
+      }
+      public boolean isBusy() {
+        return busy;
+      }
+    }
+  }  
+  /**
+   * Helper class that bundles everything needed to launch a child JVM: the
+   * setup commands, the JVM command line, the files that receive stdout and
+   * stderr, the log size, the working directory, the process environment,
+   * the pid-file and the job configuration.
+   */
+  static class JvmEnv { //Helper class
+    List<String> vargs;
+    List<String> setup;
+    File stdout;
+    File stderr;
+    File workDir;
+    String pidFile;
+    long logSize;
+    JobConf conf;
+    Map<String, String> env;
+
+    public JvmEnv(List<String> setup, Vector<String> vargs, File stdout, 
+        File stderr, long logSize, File workDir, Map<String,String> env,
+        String pidFile, JobConf conf) {
+      this.setup = setup;
+      this.vargs = vargs;
+      this.stdout = stdout;
+      this.stderr = stderr;
+      // This assignment used to be missing, which left logSize at 0 no
+      // matter what the caller passed in.
+      this.logSize = logSize;
+      this.workDir = workDir;
+      this.env = env;
+      this.pidFile = pidFile;
+      this.conf = conf;
+    }
+  }
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmTask.java?rev=696957&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmTask.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmTask.java Fri Sep 19 00:31:41 2008
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Pairs an optional {@link Task} with a shouldDie flag so that both can be
+ * shipped together as one {@link Writable}; it is the result type of the
+ * umbilical getTask call.
+ */
+class JvmTask implements Writable {
+  // the task to run; null when no task is being handed out
+  Task t;
+  // presumably tells the receiving JVM to exit - confirm against Child
+  boolean shouldDie;
+  public JvmTask(Task t, boolean shouldDie) {
+    this.t = t;
+    this.shouldDie = shouldDie;
+  }
+  /** No-argument constructor; the fields are filled in by readFields. */
+  public JvmTask() {}
+  public Task getTask() {
+    return t;
+  }
+  public boolean shouldDie() {
+    return shouldDie;
+  }
+  /**
+   * Wire format: shouldDie, then a flag saying whether a task follows and,
+   * if one does, whether it is a map task followed by the task itself.
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(shouldDie);
+    if (t != null) {
+      out.writeBoolean(true);
+      out.writeBoolean(t.isMapTask());
+      t.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+  /**
+   * Reads the format produced by write. Every field is overwritten, so an
+   * instance that is read into more than once never keeps a stale task.
+   */
+  public void readFields(DataInput in) throws IOException {
+    shouldDie = in.readBoolean();
+    boolean taskComing = in.readBoolean();
+    if (taskComing) {
+      boolean isMap = in.readBoolean();
+      if (isMap) {
+        t = new MapTask();
+      } else {
+        t = new ReduceTask();
+      }
+      t.readFields(in);
+    } else {
+      // no task on the wire: forget any task left from an earlier read
+      t = null;
+    }
+  }
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Sep 19 00:31:41 2008
@@ -29,6 +29,7 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.JobTrackerMetricsInst;
+import org.apache.hadoop.mapred.JvmTask;
 
 /** Implements MapReduce locally, in-process, for debugging. */ 
 class LocalJobRunner implements JobSubmissionProtocol {
@@ -206,7 +207,7 @@
 
     // TaskUmbilicalProtocol methods
 
-    public Task getTask(TaskAttemptID taskid) { return null; }
+    public JvmTask getTask(JVMId jvmId, TaskAttemptID taskId) { return null; }
 
     public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
     throws IOException, InterruptedException {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Fri Sep 19 00:31:41 2008
@@ -108,6 +108,7 @@
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
   protected boolean cleanupJob = false;
+  private Thread pingProgressThread;
   
   //skip ranges based on failed ranges from previous attempts
   private SortedRanges skipRanges = new SortedRanges();
@@ -344,7 +345,7 @@
    * let the parent know that it's alive. It also pings the parent to see if it's alive. 
    */
   protected void startCommunicationThread(final TaskUmbilicalProtocol umbilical) {
-    Thread thread = new Thread(new Runnable() {
+    pingProgressThread = new Thread(new Runnable() {
         public void run() {
           final int MAX_RETRIES = 3;
           int remainingRetries = MAX_RETRIES;
@@ -407,8 +408,8 @@
           }
         }
       }, "Comm thread for "+taskId);
-    thread.setDaemon(true);
-    thread.start();
+    pingProgressThread.setDaemon(true);
+    pingProgressThread.start();
     LOG.debug(getTaskID() + " Progress/ping thread started");
   }
 
@@ -596,6 +597,10 @@
       commit(umbilical, outputCommitter);
     }
     taskDone.set(true);
+    pingProgressThread.interrupt();
+    try {
+      pingProgressThread.join();
+    } catch (InterruptedException ie) {}
     sendLastUpdate(umbilical);
     //signal the tasktracker that we are done
     sendDone(umbilical);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Sep 19 00:31:41 2008
@@ -485,7 +485,9 @@
       // and is addressed better at the TaskTracker to ensure this.
       // @see {@link TaskTracker.transmitHeartbeat()}
       if ((newState != TaskStatus.State.RUNNING && 
-           newState != TaskStatus.State.COMMIT_PENDING ) && 
+           newState != TaskStatus.State.COMMIT_PENDING && 
+           newState != TaskStatus.State.INITIALIZED &&
+           newState != TaskStatus.State.UNASSIGNED) && 
           (oldState == newState)) {
         LOG.warn("Recieved duplicate status update of '" + newState + 
                  "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
@@ -496,7 +498,9 @@
       // We have seen out of order status messagesmoving tasks from complete
       // to running. This is a spot fix, but it should be addressed more
       // globally.
-      if (newState == TaskStatus.State.RUNNING &&
+      if ((newState == TaskStatus.State.RUNNING || 
+          newState == TaskStatus.State.UNASSIGNED ||
+          newState == TaskStatus.State.INITIALIZED) &&
           (oldState == TaskStatus.State.FAILED || 
            oldState == TaskStatus.State.KILLED || 
            oldState == TaskStatus.State.SUCCEEDED ||
@@ -708,7 +712,9 @@
   boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
     TaskStatus st = taskStatuses.get(taskId);
     if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
-        || st.getRunState() == TaskStatus.State.COMMIT_PENDING)
+        || st.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+        st.getRunState() == TaskStatus.State.INITIALIZED ||
+        st.getRunState() == TaskStatus.State.UNASSIGNED)
         && tasksToKill.put(taskId, shouldFail) == null ) {
       String logStr = "Request received to " + (shouldFail ? "fail" : "kill") 
                       + " task '" + taskId + "' by user";

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Sep 19 00:31:41 2008
@@ -18,17 +18,25 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Enumeration;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.log4j.Appender;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 /**
  * A simple logger to handle the task-specific user logs.
@@ -50,8 +58,130 @@
   }
 
   public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
-    return new File(new File(LOG_DIR, taskid.toString()), filter.toString());
+    return new File(getBaseDir(taskid.toString()), filter.toString());
   }
+  /**
+   * Resolves the file that really holds the given log of a task attempt,
+   * using the location recorded in the attempt's index file.
+   *
+   * @param taskid the task attempt whose log is wanted
+   * @param filter the kind of log
+   * @return the log file, or null if the index file could not be read
+   */
+  public static File getRealTaskLogFileLocation(TaskAttemptID taskid, 
+      LogName filter) {
+    final LogFileDetail detail;
+    try {
+      detail = getTaskLogFileDetail(taskid, filter);
+    } catch (IOException ie) {
+      LOG.error("getTaskLogFileDetail threw an exception " + ie);
+      return null;
+    }
+    final File realDir = getBaseDir(detail.location);
+    return new File(realDir, filter.toString());
+  }
+  /** What the index file records about one log of one task attempt. */
+  private static class LogFileDetail {
+    // prefix of the first line of the index file
+    static final String LOCATION = "LOG_DIR:";
+    // name of the directory, under LOG_DIR, that really holds the logs
+    String location;
+    // offset in the log file at which the attempt's data starts
+    long start;
+    // number of bytes of the log file that belong to the attempt
+    long length;
+  }
+  
+  /**
+   * Reads the index file of a task attempt and returns the location, start
+   * offset and length recorded for the requested log.
+   *
+   * @param taskid the task attempt whose index file is read
+   * @param filter the kind of log to look up
+   * @return the details found; start and length stay 0 when the index file
+   *         has no line for the requested log
+   * @throws IOException if the index file cannot be read or is empty
+   */
+  private static LogFileDetail getTaskLogFileDetail(TaskAttemptID taskid,
+      LogName filter) throws IOException {
+    File indexFile = new File(getBaseDir(taskid.toString()), "log.index");
+    BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
+    // the reader is closed in the finally block so that it is released on
+    // every path, including an empty or malformed index file
+    try {
+      //the format of the index file is
+      //LOG_DIR: <the dir where the task logs are really stored>
+      //stdout:<start-offset in the stdout file> <length>
+      //stderr:<start-offset in the stderr file> <length>
+      //syslog:<start-offset in the syslog file> <length>
+      LogFileDetail l = new LogFileDetail();
+      String str = fis.readLine();
+      if (str == null) { //the file doesn't have anything
+        throw new IOException ("Index file for the log of " + taskid+" doesn't exist.");
+      }
+      l.location = str.substring(str.indexOf(LogFileDetail.LOCATION)+
+          LogFileDetail.LOCATION.length());
+      //special cases are the debugout and profile.out files. They are guaranteed
+      //to be associated with each task attempt since jvm reuse is disabled
+      //when profiling/debugging is enabled
+      if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
+        l.length = new File(getBaseDir(l.location), filter.toString()).length();
+        l.start = 0;
+        return l;
+      }
+      str = fis.readLine();
+      while (str != null) {
+        //look for the exact line containing the logname
+        if (str.contains(filter.toString())) {
+          // skip the log name and the ':' that follows it
+          str = str.substring(filter.toString().length()+1);
+          String[] startAndLen = str.split(" ");
+          l.start = Long.parseLong(startAndLen[0]);
+          l.length = Long.parseLong(startAndLen[1]);
+          break;
+        }
+        str = fis.readLine();
+      }
+      return l;
+    } finally {
+      fis.close();
+    }
+  }
+  
+  /**
+   * Returns the index file ("log.index") of the given task attempt.
+   *
+   * @param taskid string form of the task attempt id
+   * @return the index file inside the attempt's log directory
+   */
+  public static File getIndexFile(String taskid) {
+    final File attemptDir = getBaseDir(taskid);
+    return new File(attemptDir, "log.index");
+  }
+  private static File getBaseDir(String taskid) {
+    return new File(LOG_DIR, taskid);
+  }
+  private static long prevOutLength;
+  private static long prevErrLength;
+  private static long prevLogLength;
+  
+  /**
+   * Rewrites the index file of the current task attempt. The file records
+   * the directory that really holds the logs (the one of firstTaskid) and,
+   * for stdout, stderr and syslog, the offset at which the current attempt's
+   * data starts together with the number of bytes written since then.
+   *
+   * @param firstTaskid the task attempt whose directory holds the log files
+   * @throws IOException if the index file cannot be written
+   */
+  private static void writeToIndexFile(TaskAttemptID firstTaskid) 
+  throws IOException {
+    File indexFile = getIndexFile(currentTaskid.toString());
+    BufferedOutputStream bos = 
+      new BufferedOutputStream(new FileOutputStream(indexFile,false));
+    DataOutputStream dos = new DataOutputStream(bos);
+    // the stream is closed in the finally block so that the file handle is
+    // released even when one of the writes fails
+    try {
+      //the format of the index file is
+      //LOG_DIR:<the dir where the task logs are really stored>
+      //<LogName.STDOUT>:<start-offset in the stdout file> <length>
+      //<LogName.STDERR>:<start-offset in the stderr file> <length>
+      //<LogName.SYSLOG>:<start-offset in the syslog file> <length>
+      dos.writeBytes(LogFileDetail.LOCATION + firstTaskid.toString()+"\n"+
+          LogName.STDOUT.toString()+":");
+      dos.writeBytes(Long.toString(prevOutLength)+" ");
+      dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDOUT)
+          .length() - prevOutLength)+"\n"+LogName.STDERR+":");
+      dos.writeBytes(Long.toString(prevErrLength)+" ");
+      dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDERR)
+          .length() - prevErrLength)+"\n"+LogName.SYSLOG.toString()+":");
+      dos.writeBytes(Long.toString(prevLogLength)+" ");
+      dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.SYSLOG)
+          .length() - prevLogLength)+"\n");
+    } finally {
+      dos.close();
+    }
+  }
+  private static void resetPrevLengths(TaskAttemptID firstTaskid) {
+    prevOutLength = getTaskLogFile(firstTaskid, LogName.STDOUT).length();
+    prevErrLength = getTaskLogFile(firstTaskid, LogName.STDERR).length();
+    prevLogLength = getTaskLogFile(firstTaskid, LogName.SYSLOG).length();
+  }
+  private volatile static TaskAttemptID currentTaskid = null;
+  @SuppressWarnings("unchecked")
+  public synchronized static void syncLogs(TaskAttemptID firstTaskid, TaskAttemptID taskid) 
+  throws IOException {
+    System.out.flush();
+    System.err.flush();
+    Enumeration<Logger> allLoggers = LogManager.getCurrentLoggers();
+    while (allLoggers.hasMoreElements()) {
+      Logger l = allLoggers.nextElement();
+      Enumeration<Appender> allAppenders = l.getAllAppenders();
+      while (allAppenders.hasMoreElements()) {
+        Appender a = allAppenders.nextElement();
+        if (a instanceof TaskLogAppender) {
+          ((TaskLogAppender)a).flush();
+        }
+      }
+    }
+    if (currentTaskid != taskid) {
+      currentTaskid = taskid;
+      resetPrevLengths(firstTaskid);
+    }
+    writeToIndexFile(firstTaskid);
+  }
+  
   
   /**
    * The filter for userlogs.
@@ -133,9 +263,9 @@
     public Reader(TaskAttemptID taskid, LogName kind, 
                   long start, long end) throws IOException {
       // find the right log file
-      File filename = getTaskLogFile(taskid, kind);
+      LogFileDetail fileDetail = getTaskLogFileDetail(taskid, kind);
       // calculate the start and stop
-      long size = filename.length();
+      long size = fileDetail.length;
       if (start < 0) {
         start += size + 1;
       }
@@ -144,8 +274,11 @@
       }
       start = Math.max(0, Math.min(start, size));
       end = Math.max(0, Math.min(end, size));
+      start += fileDetail.start;
+      end += fileDetail.start;
       bytesRemaining = end - start;
-      file = new FileInputStream(filename);
+      file = new FileInputStream(new File(getBaseDir(fileDetail.location), 
+          kind.toString()));
       // skip upto start
       long pos = 0;
       while (pos < start) {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLogAppender.java Fri Sep 19 00:31:41 2008
@@ -61,6 +61,10 @@
       }
     }
   }
+  
+  /**
+   * Flushes the underlying writer qw.
+   * NOTE(review): assumes qw has been initialized before this is called -
+   * confirm against the appender's setup code.
+   */
+  public void flush() {
+    qw.flush();
+  }
 
   @Override
   public synchronized void close() {

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Fri Sep 19 00:31:41 2008
@@ -27,6 +27,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskTracker;
@@ -235,30 +236,43 @@
    * @return the pid of the task process.
    */
   private String getPid(TaskAttemptID tipID) {
-    Path pidFileName = getPidFilePath(tipID);
+    Path pidFileName = getPidFilePath(tipID, taskTracker.getJobConf());
     if (pidFileName == null) {
       return null;
     }
     return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName.toString());
   }
 
-  private LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+  private static LocalDirAllocator lDirAlloc = 
+    new LocalDirAllocator("mapred.local.dir");
 
   /**
    * Get the pidFile path of a Task
    * @param tipID
    * @return pidFile's Path
    */
-  Path getPidFilePath(TaskAttemptID tipID) {
+  public static Path getPidFilePath(TaskAttemptID tipID, JobConf conf) {
     Path pidFileName = null;
     try {
+      //this actually need not use a localdirAllocator since the PID
+      //files are really small..
       pidFileName = lDirAlloc.getLocalPathToRead(
           (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tipID),
-          taskTracker.getJobConf());
+          conf);
     } catch (IOException i) {
       // PID file is not there
       LOG.debug("Failed to get pidFile name for " + tipID);
     }
     return pidFileName;
   }
+  /**
+   * Deletes the pid-file of the given task attempt from the local file
+   * system. Nothing happens when the task memory manager is disabled or no
+   * pid-file can be found. A failure to delete is logged and otherwise
+   * ignored, since the removal is best effort.
+   *
+   * @param tid the task attempt whose pid-file is to be removed
+   */
+  public void removePidFile(TaskAttemptID tid) {
+    if (taskTracker.isTaskMemoryManagerEnabled()) {
+      Path pidFilePath = getPidFilePath(tid, taskTracker.getJobConf());
+      if (pidFilePath != null) {
+        try {
+          FileSystem.getLocal(taskTracker.getJobConf()).delete(pidFilePath, false);
+        } catch(IOException ie) {
+          LOG.warn("Failed to remove pidFile " + pidFilePath + " for " + tid,
+              ie);
+        }
+      }
+    }
+  }
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Sep 19 00:31:41 2008
@@ -20,7 +20,6 @@
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.filecache.*;
 import org.apache.hadoop.util.*;
@@ -43,11 +42,16 @@
     LogFactory.getLog(TaskRunner.class);
 
   volatile boolean killed = false;
-  private ShellCommandExecutor shexec; // shell terminal for running the task
   private Task t;
+  private Object lock = new Object();
+  private volatile boolean done = false;
+  private int exitCode = -1;
+  private boolean exitCodeSet = false;
+  
   private TaskTracker tracker;
 
   protected JobConf conf;
+  JvmManager jvmManager;
 
   /** 
    * for cleaning up old map outputs
@@ -60,6 +64,7 @@
     this.conf = conf;
     this.mapOutputFile = new MapOutputFile(t.getJobID());
     this.mapOutputFile.setConf(conf);
+    this.jvmManager = tracker.getJvmManagerInstance();
   }
 
   public Task getTask() { return t; }
@@ -78,7 +83,7 @@
    */
   public void close() throws IOException {}
 
-  private String stringifyPathArray(Path[] p){
+  private static String stringifyPathArray(Path[] p){
     if (p == null){
       return null;
     }
@@ -143,7 +148,8 @@
                                                   true, Long.parseLong(
                                                         archivesTimestamps[i]),
                                                   new Path(workDir.
-                                                        getAbsolutePath()));
+                                                        getAbsolutePath()), 
+                                                  false);
             
           }
           DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
@@ -171,7 +177,8 @@
                                                   false, Long.parseLong(
                                                            fileTimestamps[i]),
                                                   new Path(workDir.
-                                                        getAbsolutePath()));
+                                                        getAbsolutePath()), 
+                                                  false);
           }
           DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
         }
@@ -185,17 +192,7 @@
           out.close();
         }
       }
-    
-      // create symlinks for all the files in job cache dir in current
-      // workingdir for streaming
-      try{
-        DistributedCache.createAllSymlink(conf, jobCacheDir, 
-                                          workDir);
-      } catch(IOException ie){
-        // Do not exit even if symlinks have not been created.
-        LOG.warn(StringUtils.stringifyException(ie));
-      }
-      
+          
       if (!prepare()) {
         return;
       }
@@ -366,7 +363,7 @@
       }
 
       // Add main class and its arguments 
-      vargs.add(TaskTracker.Child.class.getName());  // main of Child
+      vargs.add(Child.class.getName());  // main of Child
       // pass umbilical address
       InetSocketAddress address = tracker.getTaskTrackerReportAddress();
       vargs.add(address.getAddress().getHostAddress()); 
@@ -395,9 +392,7 @@
       File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
       stdout.getParentFile().mkdirs();
       tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);
-      List<String> wrappedCommand = 
-        TaskLog.captureOutAndError(setup, vargs, stdout, stderr, logSize, pidFile);
-      LOG.debug("child jvm command : " + wrappedCommand.toString());
+
       Map<String, String> env = new HashMap<String, String>();
       StringBuffer ldLibraryPath = new StringBuffer();
       ldLibraryPath.append(workDir.toString());
@@ -408,9 +403,26 @@
         ldLibraryPath.append(oldLdLibraryPath);
       }
       env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
-      // Run the task as child of the parent TaskTracker process
-      runChild(wrappedCommand, workDir, env, taskid);
-
+      tracker.taskInitialized(t.getTaskID());
+      LOG.info("Task ID: " + t.getTaskID() +" initialized");
+      jvmManager.launchJvm(t.getJobID(), t.isMapTask(), 
+          jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, 
+              workDir, env, pidFile, conf));
+      synchronized (lock) {
+        while (!done) {
+          lock.wait();
+        }
+      }
+      tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
+      if (exitCodeSet) {
+        if (!killed && exitCode != 0) {
+          if (exitCode == 65) {
+            tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
+          }
+          throw new IOException("Task process exit with nonzero status of " +
+              exitCode + ".");
+        }
+      }
     } catch (FSError e) {
       LOG.fatal("FSError", e);
       try {
@@ -445,31 +457,76 @@
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
       tracker.reportTaskFinished(t.getTaskID(), false);
+      if (t.isMapTask()) {
+        tracker.addFreeMapSlot();
+      } else {
+        tracker.addFreeReduceSlot();
+      }
     }
   }
-
-  /**
-   * Run the child process
-   */
-  private void runChild(List<String> args, File dir,
-                        Map<String, String> env,
-                        TaskAttemptID taskid) throws IOException {
-
-    shexec = new ShellCommandExecutor(args.toArray(new String[0]), dir, env);
-    try {
-      shexec.execute();
-    } catch (IOException ioe) {
-      // do nothing
-      // error and output are appropriately redirected
-    } finally { // handle the exit code
-      int exit_code = shexec.getExitCode();
-      tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
-      if (!killed && exit_code != 0) {
-        if (exit_code == 65) {
-          tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
+  
+  //Mostly for setting up the symlinks. Note that when we setup the distributed
+  //cache, we didn't create the symlinks. This is done on a per task basis
+  //by the currently executing task.
+  public static void setupWorkDir(JobConf conf) throws IOException {
+    File workDir = new File(".").getAbsoluteFile();
+    FileUtil.fullyDelete(workDir);
+    if (DistributedCache.getSymlink(conf)) {
+      URI[] archives = DistributedCache.getCacheArchives(conf);
+      URI[] files = DistributedCache.getCacheFiles(conf);
+      Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
+      Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
+      if (archives != null) {
+        for (int i = 0; i < archives.length; i++) {
+          String link = archives[i].getFragment();
+          if (link != null) {
+            link = workDir.toString() + Path.SEPARATOR + link;
+            File flink = new File(link);
+            if (!flink.exists()) {
+              FileUtil.symLink(localArchives[i].toString(), link);
+            }
+          }
         }
-        throw new IOException("Task process exit with nonzero status of " +
-                              exit_code + ": "+ shexec);
+      }
+      if (files != null) {
+        for (int i = 0; i < files.length; i++) {
+          String link = files[i].getFragment();
+          if (link != null) {
+            link = workDir.toString() + Path.SEPARATOR + link;
+            File flink = new File(link);
+            if (!flink.exists()) {
+              FileUtil.symLink(localFiles[i].toString(), link);
+            }
+          }
+        }
+      }
+    }
+    File jobCacheDir = null;
+    if (conf.getJar() != null) {
+      jobCacheDir = new File(
+          new Path(conf.getJar()).getParent().toString());
+    }
+
+    // create symlinks for all the files in job cache dir in current
+    // workingdir for streaming
+    try{
+      DistributedCache.createAllSymlink(conf, jobCacheDir,
+          workDir);
+    } catch(IOException ie){
+      // Do not exit even if symlinks have not been created.
+      LOG.warn(StringUtils.stringifyException(ie));
+    }
+    // add java.io.tmpdir given by mapred.child.tmp
+    String tmp = conf.get("mapred.child.tmp", "./tmp");
+    Path tmpDir = new Path(tmp);
+
+    // if temp directory path is not absolute
+    // prepend it with workDir.
+    if (!tmpDir.isAbsolute()) {
+      tmpDir = new Path(workDir.toString(), tmp);
+      FileSystem localFs = FileSystem.getLocal(conf);
+      if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()){
+        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
       }
     }
   }
@@ -478,13 +535,17 @@
    * Kill the child process
    */
   public void kill() {
-    if (shexec != null) {
-      Process process = shexec.getProcess();
-      if (process != null) {
-        process.destroy();
-      }
-    }
     killed = true;
+    jvmManager.taskKilled(this);
+  }
+  /**
+   * Marks this runner as done and wakes up a thread that is waiting on the
+   * runner's lock for done to become true.
+   */
+  public void signalDone() {
+    synchronized (lock) {
+      done = true;
+      lock.notify();
+    }
+  }
+  public void setExitCode(int exitCode) {
+    this.exitCodeSet = true;
+    this.exitCode = exitCode;
   }
-
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=696957&r1=696956&r2=696957&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Fri Sep 19 00:31:41 2008
@@ -41,11 +41,11 @@
 
   // what state is the task in?
   public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
-                            COMMIT_PENDING}
+                            COMMIT_PENDING, INITIALIZED}
     
   private TaskAttemptID taskid;
   private float progress;
-  private State runState;
+  private volatile State runState;
   private String diagnosticInfo;
   private String stateString;
   private String taskTracker;