You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/03/06 12:38:01 UTC

svn commit: r750855 - in /hadoop/core/branches/branch-0.19: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: yhemanth
Date: Fri Mar  6 11:37:55 2009
New Revision: 750855

URL: http://svn.apache.org/viewvc?rev=750855&view=rev
Log:
HADOOP-5376. Fixes the code handling lost tasktrackers to set the task state to KILLED_UNCLEAN only for relevant type of tasks. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
Modified:
    hadoop/core/branches/branch-0.19/CHANGES.txt
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=750855&r1=750854&r2=750855&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Fri Mar  6 11:37:55 2009
@@ -38,6 +38,10 @@
     HADOOP-5384. Fix a problem that DataNodeCluster creates blocks with
     generationStamp == 1.  (szetszwo)
 
+    HADOOP-5376. Fixes the code handling lost tasktrackers to set the task state
+    to KILLED_UNCLEAN only for relevant type of tasks.
+    (Amareshwari Sriramadasu via yhemanth)
+
 Release 0.19.1 - 2009-02-23
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=750855&r1=750854&r2=750855&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  6 11:37:55 2009
@@ -2821,13 +2821,16 @@
           // if the job is done, we don't want to change anything
           if (job.getStatus().getRunState() == JobStatus.RUNNING ||
               job.getStatus().getRunState() == JobStatus.PREP) {
+            // the state will be KILLED_UNCLEAN, if the task(map or reduce) 
+            // was RUNNING on the tracker
+            TaskStatus.State killState = (tip.isRunningTask(taskId) && 
+              !tip.isJobSetupTask() && !tip.isJobCleanupTask()) ? 
+              TaskStatus.State.KILLED_UNCLEAN : TaskStatus.State.KILLED;
             job.failedTask(tip, taskId, ("Lost task tracker: " + trackerName), 
                            (tip.isMapTask() ? 
                                TaskStatus.Phase.MAP : 
                                TaskStatus.Phase.REDUCE), 
-                            tip.isRunningTask(taskId) ? 
-                              TaskStatus.State.KILLED_UNCLEAN : 
-                              TaskStatus.State.KILLED,
+                            killState,
                             trackerName, myInstrumentation);
             jobsWithFailures.add(job);
           }

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=750855&r1=750854&r2=750855&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Mar  6 11:37:55 2009
@@ -570,7 +570,7 @@
       // Check if the user manually KILLED/FAILED this task-attempt...
       Boolean shouldFail = tasksToKill.remove(taskid);
       if (shouldFail != null) {
-        if (isCleanupAttempt(taskid)) {
+        if (isCleanupAttempt(taskid) || jobSetup || jobCleanup) {
           taskState = (shouldFail) ? TaskStatus.State.FAILED :
                                      TaskStatus.State.KILLED;
         } else {

Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=750855&r1=750854&r2=750855&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Mar  6 11:37:55 2009
@@ -586,6 +586,19 @@
   }
   
   /**
+   * Get the tasktrackerID in MiniMRCluster with given trackerName.
+   */
+  int getTaskTrackerID(String trackerName) {
+    for (int id=0; id < numTaskTrackers; id++) {
+      if (taskTrackerList.get(id).getTaskTracker().getName().equals(
+          trackerName)) {
+        return id;
+      }
+    }
+    return -1;
+  }
+  
+  /**
    * Shut down the servers.
    */
   public void shutdown() {

Added: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=750855&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (added)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Fri Mar  6 11:37:55 2009
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+/**
+ * Tests various failures in setup/cleanup of job, like 
+ * throwing exception, command line kill and lost tracker 
+ */
+public class TestSetupAndCleanupFailure extends TestCase {
+
+  final Path inDir = new Path("./input");
+  final Path outDir = new Path("./output");
+  static Path setupSignalFile = new Path("/setup-signal");
+  static Path cleanupSignalFile = new Path("/cleanup-signal");
+  String input = "The quick brown fox\nhas many silly\nred fox sox\n";
+ 
+  // Committer with setupJob throwing exception
+  static class CommitterWithFailSetup extends FileOutputCommitter {
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+      throw new IOException();
+    }
+  }
+
+  // Committer with cleanupJob throwing exception
+  static class CommitterWithFailCleanup extends FileOutputCommitter {
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+      throw new IOException();
+    }
+  }
+
+  // Committer waits for a file to be created on dfs.
+  static class CommitterWithLongSetupAndCleanup extends FileOutputCommitter {
+    
+    private void waitForSignalFile(FileSystem fs, Path signalFile) 
+    throws IOException {
+      while (!fs.exists(signalFile)) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {
+         break;
+        }
+      }
+    }
+    
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+      waitForSignalFile(FileSystem.get(context.getJobConf()), setupSignalFile);
+      super.setupJob(context);
+    }
+    
+    @Override
+    public void cleanupJob(JobContext context) throws IOException {
+      waitForSignalFile(FileSystem.get(context.getJobConf()), cleanupSignalFile);
+      super.cleanupJob(context);
+    }
+  }
+  
+  RunningJob launchJob(JobConf conf) 
+  throws IOException {
+    // set up the input file system and write input text.
+    FileSystem inFs = inDir.getFileSystem(conf);
+    FileSystem outFs = outDir.getFileSystem(conf);
+    outFs.delete(outDir, true);
+    if (!inFs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    {
+      // write input into input file
+      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+    }
+
+    // configure the mapred Job
+    conf.setMapperClass(IdentityMapper.class);        
+    conf.setReducerClass(IdentityReducer.class);
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+                                    "/tmp")).toString().replace(' ', '+');
+    conf.set("test.build.data", TEST_ROOT_DIR);
+
+    // return the RunningJob handle.
+    return new JobClient(conf).submitJob(conf);
+  }
+
+  // Among these tips only one of the tasks will be running,
+  // get the taskid for that task 
+  private TaskAttemptID getRunningTaskID(TaskInProgress[] tips) {
+    TaskAttemptID taskid = null;
+    while (taskid == null) {
+      for (TaskInProgress tip :tips) {
+        TaskStatus[] statuses = tip.getTaskStatuses();
+        for (TaskStatus status : statuses) {
+          if (status.getRunState() == TaskStatus.State.RUNNING) {
+            taskid = status.getTaskID();
+            break;
+          }
+        }
+        if (taskid != null) break;
+      }
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ie) {}
+    }
+    return taskid;
+  }
+  
+  // Tests the failures in setup/cleanup job. Job should cleanly fail.
+  private void testFailCommitter(Class<? extends OutputCommitter> theClass,
+                                 JobConf jobConf) 
+  throws IOException {
+    jobConf.setOutputCommitter(theClass);
+    RunningJob job = launchJob(jobConf);
+    // wait for the job to finish.
+    job.waitForCompletion();
+    assertEquals(JobStatus.FAILED, job.getJobState());
+  }
+  
+  // launch job with CommitterWithLongSetupAndCleanup as committer
+  // and wait till the job is inited.
+  private RunningJob launchJobWithWaitingSetupAndCleanup(MiniMRCluster mr) 
+  throws IOException {
+    // launch job with waiting setup/cleanup
+    JobConf jobConf = mr.createJobConf();
+    jobConf.setOutputCommitter(CommitterWithLongSetupAndCleanup.class);
+    RunningJob job = launchJob(jobConf);
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = jt.getJob(job.getID());
+    while (!jip.inited()) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ie) {}
+    }
+    return job;
+  }
+  
+  /**
+   * Tests setup and cleanup attempts getting killed from command-line 
+   * and lost tracker
+   * 
+   * @param mr
+   * @param dfs
+   * @param commandLineKill if true, test with command-line kill
+   *                        else, test with lost tracker
+   * @throws IOException
+   */
+  private void testSetupAndCleanupKill(MiniMRCluster mr, 
+                                       MiniDFSCluster dfs, 
+                                       boolean commandLineKill) 
+  throws IOException {
+    // launch job with waiting setup/cleanup
+    RunningJob job = launchJobWithWaitingSetupAndCleanup(mr);
+    
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    JobInProgress jip = jt.getJob(job.getID());
+    // get the running setup task id
+    TaskAttemptID setupID = getRunningTaskID(jip.getSetupTasks());
+    if (commandLineKill) {
+      killTaskFromCommandLine(job, setupID, jt);
+    } else {
+      killTaskWithLostTracker(mr, setupID);
+    }
+    // signal the setup to complete
+    TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), 
+                            dfs.getFileSystem().getConf(), 
+                            setupSignalFile, (short)3);
+    // wait for maps and reduces to complete
+    while (job.reduceProgress() != 1.0f) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {}
+    }
+    // get the running cleanup task id
+    TaskAttemptID cleanupID = getRunningTaskID(jip.getCleanupTasks());
+    if (commandLineKill) {
+      killTaskFromCommandLine(job, cleanupID, jt);
+    } else {
+      killTaskWithLostTracker(mr, cleanupID);
+    }
+    // signal the cleanup to complete
+    TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), 
+                            dfs.getFileSystem().getConf(), 
+                            cleanupSignalFile, (short)3);
+    // wait for the job to finish.
+    job.waitForCompletion();
+    assertEquals(JobStatus.SUCCEEDED, job.getJobState());
+    assertEquals(TaskStatus.State.KILLED, 
+                 jt.getTaskStatus(setupID).getRunState());
+    assertEquals(TaskStatus.State.KILLED, 
+                 jt.getTaskStatus(cleanupID).getRunState());
+  }
+  
+  // kill the task from command-line 
+  // wait till the kill is reported back
+  private void killTaskFromCommandLine(RunningJob job, 
+                                       TaskAttemptID taskid,
+                                       JobTracker jt) 
+  throws IOException {
+    job.killTask(taskid, false);
+    // wait till the kill happens
+    while (jt.getTaskStatus(taskid).getRunState() != 
+           TaskStatus.State.KILLED) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException ie) {}
+    }
+
+  }
+  // kill the task by losing the tracker
+  private void killTaskWithLostTracker(MiniMRCluster mr, 
+                                       TaskAttemptID taskid) {
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    String trackerName = jt.getTaskStatus(taskid).getTaskTracker();
+    int trackerID = mr.getTaskTrackerID(trackerName);
+    assertTrue(trackerID != -1);
+    mr.stopTaskTracker(trackerID);
+  }
+  
+  // Tests the failures in setup/cleanup job. Job should cleanly fail.
+  // Also Tests the command-line kill for setup/cleanup attempts. 
+  // tests the setup/cleanup attempts getting killed if 
+  // they were running on a lost tracker
+  public void testWithDFS() throws IOException {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    try {
+      final int taskTrackers = 4;
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 4, true, null);
+      fileSys = dfs.getFileSystem();
+      JobConf jtConf = new JobConf();
+      jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+      jtConf.setLong("mapred.tasktracker.expiry.interval", 10 * 1000);
+      jtConf.setInt("mapred.reduce.copy.backoff", 4);
+      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1,
+                             null, null, jtConf);
+      // test setup/cleanup throwing exceptions
+      testFailCommitter(CommitterWithFailSetup.class, mr.createJobConf());
+      testFailCommitter(CommitterWithFailCleanup.class, mr.createJobConf());
+      // test the command-line kill for setup/cleanup attempts. 
+      testSetupAndCleanupKill(mr, dfs, true);
+      // remove setup/cleanup signal files.
+      fileSys.delete(setupSignalFile , true);
+      fileSys.delete(cleanupSignalFile , true);
+      // test the setup/cleanup attempts getting killed if 
+      // they were running on a lost tracker
+      testSetupAndCleanupKill(mr, dfs, false);
+    } finally {
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown();
+      }
+    }
+  }
+
+  public static void main(String[] argv) throws Exception {
+    TestSetupAndCleanupFailure td = new TestSetupAndCleanupFailure();
+    td.testWithDFS();
+  }
+}