You are viewing a plain-text version of this content. The link to its canonical version (the word "here" in the original) was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/13 05:01:57 UTC
svn commit: r743974 - in /hadoop/core/branches/branch-0.19: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
src/test/org/apache/hadoop/mapred/MiniMRCluster.java
src/test/org/apache/hadoop/mapred/TestLostTracker.java
Author: ddas
Date: Fri Feb 13 04:01:57 2009
New Revision: 743974
URL: http://svn.apache.org/viewvc?rev=743974&view=rev
Log:
HADOOP-5067. Fixes TaskInProgress.java to correctly keep track of the counts of failed and killed tasks. Contributed by Amareshwari Sriramadasu.
Added:
hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java
Modified:
hadoop/core/branches/branch-0.19/CHANGES.txt
hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=743974&r1=743973&r2=743974&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Fri Feb 13 04:01:57 2009
@@ -102,6 +102,9 @@
HADOOP-5166. Fix JobTracker restart to work when ACLs are configured
for the JobTracker. (Amar Kamat via yhemanth).
+ HADOOP-5067. Fixes TaskInProgress.java to keep track of count of failed and
+ killed tasks correctly. (Amareshwari Sriramadasu via ddas)
+
Release 0.19.0 - 2008-11-18
INCOMPATIBLE CHANGES
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=743974&r1=743973&r2=743974&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Feb 13 04:01:57 2009
@@ -94,6 +94,8 @@
// Map from task Id -> TaskTracker Id, contains tasks that are
// currently runnings
private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
+ // All attempt Ids of this TIP
+ private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
private JobConf conf;
private Map<TaskAttemptID,List<String>> taskDiagnosticData =
new TreeMap<TaskAttemptID,List<String>>();
@@ -585,9 +587,7 @@
}
}
- // Note that there can be failures of tasks that are hosted on a machine
- // that has not yet registered with restarted jobtracker
- boolean isPresent = this.activeTasks.remove(taskid) != null;
+ this.activeTasks.remove(taskid);
// Since we do not fail completed reduces (whose outputs go to hdfs), we
// should note this failure only for completed maps, only if this taskid;
@@ -601,8 +601,10 @@
resetSuccessfulTaskid();
}
+ // Note that there can be failures of tasks that are hosted on a machine
+ // that has not yet registered with restarted jobtracker
// recalculate the counts only if its a genuine failure
- if (isPresent) {
+ if (tasks.contains(taskid)) {
if (taskState == TaskStatus.State.FAILED) {
numTaskFailures++;
machinesWhereFailed.add(trackerHostName);
@@ -907,6 +909,7 @@
}
activeTasks.put(taskid, taskTracker);
+ tasks.add(taskid);
// Ask JobTracker to note that the task exists
jobtracker.createTaskEntry(taskid, taskTracker, this);
Modified: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=743974&r1=743973&r2=743974&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Feb 13 04:01:57 2009
@@ -235,6 +235,9 @@
return jobTracker;
}
+ /**
+ * Returns the runner stored at position {@code id} in this cluster's
+ * taskTrackerList. Package-private, so it is reachable from tests in
+ * org.apache.hadoop.mapred.
+ */
+ TaskTrackerRunner getTaskTrackerRunner(int id) {
+ TaskTrackerRunner runner = taskTrackerList.get(id);
+ return runner;
+ }
/**
* Get the number of task trackers in the cluster
*/
Added: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java?rev=743974&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java (added)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestLostTracker.java Fri Feb 13 04:01:57 2009
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+/**
+ * Verifies how the JobTracker handles a task tracker that is lost while a
+ * job is running: the job must still succeed, and the task whose attempt
+ * ran on the lost tracker must record exactly one killed attempt
+ * (HADOOP-5067).
+ */
+public class TestLostTracker extends TestCase {
+  final Path testDir = new Path("/jt-lost-tt");
+  final Path inDir = new Path(testDir, "input");
+  final Path shareDir = new Path(testDir, "share");
+  final Path outputDir = new Path(testDir, "output");
+
+  /**
+   * Builds one NORMAL-priority job using
+   * {@link TestJobTrackerRestart#getJobs}.
+   *
+   * @param conf base configuration for the job
+   * @param maps map counts per job (only one job is built here)
+   * @param reduces reduce counts per job (only one job is built here)
+   * @param mapSignal signal file for the maps
+   * @param redSignal signal file for the reduces
+   * @return the configured job
+   * @throws IOException if the job cannot be configured
+   */
+  private JobConf configureJob(JobConf conf, int[] maps, int[] reduces,
+                               String mapSignal, String redSignal)
+      throws IOException {
+    JobPriority[] priority = new JobPriority[] {JobPriority.NORMAL};
+    return TestJobTrackerRestart.getJobs(conf, priority,
+                                         maps, reduces, outputDir, inDir,
+                                         mapSignal, redSignal)[0];
+  }
+
+  /**
+   * Runs the lost-tracker scenario against already-started clusters.
+   * <p>
+   * Submits a job, waits until the map phase reports at least 50%
+   * progress, stops task tracker 0, signals the maps and then the
+   * reduces to finish, and finally checks that the job succeeded and
+   * that the task whose attempt ran on the stopped tracker recorded
+   * exactly one killed attempt.
+   * <p>
+   * Because it takes arguments, the JUnit 3 runner does not execute
+   * this overload directly; it is driven from {@link #testLostTracker()}.
+   *
+   * @param dfs running DFS cluster that holds the job input
+   * @param mr running MapReduce cluster; its tracker 0 gets stopped
+   * @throws IOException on file-system or job-client failures
+   */
+  public void testLostTracker(MiniDFSCluster dfs,
+                              MiniMRCluster mr)
+      throws IOException {
+    FileSystem fileSys = dfs.getFileSystem();
+    JobConf jobConf = mr.createJobConf();
+    int numMaps = 10;
+    int numReds = 1;
+    String mapSignalFile = TestJobTrackerRestart.getMapSignalFile(shareDir);
+    String redSignalFile = TestJobTrackerRestart.getReduceSignalFile(shareDir);
+
+    // Configure the job
+    JobConf job = configureJob(jobConf, new int[] {numMaps},
+                               new int[] {numReds},
+                               mapSignalFile, redSignalFile);
+
+    TestJobTrackerRestart.cleanUp(fileSys, shareDir);
+
+    // Submit the job
+    JobClient jobClient = new JobClient(job);
+    RunningJob rJob = jobClient.submitJob(job);
+    JobID id = rJob.getID();
+
+    // Wait for the job to be initialized
+    mr.initializeJob(id);
+
+    // Wait until the map phase reports at least 50% progress
+    while (TestJobTrackerRestart.getJobStatus(jobClient, id).mapProgress()
+           < 0.5f) {
+      TestJobTrackerRestart.waitFor(10);
+    }
+    // Pick an attempt that is no longer running on the 1st tracker
+    // (presumably a map that has already completed there)
+    TaskAttemptID taskid = mr.getTaskTrackerRunner(0).getTaskTracker().
+                             getNonRunningTasks().get(0).getTaskID();
+
+    // Kill the 1st tasktracker
+    mr.stopTaskTracker(0);
+
+    // Signal all the maps to complete
+    TestJobTrackerRestart.signalTasks(dfs, fileSys, true,
+                                      mapSignalFile, redSignalFile);
+
+    // Signal the reducers to complete
+    TestJobTrackerRestart.signalTasks(dfs, fileSys, false,
+                                      mapSignalFile, redSignalFile);
+    // Wait for the job to complete
+    TestJobTrackerRestart.waitTillDone(jobClient);
+
+    // The lost tracker must have dropped out of the cluster, the job
+    // must still succeed, and the attempt on the lost tracker must
+    // have been counted as killed exactly once.
+    assertTrue(jobClient.getClusterStatus().getTaskTrackers()
+               < mr.getNumTaskTrackers());
+    assertEquals(JobStatus.SUCCEEDED, rJob.getJobState());
+    TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
+                           getTip(taskid.getTaskID());
+    assertTrue(tip.isComplete());
+    // assertEquals takes (expected, actual); the reversed order would
+    // produce a misleading failure message
+    assertEquals(1, tip.numKilledTasks());
+  }
+
+  /**
+   * JUnit entry point. Starts a DFS cluster with one datanode, writes
+   * the job input, adds a second datanode, starts a MapReduce cluster
+   * with two task trackers (one map slot and one reduce slot each),
+   * runs the lost-tracker scenario and always shuts both clusters down.
+   *
+   * @throws IOException if cluster setup or the scenario fails
+   */
+  public void testLostTracker() throws IOException {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+
+    try {
+      Configuration conf = new Configuration();
+      conf.setBoolean("dfs.replication.considerLoad", false);
+      dfs = new MiniDFSCluster(conf, 1, true, null, null);
+      dfs.waitActive();
+      FileSystem fileSys = dfs.getFileSystem();
+
+      // Clean up leftovers from a previous run
+      fileSys.delete(testDir, true);
+
+      if (!fileSys.mkdirs(inDir)) {
+        throw new IOException("Mkdirs failed to create " + inDir.toString());
+      }
+
+      // Write the input file
+      TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf,
+                                           new Path(inDir + "/file"),
+                                           (short)1);
+
+      dfs.startDataNodes(conf, 1, true, null, null, null, null);
+      dfs.waitActive();
+
+      String namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
+                        + (dfs.getFileSystem()).getUri().getPort();
+
+      JobConf jtConf = new JobConf();
+      jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+      // Short expiry (presumably in milliseconds) so the stopped tracker
+      // is declared lost quickly
+      jtConf.setLong("mapred.tasktracker.expiry.interval", 10 * 1000);
+      jtConf.setInt("mapred.reduce.copy.backoff", 4);
+
+      mr = new MiniMRCluster(2, namenode, 1, null, null, jtConf);
+
+      // Test Lost tracker case
+      testLostTracker(dfs, mr);
+    } finally {
+      if (mr != null) {
+        try {
+          mr.shutdown();
+        } catch (Exception ignored) {
+          // Best-effort cleanup: don't mask the test's own failure
+        }
+      }
+      if (dfs != null) {
+        try {
+          dfs.shutdown();
+        } catch (Exception ignored) {
+          // Best-effort cleanup: don't mask the test's own failure
+        }
+      }
+    }
+  }
+
+  /** Runs the test directly, outside a JUnit runner. */
+  public static void main(String[] args) throws IOException {
+    new TestLostTracker().testLostTracker();
+  }
+}