Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/13 05:01:57 UTC

svn commit: r743974 - in /hadoop/core/branches/branch-0.19: CHANGES.txt src/mapred/org/apache/hadoop/mapred/TaskInProgress.java src/test/org/apache/hadoop/mapred/MiniMRCluster.java src/test/org/apache/hadoop/mapred/TestLostTracker.java

Author: ddas
Date: Fri Feb 13 04:01:57 2009
New Revision: 743974

URL: http://svn.apache.org/viewvc?rev=743974&view=rev
Log:
HADOOP-5067. Fixes TaskInProgress.java to keep track of count of failed and killed tasks correctly. Contributed by Amareshwari Sriramadasu.
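
For readers of the TaskInProgress diff below: previously the failed/killed counters were updated only when the failing attempt was still present in activeTasks, so an attempt whose tracker had not yet re-registered with a restarted jobtracker was dropped from the counts. The patch records every attempt id the TIP ever launched in a separate set and keys the counting on membership in that set. A minimal, hypothetical sketch of the pattern (illustrative names, not the actual TaskInProgress code):

  import java.util.TreeMap;
  import java.util.TreeSet;

  class AttemptBookkeeping {
    // attempts currently running: attempt id -> tracker name
    private final TreeMap<String, String> activeTasks =
      new TreeMap<String, String>();
    // every attempt this TIP ever launched
    private final TreeSet<String> tasks = new TreeSet<String>();

    private int numTaskFailures;
    private int numKilledTasks;

    void addRunningTask(String attemptId, String tracker) {
      activeTasks.put(attemptId, tracker);
      tasks.add(attemptId);  // remember the attempt permanently
    }

    void incompleteSubTask(String attemptId, boolean failed) {
      activeTasks.remove(attemptId);
      // Count only attempts this TIP actually launched; membership in the
      // full attempt set stays valid even when the attempt's tracker has
      // not yet re-registered after a jobtracker restart.
      if (tasks.contains(attemptId)) {
        if (failed) {
          numTaskFailures++;
        } else {
          numKilledTasks++;
        }
      }
    }
  }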

Added:
    hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java
Modified:
    hadoop/core/branches/branch-0.19/CHANGES.txt
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=743974&r1=743973&r2=743974&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Fri Feb 13 04:01:57 2009
@@ -102,6 +102,9 @@
     HADOOP-5166. Fix JobTracker restart to work when ACLs are configured
     for the JobTracker. (Amar Kamat via yhemanth).
 
+    HADOOP-5067. Fixes TaskInProgress.java to keep track of count of failed and
+    killed tasks correctly. (Amareshwari Sriramadasu via ddas)
+
 Release 0.19.0 - 2008-11-18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=743974&r1=743973&r2=743974&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Feb 13 04:01:57 2009
@@ -94,6 +94,8 @@
   // Map from task Id -> TaskTracker Id, contains tasks that are
  // currently running
   private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
+  // All attempt Ids of this TIP
+  private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
   private JobConf conf;
   private Map<TaskAttemptID,List<String>> taskDiagnosticData =
     new TreeMap<TaskAttemptID,List<String>>();
@@ -585,9 +587,7 @@
       }
     }
 
-    // Note that there can be failures of tasks that are hosted on a machine 
-    // that has not yet registered with restarted jobtracker
-    boolean isPresent = this.activeTasks.remove(taskid) != null;
+    this.activeTasks.remove(taskid);
     
     // Since we do not fail completed reduces (whose outputs go to hdfs), we 
     // should note this failure only for completed maps, only if this taskid;
@@ -601,8 +601,10 @@
       resetSuccessfulTaskid();
     }
 
+    // Note that there can be failures of tasks that are hosted on a machine 
+    // that has not yet registered with restarted jobtracker
     // recalculate the counts only if its a genuine failure
-    if (isPresent) {
+    if (tasks.contains(taskid)) {
       if (taskState == TaskStatus.State.FAILED) {
         numTaskFailures++;
         machinesWhereFailed.add(trackerHostName);
@@ -907,6 +909,7 @@
     }
 
     activeTasks.put(taskid, taskTracker);
+    tasks.add(taskid);
 
     // Ask JobTracker to note that the task exists
     jobtracker.createTaskEntry(taskid, taskTracker, this);

Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=743974&r1=743973&r2=743974&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Feb 13 04:01:57 2009
@@ -235,6 +235,9 @@
     return jobTracker;
   }
   
+  TaskTrackerRunner getTaskTrackerRunner(int id) {
+    return taskTrackerList.get(id);
+  }
   /**
    * Get the number of task trackers in the cluster
    */

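The new MiniMRCluster.getTaskTrackerRunner(int) accessor lets a test reach a specific tracker's TaskTracker instance and inspect its attempts. A hedged usage sketch, assuming code in the same org.apache.hadoop.mapred package and a running MiniMRCluster named mr (this mirrors what the TestLostTracker added below does before stopping tracker 0):

  // pick a non-running (e.g. completed) attempt hosted on the first tracker
  TaskAttemptID attempt = mr.getTaskTrackerRunner(0)
                            .getTaskTracker()
                            .getNonRunningTasks()
                            .get(0)
                            .getTaskID();
  mr.stopTaskTracker(0);  // then simulate losing that tracker
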
Added: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java?rev=743974&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java (added)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java Fri Feb 13 04:01:57 2009
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.TestJobTrackerRestart;
+
+import junit.framework.TestCase;
+import java.io.*;
+
+public class TestLostTracker extends TestCase {
+  final Path testDir = new Path("/jt-lost-tt");
+  final Path inDir = new Path(testDir, "input");
+  final Path shareDir = new Path(testDir, "share");
+  final Path outputDir = new Path(testDir, "output");
+  
+  private JobConf configureJob(JobConf conf, int[] maps, int[] reduces,
+                               String mapSignal, String redSignal) 
+  throws IOException {
+    JobPriority[] priority = new JobPriority[] {JobPriority.NORMAL};
+    return TestJobTrackerRestart.getJobs(conf, priority, 
+                                         maps, reduces, outputDir, inDir, 
+                                         mapSignal, redSignal)[0];
+  }
+  
+  public void testLostTracker(MiniDFSCluster dfs,
+                              MiniMRCluster mr) 
+  throws IOException {
+    FileSystem fileSys = dfs.getFileSystem();
+    JobConf jobConf = mr.createJobConf();
+    int numMaps = 10;
+    int numReds = 1;
+    String mapSignalFile = TestJobTrackerRestart.getMapSignalFile(shareDir);
+    String redSignalFile = TestJobTrackerRestart.getReduceSignalFile(shareDir);
+    
+    // Configure the job
+    JobConf job = configureJob(jobConf, new int[] {numMaps}, 
+                               new int[] {numReds}, 
+                               mapSignalFile, redSignalFile);
+      
+    TestJobTrackerRestart.cleanUp(fileSys, shareDir);
+    
+    // Submit the job   
+    JobClient jobClient = new JobClient(job);
+    RunningJob rJob = jobClient.submitJob(job);
+    JobID id = rJob.getID();
+    
+    // wait for the job to be inited
+    mr.initializeJob(id);
+    
+    // Make sure that the master job is 50% completed
+    while (TestJobTrackerRestart.getJobStatus(jobClient, id).mapProgress() 
+           < 0.5f) {
+      TestJobTrackerRestart.waitFor(10);
+    }
+    // get a completed task on 1st tracker 
+    TaskAttemptID taskid = mr.getTaskTrackerRunner(0).getTaskTracker().
+                              getNonRunningTasks().get(0).getTaskID();
+
+    // Kill the 1st tasktracker
+    mr.stopTaskTracker(0);
+    
+    // Signal all the maps to complete
+    TestJobTrackerRestart.signalTasks(dfs, fileSys, true, 
+                                      mapSignalFile, redSignalFile);
+    
+    // Signal the reducers to complete
+    TestJobTrackerRestart.signalTasks(dfs, fileSys, false, 
+                                      mapSignalFile, redSignalFile);
+    // wait for the job to complete
+    TestJobTrackerRestart.waitTillDone(jobClient);
+
+    // Check if the tasks on the lost tracker got killed and re-executed
+    assertTrue(jobClient.getClusterStatus().getTaskTrackers() 
+                < mr.getNumTaskTrackers());
+    assertEquals(JobStatus.SUCCEEDED, rJob.getJobState());
+    TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
+                         getTip(taskid.getTaskID());
+    assertTrue(tip.isComplete());
+    assertEquals(tip.numKilledTasks(), 1);
+  }
+  
+  public void testLostTracker() throws IOException {
+    String namenode = null;
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, null, null);
+      dfs.waitActive();
+      fileSys = dfs.getFileSystem();
+      
+      // clean up
+      fileSys.delete(testDir, true);
+      
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+
+      // Write the input file
+      TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf, 
+                                           new Path(inDir + "/file"), 
+                                           (short)1);
+
+      dfs.startDataNodes(conf, 1, true, null, null, null, null);
+      dfs.waitActive();
+
+      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
+                 + (dfs.getFileSystem()).getUri().getPort();
+
+      JobConf jtConf = new JobConf();
+      jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+      jtConf.setLong("mapred.tasktracker.expiry.interval", 10 * 1000);
+      jtConf.setInt("mapred.reduce.copy.backoff", 4);
+      
+      mr = new MiniMRCluster(2, namenode, 1, null, null, jtConf);
+      
+      // Test Lost tracker case
+      testLostTracker(dfs, mr);
+    } finally {
+      if (mr != null) {
+        try {
+          mr.shutdown();
+        } catch (Exception e) {}
+      }
+      if (dfs != null) {
+        try {
+          dfs.shutdown();
+        } catch (Exception e) {}
+      }
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    new TestLostTracker().testLostTracker();
+  }
+}