Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/04/22 14:26:22 UTC

svn commit: r767484 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/util/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapred/pipes/ src/test/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/util/

Author: ddas
Date: Wed Apr 22 12:26:21 2009
New Revision: 767484

URL: http://svn.apache.org/viewvc?rev=767484&view=rev
Log:
HADOOP-5488. Removes the pidfile management for the Task JVM from the framework and instead passes the PID back and forth between the TaskTracker and the Task processes. Contributed by Ravi Gummadi.
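
In outline, the new handshake works as follows (condensed from the Child.java
and TaskTracker.java diffs below; an illustrative sketch, not additional patch
content): the task wrapper script exports the JVM's pid as the JVM_PID
environment variable, the child JVM reads it and reports it to the TaskTracker
inside the new JvmContext, and the TaskTracker records it for later
process-tree cleanup.

    // Child side (condensed from Child.java below)
    String pid = "";
    if (!Shell.WINDOWS) {
      pid = System.getenv().get("JVM_PID");    // exported by the task wrapper
    }
    JvmContext context = new JvmContext(jvmId, pid);
    JvmTask myTask = umbilical.getTask(context);

    // TaskTracker side (condensed from TaskTracker.java below)
    public synchronized JvmTask getTask(JvmContext context) throws IOException {
      jvmManager.setPidToJvm(context.jvmId, context.pid); // saved for kill paths
      // ... hand out the next task for this JVM as before ...
    }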

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmContext.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcessTree.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistory.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java
    hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Apr 22 12:26:21 2009
@@ -431,6 +431,10 @@
     HADOOP-5650. Fix safemode messages in the Namenode log.  (Suresh Srinivas
     via szetszwo)
 
+    HADOOP-5488. Removes the pidfile management for the Task JVM from the framework 
+    and instead passes the PID back and forth between the TaskTracker and the Task 
+    processes. (Ravi Gummadi via ddas)
+
 Release 0.20.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcessTree.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcessTree.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/util/ProcessTree.java Wed Apr 22 12:26:21 2009
@@ -18,9 +18,6 @@
 
 package org.apache.hadoop.util;
 
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
 import java.io.IOException;
 import java.util.Arrays;
 
@@ -61,10 +58,21 @@
    * in the current thread
    * @param pid Process id(OR process group id) of to-be-deleted-process
    * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
+   * @param sleepTimeBeforeSigKill wait time before sending SIGKILL after
+   *  sending SIGTERM
    */
-  private static void sigKillInCurrentThread(String pid, boolean isProcessGroup) {
-    // Kill the process tree with SIGKILL if it is still alive
-    if (ProcessTree.isAlive(pid)) {
+  private static void sigKillInCurrentThread(String pid, boolean isProcessGroup,
+      long sleepTimeBeforeSigKill) {
+    // Kill the subprocesses of the root process (even if the root process
+    // is not alive) if the process group is to be killed.
+    if (isProcessGroup || ProcessTree.isAlive(pid)) {
+      try {
+        // Sleep for some time before sending SIGKILL
+        Thread.sleep(sleepTimeBeforeSigKill);
+      } catch (InterruptedException i) {
+        LOG.warn("Thread sleep is interrupted.");
+      }
+
       ShellCommandExecutor shexec = null;
 
       try {
@@ -112,7 +120,7 @@
       sigKillThread.start();
     }
     else {
-      sigKillInCurrentThread(pid, isProcessGroup);
+      sigKillInCurrentThread(pid, isProcessGroup, sleeptimeBeforeSigkill);
     }
   }
 
@@ -186,50 +194,6 @@
     }
   }
 
-  /**
-   * Get PID from a pid-file.
-   *
-   * @param pidFileName
-   *          Name of the pid-file.
-   * @return the PID string read from the pid-file. Returns null if the
-   *         pidFileName points to a non-existing file or if read fails from
-   *         the file.
-   */
-  public static String getPidFromPidFile(String pidFileName) {
-    BufferedReader pidFile = null;
-    FileReader fReader = null;
-    String pid = null;
-
-    try {
-      fReader = new FileReader(pidFileName);
-      pidFile = new BufferedReader(fReader);
-    } catch (FileNotFoundException f) {
-      LOG.debug("PidFile doesn't exist : " + pidFileName);
-      return pid;
-    }
-
-    try {
-      pid = pidFile.readLine();
-    } catch (IOException i) {
-      LOG.error("Failed to read from " + pidFileName);
-    } finally {
-      try {
-        if (fReader != null) {
-          fReader.close();
-        }
-        try {
-          if (pidFile != null) {
-            pidFile.close();
-          }
-        } catch (IOException i) {
-          LOG.warn("Error closing the stream " + pidFile);
-        }
-      } catch (IOException i) {
-        LOG.warn("Error closing the stream " + fReader);
-      }
-    }
-    return pid;
-  }
 
   /**
    * Is the process with PID pid still alive?
@@ -256,9 +220,6 @@
    * Helper thread class that kills process-tree with SIGKILL in background
    */
   static class SigKillThread extends Thread {
-    private static final Log LOG = LogFactory
-               .getLog("SigKillThread.class");
-
     private String pid = null;
     private boolean isProcessGroup = false;
 
@@ -272,14 +233,7 @@
     }
 
     public void run() {
-      try {
-        // Sleep for some time before sending SIGKILL
-        Thread.sleep(sleepTimeBeforeSigKill);
-      } catch (InterruptedException i) {
-        LOG.warn("Thread sleep is interrupted.");
-      }
-
-      sigKillInCurrentThread(pid, isProcessGroup);
+      sigKillInCurrentThread(pid, isProcessGroup, sleepTimeBeforeSigKill);
     }
   }
 }
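
For reference, the refactored kill path is driven from the task controllers
with a call of the following shape (a sketch assuming a JobConf named conf and
a pid obtained via the new JvmContext handshake; argument order as at the call
sites in this patch):

    long sleeptimeBeforeSigkill = conf.getLong(
        "mapred.tasktracker.tasks.sleeptime-before-sigkill",
        ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
    // SIGTERM first, then SIGKILL after the configured wait; the pid is
    // treated as a process group id when setsid was used to launch the JVM.
    ProcessTree.destroy(pid, sleeptimeBeforeSigkill,
        ProcessTree.isSetsidAvailable, false);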

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Child.java Wed Apr 22 12:26:21 2009
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapred;
 
 import java.io.ByteArrayOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
@@ -28,16 +27,14 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JvmTask;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
-import org.apache.hadoop.util.Shell;
 import org.apache.log4j.LogManager;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Shell;
 
 /** 
  * The main() for child processes. 
@@ -46,7 +43,7 @@
 class Child {
 
   public static final Log LOG =
-    LogFactory.getLog(TaskTracker.class);
+    LogFactory.getLog(Child.class);
 
   static volatile TaskAttemptID taskid = null;
   static volatile boolean isCleanup;
@@ -100,17 +97,18 @@
     t.setName("Thread for syncLogs");
     t.setDaemon(true);
     t.start();
-    // A PID file is written once per JVM. We simply symlink the file
-    // on a per task basis later (see below). Long term, we should change
-    // the Memory manager to use JVMId instead of TaskAttemptId
-    Path srcPidPath = null;
-    Path dstPidPath = null;
+
+    String pid = "";
+    if (!Shell.WINDOWS) {
+      pid = System.getenv().get("JVM_PID");
+    }
+    JvmContext context = new JvmContext(jvmId, pid);
     int idleLoopCount = 0;
     Task task = null;
     try {
       while (true) {
         taskid = null;
-        JvmTask myTask = umbilical.getTask(jvmId);
+        JvmTask myTask = umbilical.getTask(context);
         if (myTask.shouldDie()) {
           break;
         } else {
@@ -137,17 +135,7 @@
         //are viewable immediately
         TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
         JobConf job = new JobConf(task.getJobFile());
-        if (srcPidPath == null && !Shell.WINDOWS) {
-          // get the first task's path for the first time
-          srcPidPath = new Path(task.getPidFile());
-        }
-        //since the JVM is running multiple tasks potentially, we need
-        //to do symlink stuff only for the subsequent tasks
-        if (!taskid.equals(firstTaskid) && !Shell.WINDOWS) {
-          dstPidPath = new Path(task.getPidFile());
-          FileUtil.symLink(srcPidPath.toUri().getPath(), 
-              dstPidPath.toUri().getPath());
-        }
+
         //setupWorkDir actually sets up the symlinks for the distributed
         //cache. After a task exits we wipe the workdir clean, and hence
         //the symlinks have to be rebuilt.
@@ -169,13 +157,6 @@
           task.run(job, umbilical);             // run the task
         } finally {
           TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
-          if (!taskid.equals(firstTaskid) && !Shell.WINDOWS) {
-            // delete the pid-file's symlink
-            boolean b = new File(dstPidPath.toUri().getPath()).delete();
-            if (!b) {
-              LOG.warn("File delete failed. Ignoring");
-            }
-          }
         }
         if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
           break;

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Wed Apr 22 12:26:21 2009
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
 import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
@@ -52,7 +51,7 @@
     JvmEnv env = context.env;
     List<String> wrappedCommand = 
       TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
-          env.logSize, true, env.pidFile);
+          env.logSize, true);
     ShellCommandExecutor shexec = 
         new ShellCommandExecutor(wrappedCommand.toArray(new String[0]), 
                                   env.workDir, env.env);
@@ -69,28 +68,29 @@
    */
   void killTaskJVM(TaskController.TaskControllerContext context) {
     ShellCommandExecutor shexec = context.shExec;
-    JvmEnv env = context.env;
+
     if (shexec != null) {
       Process process = shexec.getProcess();
-      if (process != null) {
-        if (Shell.WINDOWS) {
+      if (Shell.WINDOWS) {
+        // Currently we don't use setsid on WINDOWS. So just kill the process itself.
+        if (process != null) {
           process.destroy();
         }
-        else {
-          Path pidFilePath = new Path(env.pidFile);
-          String pid = ProcessTree.getPidFromPidFile(
-                                                pidFilePath.toString());
-          if (pid != null) {
-            long sleeptimeBeforeSigkill = env.conf.getLong(
-                 "mapred.tasktracker.tasks.sleeptime-before-sigkill",
-                 ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-
-            ProcessTree.destroy(pid, sleeptimeBeforeSigkill,
-                     ProcessTree.isSetsidAvailable, false);
-            try {
+      }
+      else { // In addition to the task JVM, kill its subprocesses as well.
+        String pid = context.pid;
+        if (pid != null) {
+          ProcessTree.destroy(pid, context.sleeptimeBeforeSigkill,
+              ProcessTree.isSetsidAvailable, false);
+          try {
+            if (process != null) {
               LOG.info("Process exited with exit code:" + process.waitFor());
-            } catch (InterruptedException ie) {}
-          }
+            }
+          } catch (InterruptedException ie) {}
+        }
+        else if (process != null) {
+          // kill the task JVM alone, if we don't have the process group id
+          process.destroy();
         }
       }
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Wed Apr 22 12:26:21 2009
@@ -59,7 +59,7 @@
       LOG.info("Task " + taskId + " reporting shuffle error: " + message);
     }
 
-    public JvmTask getTask(JVMId jvmId) throws IOException {
+    public JvmTask getTask(JvmContext context) throws IOException {
       return null;
     }
 

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmContext.java?rev=767484&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmContext.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmContext.java Wed Apr 22 12:26:21 2009
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+class JvmContext implements Writable {
+
+  public static final Log LOG =
+    LogFactory.getLog(JvmContext.class);
+  
+  JVMId jvmId;
+  String pid;
+  
+  JvmContext() {
+    jvmId = new JVMId();
+    pid = "";
+  }
+  
+  JvmContext(JVMId id, String pid) {
+    jvmId = id;
+    this.pid = pid;
+  }
+  
+  public void readFields(DataInput in) throws IOException {
+    jvmId.readFields(in);
+    this.pid = Text.readString(in);
+  }
+  
+  public void write(DataOutput out) throws IOException {
+    jvmId.write(out);
+    Text.writeString(out, pid);
+  }
+}
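
JvmContext implements Writable, so it travels over the task umbilical RPC like
any other record. A minimal round-trip check (a sketch using the standard
org.apache.hadoop.io.DataOutputBuffer/DataInputBuffer classes; the pid value
here is made up):

    DataOutputBuffer out = new DataOutputBuffer();
    new JvmContext(new JVMId(), "12345").write(out);  // "12345": made-up pid

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    JvmContext received = new JvmContext();   // no-arg ctor pre-allocates jvmId
    received.readFields(in);
    // received.pid is now "12345" and received.jvmId mirrors the one written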

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JvmManager.java Wed Apr 22 12:26:21 2009
@@ -31,17 +31,15 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.util.ProcessTree;
 
 class JvmManager {
 
   public static final Log LOG =
-    LogFactory.getLog("org.apache.hadoop.mapred.JvmManager");
+    LogFactory.getLog(JvmManager.class);
 
   JvmManagerForType mapJvmManager;
 
@@ -49,8 +47,8 @@
   
   public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
       File stdout,File stderr,long logSize, File workDir, 
-      Map<String,String> env, String pidFile, JobConf conf) {
-    return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,pidFile,conf);
+      Map<String,String> env, JobConf conf) {
+    return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,conf);
   }
   
   public JvmManager(TaskTracker tracker) {
@@ -73,6 +71,38 @@
     }
   }
 
+  /*
+   * Saves pid of the given taskJvm
+   */
+  void setPidToJvm(JVMId jvmId, String pid) {
+    if (jvmId.isMapJVM()) {
+      mapJvmManager.jvmIdToPid.put(jvmId, pid);
+    }
+    else {
+      reduceJvmManager.jvmIdToPid.put(jvmId, pid);
+    }
+  }
+  
+  /*
+   * Returns the pid of the task
+   */
+  String getPid(TaskRunner t) {
+    if (t != null && t.getTask() != null) {
+      if (t.getTask().isMapTask()) {
+        JVMId id = mapJvmManager.runningTaskToJvm.get(t);
+        if (id != null) {
+          return mapJvmManager.jvmIdToPid.get(id);
+        }
+      } else {
+        JVMId id = reduceJvmManager.runningTaskToJvm.get(t);
+        if (id != null) {
+          return reduceJvmManager.jvmIdToPid.get(id);
+        }
+      }
+    }
+    return null;
+  }
+  
   public void launchJvm(TaskRunner t, JvmEnv env) {
     if (t.getTask().isMapTask()) {
       mapJvmManager.reapJvm(t, env);
@@ -122,6 +152,10 @@
     //Mapping from the JVM IDs to Reduce JVM processes
     Map <JVMId, JvmRunner> jvmIdToRunner = 
       new HashMap<JVMId, JvmRunner>();
+    //Mapping from the JVM IDs to process IDs
+    Map <JVMId, String> jvmIdToPid = 
+      new HashMap<JVMId, String>();
+    
     int maxJvms;
     boolean isMap;
     
@@ -211,6 +245,7 @@
     
     synchronized private void removeJvm(JVMId jvmId) {
       jvmIdToRunner.remove(jvmId);
+      jvmIdToPid.remove(jvmId);
     }
     private synchronized void reapJvm( 
         TaskRunner t, JvmEnv env) {
@@ -317,13 +352,13 @@
       jvmRunner.start();
     }
     synchronized private void updateOnJvmExit(JVMId jvmId, 
-        int exitCode, boolean killed) {
+        int exitCode) {
       removeJvm(jvmId);
       TaskRunner t = jvmToRunningTask.remove(jvmId);
 
       if (t != null) {
         runningTaskToJvm.remove(t);
-        if (!killed && exitCode != 0) {
+        if (exitCode != 0) {
           t.setExitCode(exitCode);
         }
         t.signalDone();
@@ -368,8 +403,11 @@
           if (shexec == null) {
             return;
           }
+          
+          kill();
+          
           int exitCode = shexec.getExitCode();
-          updateOnJvmExit(jvmId, exitCode, killed);
+          updateOnJvmExit(jvmId, exitCode);
           LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " + 
               numTasksRan);
           try {
@@ -389,16 +427,24 @@
        * of processes) is created using setsid.
        */
       public void kill() {
-        TaskController controller = tracker.getTaskController();
-        //Check inital context before issuing a kill to prevent situations
-        //where kill is issued before task is launched.
-        if(initalContext != null && initalContext.env != null) {
-          controller.killTaskJVM(initalContext);
-        } else {
-          LOG.info(String.format("JVM Not killed %s but just removed", 
-              jvmId.toString()));
+        if (!killed) {
+          killed = true;
+          TaskController controller = tracker.getTaskController();
+          // Check inital context before issuing a kill to prevent situations
+          // where kill is issued before task is launched.
+          if (initalContext != null && initalContext.env != null) {
+            initalContext.pid = jvmIdToPid.get(jvmId);
+            initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
+                .getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill",
+                    ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+
+            controller.killTaskJVM(initalContext);
+          } else {
+            LOG.info(String.format("JVM Not killed %s but just removed", jvmId
+                .toString()));
+          }
+          removeJvm(jvmId);
         }
-        removeJvm(jvmId);
       }
       
       public void taskRan() {
@@ -423,21 +469,19 @@
     File stdout;
     File stderr;
     File workDir;
-    String pidFile;
     long logSize;
     JobConf conf;
     Map<String, String> env;
 
     public JvmEnv(List<String> setup, Vector<String> vargs, File stdout, 
         File stderr, long logSize, File workDir, Map<String,String> env,
-        String pidFile, JobConf conf) {
+        JobConf conf) {
       this.setup = setup;
       this.vargs = vargs;
       this.stdout = stdout;
       this.stderr = stderr;
       this.workDir = workDir;
       this.env = env;
-      this.pidFile = pidFile;
       this.conf = conf;
     }
   }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LinuxTaskController.java Wed Apr 22 12:26:21 2009
@@ -124,7 +124,7 @@
     // get the JVM command line.
     String cmdLine = 
       TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
-          env.logSize, true, env.pidFile);
+          env.logSize, true);
 
     // write the command to a file in the
     // task specific cache directory
@@ -195,6 +195,8 @@
    * 
    * This method will launch the task controller setuid executable
    * that in turn will kill the task JVM by sending a kill signal.
+   * @param context the context of the task running within the JVM
+   * that needs to be killed.
    */
   void killTaskJVM(TaskControllerContext context) {
    

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Wed Apr 22 12:26:21 2009
@@ -268,8 +268,8 @@
 
     // TaskUmbilicalProtocol methods
 
-    public JvmTask getTask(JVMId jvmId) { return null; }
-
+    public JvmTask getTask(JvmContext context) { return null; }
+    
     public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
     throws IOException, InterruptedException {
       LOG.info(taskStatus.getStateString());

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Wed Apr 22 12:26:21 2009
@@ -44,19 +44,17 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Shell;
 
 /** Base class for tasks. */
 abstract class Task implements Writable, Configurable {
   private static final Log LOG =
-    LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner");
+    LogFactory.getLog(Task.class);
 
   // Counters used by Task subclasses
   protected static enum Counter { 
@@ -138,7 +136,6 @@
   protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat;
   protected org.apache.hadoop.mapreduce.OutputCommitter committer;
   protected final Counters.Counter spilledRecordsCounter;
-  private String pidFile = "";
 
   ////////////////////////////////////////////
   // Constructors
@@ -174,12 +171,6 @@
   public String getJobFile() { return jobFile; }
   public TaskAttemptID getTaskID() { return taskId; }
   Counters getCounters() { return counters; }
-  public void setPidFile(String pidFile) { 
-    this.pidFile = pidFile; 
-  }
-  public String getPidFile() { 
-    return pidFile; 
-  }
   
   /**
    * Get the job name for this task.
@@ -315,9 +306,6 @@
     out.writeBoolean(jobSetup);
     out.writeBoolean(writeSkipRecs);
     out.writeBoolean(taskCleanup);  
-    if(!Shell.WINDOWS) {
-      Text.writeString(out, pidFile);
-    }
   }
   
   public void readFields(DataInput in) throws IOException {
@@ -337,9 +325,6 @@
     if (taskCleanup) {
       setPhase(TaskStatus.Phase.CLEANUP);
     }
-    if(!Shell.WINDOWS) {
-      pidFile = Text.readString(in);
-    }
   }
 
   @Override

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskController.java Wed Apr 22 12:26:21 2009
@@ -22,9 +22,7 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 /**
@@ -99,6 +97,10 @@
     JvmEnv env;
     // the Shell executor executing the JVM for this task
     ShellCommandExecutor shExec; 
+    // process handle of task JVM
+    String pid;
+    // waiting time before sending SIGKILL to task JVM after sending SIGTERM
+    long sleeptimeBeforeSigkill;
   }
 
   /**

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskLog.java Wed Apr 22 12:26:21 2009
@@ -52,7 +52,7 @@
  */
 public class TaskLog {
   private static final Log LOG =
-    LogFactory.getLog(TaskLog.class.getName());
+    LogFactory.getLog(TaskLog.class);
 
   private static final File LOG_DIR = 
     new File(System.getProperty("hadoop.log.dir"), 
@@ -408,7 +408,7 @@
                                                 long tailLength
                                                ) throws IOException {
     return captureOutAndError(null, cmd, stdoutFilename,
-                              stderrFilename, tailLength, false, null );
+                              stderrFilename, tailLength, false);
   }
 
   /**
@@ -430,7 +430,7 @@
                                                 long tailLength
                                                ) throws IOException {
     return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
-                              tailLength, false, null);
+                              tailLength, false);
   }
 
   /**
@@ -443,9 +443,13 @@
    * @param stdoutFilename The filename that stdout should be saved to
    * @param stderrFilename The filename that stderr should be saved to
    * @param tailLength The length of the tail to be saved.
-   * @param pidFileName The name of the pid-file
+   * @param pidFileName The name of the pid-file. Use of pid-files is deprecated.
    * @return the modified command that should be run
+   * 
+   * @deprecated     pid-files are no longer used. Instead, the pid is
+   *                 exported to the environment variable JVM_PID.
    */
+  @Deprecated
   public static List<String> captureOutAndError(List<String> setup,
                                                 List<String> cmd, 
                                                 File stdoutFilename,
@@ -453,10 +457,10 @@
                                                 long tailLength,
                                                 String pidFileName
                                                ) throws IOException {
-    return captureOutAndError (setup, cmd, stdoutFilename, stderrFilename,
-                               tailLength, false, pidFileName);
+    return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
+        tailLength, false);
   }
-
+  
   /**
    * Wrap a command in a shell to capture stdout and stderr to files.
    * Setup commands such as setting memory limit can be passed which 
@@ -468,7 +472,6 @@
    * @param stderrFilename The filename that stderr should be saved to
    * @param tailLength The length of the tail to be saved.
    * @param useSetsid Should setsid be used in the command or not.
-   * @param pidFileName The name of the pid-file
    * @return the modified command that should be run
    */
   public static List<String> captureOutAndError(List<String> setup,
@@ -476,15 +479,14 @@
                                                 File stdoutFilename,
                                                 File stderrFilename,
                                                 long tailLength,
-                                                boolean useSetsid,
-                                                String pidFileName
+                                                boolean useSetsid
                                                ) throws IOException {
     List<String> result = new ArrayList<String>(3);
     result.add(bashCommand);
     result.add("-c");
     String mergedCmd = buildCommandLine(setup, cmd, stdoutFilename,
                                                     stderrFilename, tailLength, 
-                                                    useSetsid, pidFileName);
+                                                    useSetsid);
     result.add(mergedCmd);
     return result;
   }
@@ -496,7 +498,6 @@
    * @param stdoutFilename The filename that stdout should be saved to
    * @param stderrFilename The filename that stderr should be saved to
    * @param tailLength The length of the tail to be saved.
-   * @param pidFileName The name of the pid-file
    * @return the command line as a String
    * @throws IOException
    */
@@ -504,18 +505,17 @@
                                       File stdoutFilename,
                                       File stderrFilename,
                                       long tailLength, 
-                                      boolean useSetsid, String pidFileName)
+                                      boolean useSetsid)
                                 throws IOException {
     
     String stdout = FileUtil.makeShellPath(stdoutFilename);
     String stderr = FileUtil.makeShellPath(stderrFilename);    
     StringBuffer mergedCmd = new StringBuffer();
     
-    // Spit out the pid to pidFileName
-    if (pidFileName != null && !Shell.WINDOWS) {
-      mergedCmd.append("echo $$ > ");
-      mergedCmd.append(pidFileName);
-      mergedCmd.append(" ;");
+    // Export the pid of the task JVM to the env variable JVM_PID.
+    // Currently the pid is not used on Windows.
+    if (!Shell.WINDOWS) {
+      mergedCmd.append(" export JVM_PID=`echo $$` ; ");
     }
 
     if (setup != null && setup.size() > 0) {
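
The exported JVM_PID is what Child.java (above) reads back through
System.getenv(). As a side note, on HotSpot JVMs the exported value can be
sanity-checked against the JVM's own view of its pid, since
RuntimeMXBean.getName() conventionally returns "<pid>@<hostname>" (a HotSpot
implementation detail, not a guarantee of the spec):

    // Sketch only; not part of this patch.
    String exported = System.getenv("JVM_PID");  // set by the wrapper above
    String mxName = java.lang.management.ManagementFactory
        .getRuntimeMXBean().getName();           // "<pid>@<hostname>" on HotSpot
    String actual = mxName.split("@")[0];
    LOG.info("JVM_PID=" + exported + ", self-reported pid=" + actual);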

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Wed Apr 22 12:26:21 2009
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.File;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -67,11 +66,10 @@
         "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
   }
 
-  public void addTask(TaskAttemptID tid, long memLimit, String pidFile) {
+  public void addTask(TaskAttemptID tid, long memLimit) {
     synchronized (tasksToBeAdded) {
       LOG.debug("Tracking ProcessTree " + tid + " for the first time");
-      ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, 
-                                                   memLimit, pidFile);
+      ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit);
       tasksToBeAdded.put(tid, ptInfo);
     }
   }
@@ -87,15 +85,13 @@
     private String pid;
     private ProcfsBasedProcessTree pTree;
     private long memLimit;
-    private String pidFile;
 
     public ProcessTreeInfo(TaskAttemptID tid, String pid,
-        ProcfsBasedProcessTree pTree, long memLimit, String pidFile) {
+        ProcfsBasedProcessTree pTree, long memLimit) {
       this.tid = tid;
       this.pid = pid;
       this.pTree = pTree;
       this.memLimit = memLimit;
-      this.pidFile = pidFile;
     }
 
     public TaskAttemptID getTID() {
@@ -167,19 +163,16 @@
 
           // Initialize any uninitialized processTrees
           if (pId == null) {
-            // get pid from pid-file
-            pId = getPid(ptInfo.pidFile);
+            // get pid from taskAttemptId
+            pId = taskTracker.getPid(ptInfo.getTID());
             if (pId != null) {
-              // PID will be null, either if the pid file is yet to be created
-              // or if the tip is finished and we removed pidFile, but the TIP
-              // itself is still retained in runningTasks till successful
-              // transmission to JT
-
+              // pId will be null either if the JVM is not spawned yet or if
+              // the JVM has been removed from jvmIdToPid
               long sleeptimeBeforeSigkill =
                   taskTracker
                       .getJobConf()
                       .getLong(
-                          "mapred.tasktracker.sigkillthread.sleeptime-before-sigkill",
+                          "mapred.tasktracker.tasks.sleeptime-before-sigkill",
                           ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
 
               // create process tree object
@@ -320,18 +313,4 @@
     }
   }
 
-  /**
-   * Load pid of the task from the pidFile.
-   * 
-   * @param pidFileName
-   * @return the pid of the task process.
-   */
-  private String getPid(String pidFileName) {
-    if ((new File(pidFileName)).exists()) {
-      return ProcessTree.getPidFromPidFile(pidFileName);
-    }
-    return null;
-  }
-
-
 }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Wed Apr 22 12:26:21 2009
@@ -368,16 +368,7 @@
       vargs.add(Integer.toString(address.getPort())); 
       vargs.add(taskid.toString());                      // pass task identifier
 
-      String pidFile = "";
-      if (!Shell.WINDOWS) {
-        pidFile = lDirAlloc.getLocalPathForWrite(
-            (TaskTracker.getPidFile(t.getJobID().toString(),
-               t.getTaskID().toString(), t.isTaskCleanupTask())),
-            this.conf).toString();
-        t.setPidFile(pidFile);
-      }
-      
-      tracker.addToMemoryManager(t.getTaskID(), conf, pidFile);
+      tracker.addToMemoryManager(t.getTaskID(), conf);
 
       // set memory limit using ulimit if feasible and necessary ...
       String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
@@ -410,7 +401,7 @@
       env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
       jvmManager.launchJvm(this, 
           jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, 
-              workDir, env, pidFile, conf));
+              workDir, env, conf));
       synchronized (lock) {
         while (!done) {
           lock.wait();

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Wed Apr 22 12:26:21 2009
@@ -81,7 +81,6 @@
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
-import org.apache.hadoop.util.ProcessTree;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
@@ -187,7 +186,6 @@
   private static final String SUBDIR = "taskTracker";
   private static final String CACHEDIR = "archive";
   private static final String JOBCACHE = "jobcache";
-  private static final String PID = "pid";
   private static final String OUTPUT = "output";
   private JobConf fConf;
   private int maxCurrentMapTasks;
@@ -448,39 +446,13 @@
 	}
 	return taskDir;
   }
-
-  static String getPidFile(String jobid, 
-                           String taskid, 
-                           boolean isCleanup) {
-    return  getLocalTaskDir(jobid, taskid, isCleanup)
-            + Path.SEPARATOR + PID;
-  }
-
-  /**
-   * Get the pidFile path of a Task
-   * 
-   * @param tid the TaskAttemptID of the task for which pidFile's path is needed
-   * @param conf Configuration for local dir allocator
-   * @param isCleanup true if the task is cleanup attempt
-   *  
-   * @return pidFile's Path
-   */
-  static Path getPidFilePath(TaskAttemptID tid, 
-                             JobConf conf, 
-                             boolean isCleanup) {
-    Path pidFileName = null;
-    try {
-      //this actually need not use a localdirAllocator since the PID
-      //files are really small..
-      pidFileName = lDirAlloc.getLocalPathToRead(
-        getPidFile(tid.getJobID().toString(), tid.toString(), isCleanup),
-        conf);
-    } catch (IOException i) {
-      // PID file is not there
-      LOG.warn("Failed to get pidFile name for " + tid + " " + 
-                StringUtils.stringifyException(i));
+  
+  String getPid(TaskAttemptID tid) {
+    TaskInProgress tip = tasks.get(tid);
+    if (tip != null) {
+      return jvmManager.getPid(tip.getTaskRunner());  
     }
-    return pidFileName;
+    return null;
   }
   
   public long getProtocolVersion(String protocol, 
@@ -1787,11 +1759,9 @@
   }
   
   void addToMemoryManager(TaskAttemptID attemptId, 
-                          JobConf conf, 
-                          String pidFile) {
+                          JobConf conf) {
     if (isTaskMemoryManagerEnabled()) {
-      taskMemoryManager.addTask(attemptId, 
-        getVirtualMemoryForTask(conf), pidFile);
+      taskMemoryManager.addTask(attemptId, getVirtualMemoryForTask(conf));
     }
   }
 
@@ -2574,8 +2544,13 @@
   /**
    * Called upon startup by the child process, to fetch Task data.
    */
-  public synchronized JvmTask getTask(JVMId jvmId) 
+  public synchronized JvmTask getTask(JvmContext context) 
   throws IOException {
+    JVMId jvmId = context.jvmId;
+    
+    // save pid of task JVM sent by child
+    jvmManager.setPidToJvm(jvmId, context.pid);
+    
     LOG.debug("JVM with ID : " + jvmId + " asked for a task");
     if (!jvmManager.isJvmKnown(jvmId)) {
       LOG.info("Killing unknown JVM " + jvmId);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Wed Apr 22 12:26:21 2009
@@ -53,18 +53,20 @@
    * Version 13 changed the getTask method signature for HADOOP-249
    * Version 14 changed the getTask method signature for HADOOP-4232
    * Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
+   * Version 16 Change in signature of getTask() for HADOOP-5488
    * */
 
-  public static final long versionID = 15L;
+  public static final long versionID = 16L;
   
   /**
    * Called when a child task process starts, to get its task.
-   * @param jvmId the ID of this JVM w.r.t the tasktracker that launched it
+   * @param context the JvmContext of the JVM w.r.t. the TaskTracker that
+   *  launched it
    * @return Task object
    * @throws IOException 
    */
-  JvmTask getTask(JVMId jvmId) throws IOException;
-
+  JvmTask getTask(JvmContext context) throws IOException;
+  
   /**
    * Report child's progress to parent.
    * 

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/pipes/Application.java Wed Apr 22 12:26:21 2009
@@ -97,7 +97,7 @@
     File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
     long logLength = TaskLog.getTaskLogLength(conf);
     cmd = TaskLog.captureOutAndError(null, cmd, stdout, stderr, logLength,
-                                     false, null);
+                                     false);
 
     process = runClient(cmd, env);
     clientSocket = serverSocket.accept();

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistory.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobHistory.java Wed Apr 22 12:26:21 2009
@@ -465,7 +465,7 @@
     JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
 
     TestListener l = new TestListener(jobInfo);
-    JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys);
+    JobHistory.parseHistoryFromFS(logFile.toUri().getPath(), l, fileSys);
 
 
     // validate format of job level key, values
@@ -762,7 +762,7 @@
 
     DefaultJobHistoryParser.JobTasksParseListener l =
                    new DefaultJobHistoryParser.JobTasksParseListener(jobInfo);
-    JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys);
+    JobHistory.parseHistoryFromFS(logFile.toUri().getPath(), l, fileSys);
 
     // Now the history file contents are available in jobInfo. Let us compare
     // them with the actual values from JT.
@@ -947,7 +947,7 @@
 
     DefaultJobHistoryParser.JobTasksParseListener l =
                   new DefaultJobHistoryParser.JobTasksParseListener(jobInfo);
-    JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys);
+    JobHistory.parseHistoryFromFS(logFile.toUri().getPath(), l, fileSys);
 
     assertTrue("Job Status read from job history file is not the expected" +
          " status", status.equals(jobInfo.getValues().get(Keys.JOB_STATUS)));

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java Wed Apr 22 12:26:21 2009
@@ -28,8 +28,6 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
@@ -52,25 +50,32 @@
   private static String TEST_ROOT_DIR = new File(System.getProperty(
       "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
 
-  static JobClient jobClient = null;
+  private static JobClient jobClient = null;
 
-  static MiniMRCluster mr = null;
-  static Path scriptDir = new Path(TEST_ROOT_DIR + "/killjob");
+  private static MiniMRCluster mr = null;
+  private static Path scriptDir = null;
+  private static String scriptDirName = null;
+  private static String pid = null;
 
   // number of levels in the subtree of subprocesses of map task
-  static int numLevelsOfSubProcesses = 6;
+  private static int numLevelsOfSubProcesses = 4;
 
   /**
    * Runs a job, kills the job and verifies if the map task and its
    * subprocesses are also killed properly or not.
    */
-  static JobID runJobKill(JobTracker jt, JobConf conf) throws IOException {
+  private static void runKillingJobAndValidate(JobTracker jt, JobConf conf) throws IOException {
 
-    conf.setJobName("testkillsubprocesses");
-    conf.setMapperClass(KillMapperWithChild.class);
+    conf.setJobName("testkilljobsubprocesses");
+    conf.setMapperClass(KillingMapperWithChildren.class);
+    
+    scriptDir = new Path(TEST_ROOT_DIR + "/script");
+    RunningJob job = runJobAndSetProcessHandle(jt, conf);
 
-    RunningJob job = runJob(conf);
-    while (job.getJobState() != JobStatus.RUNNING) {
+    // kill the job now
+    job.killJob();
+
+    while (job.cleanupProgress() == 0.0f) {
       try {
         Thread.sleep(100);
       } catch (InterruptedException ie) {
@@ -78,11 +83,72 @@
         break;
       }
     }
-    String pid = null;
-    String scriptDirName = scriptDir.toString().substring(5);
 
+    validateKillingSubprocesses(job, conf);
+    // Checking the Job status
+    assertEquals(job.getJobState(), JobStatus.KILLED);
+  }
+
+  /**
+   * Runs a job that will fail and verifies if the subprocesses of failed map
+   * task are killed properly or not.
+   */
+  private static void runFailingJobAndValidate(JobTracker jt, JobConf conf) throws IOException {
+
+    conf.setJobName("testfailjobsubprocesses");
+    conf.setMapperClass(FailingMapperWithChildren.class);
+    
+    // We don't want to run the failing map task 4 times. So we run it once and
+    // check if all the subprocesses are killed properly.
+    conf.setMaxMapAttempts(1);
+    
+    scriptDir = new Path(TEST_ROOT_DIR + "/script");
+    RunningJob job = runJobAndSetProcessHandle(jt, conf);
+    signalTask(TEST_ROOT_DIR + "/failjob/signalFile", conf);
+    validateKillingSubprocesses(job, conf);
+    // Checking the Job status
+    assertEquals(job.getJobState(), JobStatus.FAILED);
+  }
+  
+  /**
+   * Runs a job that will succeed and verifies if the subprocesses of succeeded
+   * map task are killed properly or not.
+   */
+  private static void runSuccessfulJobAndValidate(JobTracker jt, JobConf conf)
+               throws IOException {
+
+    conf.setJobName("testsucceedjobsubprocesses");
+    conf.setMapperClass(MapperWithChildren.class);
+
+    scriptDir = new Path(TEST_ROOT_DIR + "/script");
+    RunningJob job = runJobAndSetProcessHandle(jt, conf);
+    signalTask(TEST_ROOT_DIR + "/succeedjob/signalFile", conf);
+    validateKillingSubprocesses(job, conf);
+    // Checking the Job status
+    assertEquals(job.getJobState(), JobStatus.SUCCEEDED);
+  }
+
+  /**
+   * Runs the given job and saves the pid of map task.
+   * Also checks if the subprocesses of map task are alive.
+   */
+  private static RunningJob runJobAndSetProcessHandle(JobTracker jt, JobConf conf)
+                     throws IOException {
+    RunningJob job = runJob(conf);
+    while (job.getJobState() != JobStatus.RUNNING) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        break;
+      }
+    }
+
+    pid = null;
+    scriptDirName = scriptDir.toUri().getPath();
+    jobClient = new JobClient(conf);
+    
     // get the taskAttemptID of the map task and use it to get the pid
-    // of map task from pid file
+    // of map task
     TaskReport[] mapReports = jobClient.getMapTaskReports(job.getID());
 
     JobInProgress jip = jt.getJob(job.getID());
@@ -104,45 +170,36 @@
         tip.getActiveTasks().keySet().iterator(); it.hasNext();) {
         TaskAttemptID id = it.next();
         LOG.info("taskAttemptID of map task is " + id);
-
-        String localDir = mr.getTaskTrackerLocalDir(0); // TT with index 0
-        LOG.info("localDir is " + localDir);
-        JobConf confForThisTask = new JobConf(conf);
-        confForThisTask.set("mapred.local.dir", localDir);//set the localDir
-
-        Path pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask, false);
-        while (pidFilePath == null) {
-          //wait till the pid file is created
-          try {
-            Thread.sleep(500);
-          } catch (InterruptedException ie) {
-            LOG.warn("sleep is interrupted:" + ie);
-            break;
+        
+        while(pid == null) {
+          pid = mr.getTaskTrackerRunner(0).getTaskTracker().getPid(id);
+          if (pid == null) {
+            try {
+              Thread.sleep(500);
+            } catch(InterruptedException e) {}
           }
-          pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask, false);
         }
-
-        pid = ProcessTree.getPidFromPidFile(pidFilePath.toString());
         LOG.info("pid of map task is " + pid);
 
         // Checking if the map task is alive
         assertTrue(ProcessTree.isAlive(pid));
-        LOG.info("The map task is alive before killJob, as expected.");
+        LOG.info("The map task is alive before Job completion, as expected.");
       }
     }
 
     // Checking if the descendant processes of map task are alive
     if(ProcessTree.isSetsidAvailable) {
-      String childPid = ProcessTree.getPidFromPidFile(
+      String childPid = UtilsForTests.getPidFromPidFile(
                                scriptDirName + "/childPidFile" + 0);
       while(childPid == null) {
+        LOG.warn(scriptDirName + "/childPidFile" + 0 + " is null; Sleeping...");
         try {
           Thread.sleep(500);
         } catch (InterruptedException ie) {
           LOG.warn("sleep is interrupted:" + ie);
           break;
         }
-        childPid = ProcessTree.getPidFromPidFile(
+        childPid = UtilsForTests.getPidFromPidFile(
                                scriptDirName + "/childPidFile" + 0);
       }
 
@@ -151,95 +208,74 @@
       // have been created already(See the script for details).
       // Now check if the descendants of map task are alive.
       for(int i=0; i <= numLevelsOfSubProcesses; i++) {
-        childPid = ProcessTree.getPidFromPidFile(
+        childPid = UtilsForTests.getPidFromPidFile(
                                scriptDirName + "/childPidFile" + i);
         LOG.info("pid of the descendant process at level " + i +
                  "in the subtree of processes(with the map task as the root)" +
                  " is " + childPid);
         assertTrue("Unexpected: The subprocess at level " + i +
-                   " in the subtree is not alive before killJob",
+                   " in the subtree is not alive before Job completion",
                    ProcessTree.isAlive(childPid));
       }
     }
-
-    // kill the job now
-    job.killJob();
-
-    while (job.cleanupProgress() == 0.0f) {
+    return job;
+  }
+  
+  /**
+   * Verifies if the subprocesses of the map task are killed properly.
+   */
+  private static void validateKillingSubprocesses(RunningJob job, JobConf conf)
+                   throws IOException {
+    // wait till the job finishes
+    while (!job.isComplete()) {
       try {
-        Thread.sleep(100);
-      } catch (InterruptedException ie) {
-        LOG.warn("sleep is interrupted:" + ie);
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
         break;
       }
     }
 
-    // Checking that the Job got killed
-    assertTrue(job.isComplete());
-    assertEquals(job.getJobState(), JobStatus.KILLED);
-
     // Checking if the map task got killed or not
     assertTrue(!ProcessTree.isAlive(pid));
-    LOG.info("The map task is not alive after killJob, as expected.");
+    LOG.info("The map task is not alive after Job is completed, as expected.");
 
     // Checking if the descendant processes of map task are killed properly
     if(ProcessTree.isSetsidAvailable) {
       for(int i=0; i <= numLevelsOfSubProcesses; i++) {
-        String childPid = ProcessTree.getPidFromPidFile(
+        String childPid = UtilsForTests.getPidFromPidFile(
                                scriptDirName + "/childPidFile" + i);
         LOG.info("pid of the descendant process at level " + i +
                  "in the subtree of processes(with the map task as the root)" +
                  " is " + childPid);
         assertTrue("Unexpected: The subprocess at level " + i +
-                   " in the subtree is alive after killJob",
+                   " in the subtree is alive after Job completion",
                    !ProcessTree.isAlive(childPid));
       }
     }
-
-    return job.getID();
+    FileSystem fs = FileSystem.get(conf);
+    if(fs.exists(scriptDir)) {
+      fs.delete(scriptDir, true);
+    }
   }
-
-  static RunningJob runJob(JobConf conf) throws IOException {
+  
+  private static RunningJob runJob(JobConf conf) throws IOException {
 
     final Path inDir = new Path(TEST_ROOT_DIR + "/killjob/input");
     final Path outDir = new Path(TEST_ROOT_DIR + "/killjob/output");
 
     FileSystem fs = FileSystem.get(conf);
-    if(fs.exists(outDir)) {
-      fs.delete(outDir, true);
-    }
     if(fs.exists(scriptDir)) {
       fs.delete(scriptDir, true);
     }
-    if (!fs.exists(inDir)) {
-      fs.mkdirs(inDir);
-    }
-    // create input file
-    String input = "The quick brown fox\n" + "has many silly\n"
-        + "red fox sox\n";
-    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
-    file.writeBytes(input);
-    file.close();
-
-
-    conf.setInputFormat(TextInputFormat.class);
-    conf.setOutputKeyClass(Text.class);
-    conf.setOutputValueClass(IntWritable.class);
 
-    FileInputFormat.setInputPaths(conf, inDir);
-    FileOutputFormat.setOutputPath(conf, outDir);
     conf.setNumMapTasks(1);
     conf.setNumReduceTasks(0);
     conf.set("test.build.data", TEST_ROOT_DIR);
 
-    jobClient = new JobClient(conf);
-    RunningJob job = jobClient.submitJob(conf);
-
-    return job;
-
+    return UtilsForTests.runJob(conf, inDir, outDir);
   }
 
-  public void testJobKill() throws IOException {
+  public void testJobKillFailAndSucceed() throws IOException {
     if (Shell.WINDOWS) {
       System.out.println(
              "setsid doesn't work on WINDOWS as expected. Not testing");
@@ -253,54 +289,121 @@
       // run the TCs
       conf = mr.createJobConf();
       JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
-      runJobKill(jt, conf);
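+      // All three scenarios spawn the same subprocess tree from the map
+      // task; they differ only in how the job ends (killed, failed, done).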
+      runKillingJobAndValidate(jt, conf);
+      runFailingJobAndValidate(jt, conf);
+      runSuccessfulJobAndValidate(jt, conf);
     } finally {
       if (mr != null) {
         mr.shutdown();
       }
+    }
+  }
+
+  /**
+   * Creates the signal file that waiting mappers poll for.
+   */
+  private static void signalTask(String signalFile, JobConf conf) {
+    try {
+      FileSystem fs = FileSystem.get(conf);
+      fs.createNewFile(new Path(signalFile));
+    } catch(IOException e) {
+      LOG.warn("Unable to create signal file. " + e);
+    }
+  }
+  
+  /**
+   * Runs a recursive shell script to create a chain of subprocesses
+   */
+  private static void runChildren(JobConf conf) throws IOException {
+    if (ProcessTree.isSetsidAvailable) {
       FileSystem fs = FileSystem.get(conf);
-      if(fs.exists(scriptDir)) {
-        fs.delete(scriptDir, true);
+      TEST_ROOT_DIR = new Path(conf.get("test.build.data")).toUri().getPath();
+      scriptDir = new Path(TEST_ROOT_DIR + "/script");  
+    
+      // create shell script
+      Random rm = new Random();
+      Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt()
+        + ".sh");
+      String shellScript = scriptPath.toString();
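+      // Each invocation writes its own PID to childPidFile<level> and then
+      // re-invokes itself with level-1; level 0 loops forever, keeping the
+      // whole chain of descendants alive until it is killed.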
+      String script =
+        "echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" +
+        "echo hello\n" +
+        "if [ $1 != 0 ]\nthen\n" +
+        " sh " + shellScript + " $(($1-1))\n" +
+        "else\n" +
+        " while true\n do\n" +
+        "  sleep 2\n" +
+        " done\n" +
+        "fi";
+      DataOutputStream file = fs.create(scriptPath);
+      file.writeBytes(script);
+      file.close();
+
+      LOG.info("Calling script from map task of failjob : " + shellScript);
+      Runtime.getRuntime()
+          .exec(shellScript + " " + numLevelsOfSubProcesses);
+    
+      String childPid = UtilsForTests.getPidFromPidFile(scriptDir
+          + "/childPidFile" + 0);
+      while (childPid == null) {
+        LOG.warn(scriptDir + "/childPidFile" + 0 + " is null; Sleeping...");
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException ie) {
+          LOG.warn("sleep is interrupted:" + ie);
+          break;
+        }
+        childPid = UtilsForTests.getPidFromPidFile(scriptDir
+            + "/childPidFile" + 0);
       }
     }
   }
-
-  static class KillMapperWithChild extends MapReduceBase implements
-      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+  
+  /**
+   * Mapper that spawns a chain of child processes via runChildren()
+   */
+  static class MapperWithChildren extends MapReduceBase implements
+  Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+    FileSystem fs = null;
+    final Path signal = new Path(TEST_ROOT_DIR + "/script/signalFile");
     public void configure(JobConf conf) {
       try {
-        FileSystem fs = FileSystem.get(conf);
-        TEST_ROOT_DIR = conf.get("test.build.data").toString().substring(5);
-        scriptDir = new Path(TEST_ROOT_DIR + "/killjob");
-
-        if(ProcessTree.isSetsidAvailable) {
-          // create shell script
-          Random rm = new Random();
-          Path scriptPath = new Path(scriptDir, "_shellScript_" + rm.nextInt() + ".sh");
-          String shellScript = scriptPath.toString();
-          String script =
-             "echo $$ > " + scriptDir.toString() + "/childPidFile" + "$1\n" +
-             "echo hello\nsleep 1\n" +
-             "if [ $1 != 0 ]\nthen\n" +
-             " sh " + shellScript + " $(($1-1))\n" +
-             "else\n" +
-             " while true\n do\n" +
-             "  sleep 2\n" +
-             " done\n" +
-             "fi";
-          DataOutputStream file = fs.create(scriptPath);
-          file.writeBytes(script);
-          file.close();
-
-          LOG.info("Calling script from map task : " + shellScript);
-          Runtime.getRuntime().exec(shellScript + " " +
-                                    numLevelsOfSubProcesses);
-        }
+        fs = FileSystem.get(conf); // map() polls the signal file via this fs
+        runChildren(conf);
       } catch (Exception e) {
-        LOG.warn("Exception in KillMapperWithChild.configure: " +
+        LOG.warn("Exception in configure: " +
                  StringUtils.stringifyException(e));
       }
     }
+    
+    // Mapper waits for the signal (the signal is the existence of a file)
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+      if (fs != null) {
+        while (!fs.exists(signal)) {// wait for signal file creation
+          try {
+            reporter.progress();
+            synchronized (this) {
+              this.wait(1000);
+            }
+          } catch (InterruptedException ie) {
+            System.out.println("Interrupted while the map was waiting for "
+                               + "the signal.");
+            break;
+          }
+        }
+      }
+    }
+  }
+  
+  /**
+   * Mapper that waits till it gets killed.
+   */
+  static class KillingMapperWithChildren extends MapperWithChildren {
+    public void configure(JobConf conf) {
+      super.configure(conf);
+    }
+    
     public void map(WritableComparable key, Writable value,
         OutputCollector<WritableComparable, Writable> out, Reporter reporter)
         throws IOException {
@@ -314,4 +417,33 @@
       }
     }
   }
+  
+  /**
+   * Mapper that fails when it receives a signal. The signal is the existence of a file.
+   */
+  static class FailingMapperWithChildren extends MapperWithChildren {
+    public void configure(JobConf conf) {
+      super.configure(conf);
+    }
+
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+      // wait for the signal exactly as the parent mapper does
+      super.map(key, value, out, reporter);
+      throw new RuntimeException("failing map");
+    }
+  }
 }
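
The mappers above all coordinate with the test driver through a signal
file: signalTask() creates a file on the shared FileSystem, and each
map() polls for its existence while reporting progress. A minimal
standalone sketch of that handshake, assuming nothing beyond the public
FileSystem API (the class name and call sites are illustrative, not part
of the patch):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SignalFileSketch {
      // Driver side: "signal" waiting tasks by creating an agreed-upon file.
      static void signal(Configuration conf, Path signalFile)
          throws IOException {
        FileSystem.get(conf).createNewFile(signalFile);
      }

      // Task side: poll for the file, sleeping between checks. A real mapper
      // also calls reporter.progress() so the task is not timed out.
      static void awaitSignal(Configuration conf, Path signalFile)
          throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(conf);
        while (!fs.exists(signalFile)) {
          Thread.sleep(1000);
        }
      }
    }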

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/UtilsForTests.java Wed Apr 22 12:26:21 2009
@@ -24,6 +24,9 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.FileNotFoundException;
 import java.text.DecimalFormat;
 import java.util.Arrays;
 import java.util.Enumeration;
@@ -547,7 +550,9 @@
                     throws IOException {
 
     FileSystem fs = FileSystem.get(conf);
-    fs.delete(outDir, true);
+    if (fs.exists(outDir)) {
+      fs.delete(outDir, true);
+    }
     if (!fs.exists(inDir)) {
       fs.mkdirs(inDir);
     }
@@ -563,8 +568,8 @@
 
     FileInputFormat.setInputPaths(conf, inDir);
     FileOutputFormat.setOutputPath(conf, outDir);
-    conf.setNumMapTasks(1);
-    conf.setNumReduceTasks(1);
+    // The map and reduce task counts are taken from the caller's conf
+    // as-is, so tests can configure them before calling runJob.
 
     JobClient jobClient = new JobClient(conf);
     RunningJob job = jobClient.submitJob(conf);
@@ -680,4 +685,49 @@
     config.writeXml(fos);
     fos.close();
   }
+
+  /**
+   * Get PID from a pid-file.
+   * 
+   * @param pidFileName
+   *          Name of the pid-file.
+   * @return the PID string read from the pid-file. Returns null if the
+   *         pidFileName points to a non-existent file or if the read from
+   *         the file fails.
+   */
+  public static String getPidFromPidFile(String pidFileName) {
+    BufferedReader pidFile = null;
+    String pid = null;
+
+    try {
+      pidFile = new BufferedReader(new FileReader(pidFileName));
+    } catch (FileNotFoundException f) {
+      LOG.debug("PidFile doesn't exist: " + pidFileName);
+      return pid;
+    }
+
+    try {
+      pid = pidFile.readLine();
+    } catch (IOException i) {
+      LOG.error("Failed to read from " + pidFileName);
+    } finally {
+      try {
+        // closing the BufferedReader also closes the wrapped FileReader
+        pidFile.close();
+      } catch (IOException i) {
+        LOG.warn("Error closing " + pidFileName);
+      }
+    }
+    return pid;
+  }
 }
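
getPidFromPidFile() returns null until the child has actually written its
PID, so callers poll it and then typically hand the result to
ProcessTree.isAlive(), as both tests in this patch do. A rough standalone
sketch of that flow; the script body, paths and poll interval below are
illustrative only:

    import java.io.FileWriter;
    import java.io.IOException;

    import org.apache.hadoop.mapred.UtilsForTests;
    import org.apache.hadoop.util.ProcessTree;

    public class PidLivenessSketch {
      public static void main(String[] args)
          throws IOException, InterruptedException {
        String pidFile = "/tmp/sketchPidFile";   // illustrative path
        String script = "/tmp/sketchScript.sh";  // illustrative path

        // A shell script that records its own PID and then lingers.
        FileWriter w = new FileWriter(script);
        w.write("echo $$ > " + pidFile + "\nsleep 60\n");
        w.close();

        // Start it without blocking on its completion.
        Runtime.getRuntime().exec(new String[] { "sh", script });

        // Poll until the PID shows up, exactly as the tests above do.
        String pid = UtilsForTests.getPidFromPidFile(pidFile);
        while (pid == null) {
          Thread.sleep(500); // the child may not have written its PID yet
          pid = UtilsForTests.getPidFromPidFile(pidFile);
        }
        System.out.println(pid + " alive? " + ProcessTree.isAlive(pid));
      }
    }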

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java?rev=767484&r1=767483&r2=767484&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java Wed Apr 22 12:26:21 2009
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.mapred.UtilsForTests;
 
 import junit.framework.TestCase;
 
@@ -83,7 +84,7 @@
     }
 
     // read from pidFile
-    return ProcessTree.getPidFromPidFile(pidFile);
+    return UtilsForTests.getPidFromPidFile(pidFile);
   }
 
   public void testProcessTree() {