Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/10/14 07:03:30 UTC
svn commit: r704310 - in /hadoop/core/trunk: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/JobInProgress.java
src/test/org/apache/hadoop/mapred/TestJobInProgress.java
Author: ddas
Date: Mon Oct 13 22:03:29 2008
New Revision: 704310
URL: http://svn.apache.org/viewvc?rev=704310&view=rev
Log:
HADOOP-4287. Fixes an issue to do with maintaining counts of running/pending maps/reduces. Contributed by Sreekanth Ramakrishnan.
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=704310&r1=704309&r2=704310&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Oct 13 22:03:29 2008
@@ -895,6 +895,9 @@
and org.apache.hadoop.security.AccessControlIOException into a single
class hadoop.security.AccessControlException. (omalley via acmurthy)
+ HADOOP-4287. Fixes an issue to do with maintaining counts of running/pending
+ maps/reduces. (Sreekanth Ramakrishnan via ddas)
+
Release 0.18.2 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=704310&r1=704309&r2=704310&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Oct 13 22:03:29 2008
@@ -492,11 +492,11 @@
return finishedReduceTasks;
}
public synchronized int pendingMaps() {
- return numMapTasks - runningMapTasks - failedMapTasks -
+ return numMapTasks - runningMapTasks - failedMapTIPs -
finishedMapTasks + speculativeMapTasks;
}
public synchronized int pendingReduces() {
- return numReduceTasks - runningReduceTasks - failedReduceTasks -
+ return numReduceTasks - runningReduceTasks - failedReduceTIPs -
finishedReduceTasks + speculativeReduceTasks;
}
public JobPriority getPriority() {
@@ -1915,8 +1915,6 @@
if ((status.getRunState() == JobStatus.RUNNING) ||
(status.getRunState() == JobStatus.PREP)) {
LOG.info("Killing job '" + this.status.getJobID() + "'");
- this.runningMapTasks = 0;
- this.runningReduceTasks = 0;
//
// kill all TIPs.
//
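
The reasoning behind the one-word change, as far as the surrounding JobInProgress fields suggest: runningMapTasks and failedMapTasks appear to be bookkept per task attempt, while numMapTasks, finishedMapTasks and failedMapTIPs are bookkept per TIP (task-in-progress). Subtracting attempt-level failures from TIP-level totals let pendingMaps() drift negative whenever a failed attempt was retried and its TIP still finished. A worked example with illustrative numbers (not taken from the patch):

  numMapTasks = 10, one attempt failed, its TIP retried and finished, all 10 TIPs done, nothing running or speculative
  old: pendingMaps = 10 - 0 - 1 (failed attempts) - 10 + 0 = -1
  new: pendingMaps = 10 - 0 - 0 (failed TIPs)     - 10 + 0 =  0

Removing the runningMapTasks/runningReduceTasks reset in the kill path presumably follows the same logic: the per-TIP bookkeeping already decrements those counters as each TIP is killed, so zeroing them up front would let them go negative as the kills complete.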
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java?rev=704310&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java Mon Oct 13 22:03:29 2008
@@ -0,0 +1,147 @@
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+import junit.framework.TestCase;
+
+public class TestJobInProgress extends TestCase {
+
+ private MiniMRCluster mrCluster;
+
+ private MiniDFSCluster dfsCluster;
+ JobTracker jt;
+
+ public static class FailMapTaskJob extends MapReduceBase implements
+ Mapper<LongWritable, Text, Text, IntWritable> {
+
+ @Override
+ public void map(LongWritable key, Text value,
+ OutputCollector<Text, IntWritable> output, Reporter reporter)
+ throws IOException {
+ // reporter.incrCounter(TaskCounts.LaunchedTask, 1);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new IllegalArgumentException("Interrupted MAP task");
+ }
+ throw new IllegalArgumentException("Failing MAP task");
+ }
+ }
+
+ // Suppressing the warning since we only need a reduce task job that fails;
+ // the actual key/value pairs passed to it do not matter.
+ @SuppressWarnings("unchecked")
+ public static class FailReduceTaskJob extends MapReduceBase implements
+ Reducer {
+
+ @Override
+ public void reduce(Object key, Iterator values, OutputCollector output,
+ Reporter reporter) throws IOException {
+ // reporter.incrCounter(TaskCounts.LaunchedTask, 1);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ throw new IllegalArgumentException("Failing Reduce task");
+ }
+ throw new IllegalArgumentException("Failing Reduce task");
+ }
+
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ final int taskTrackers = 4;
+ Configuration conf = new Configuration();
+ dfsCluster = new MiniDFSCluster(conf, 4, true, null);
+ mrCluster = new MiniMRCluster(taskTrackers, dfsCluster.getFileSystem()
+ .getUri().toString(), 1);
+ jt = mrCluster.getJobTrackerRunner().getJobTracker();
+ }
+
+ public void testPendingMapTaskCount() throws Exception {
+ launchTask(FailMapTaskJob.class, IdentityReducer.class);
+ checkTaskCounts();
+ }
+
+ public void testPendingReduceTaskCount() throws Exception {
+ launchTask(IdentityMapper.class, FailReduceTaskJob.class);
+ checkTaskCounts();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ mrCluster.shutdown();
+ dfsCluster.shutdown();
+ super.tearDown();
+ }
+
+
+ @SuppressWarnings("unchecked")
+ void launchTask(Class mapClass, Class reduceClass) throws Exception {
+ JobConf jobConf = mrCluster.createJobConf();
+
+ JobClient jc = new JobClient(jobConf);
+ final Path inDir = new Path("./failjob/input");
+ final Path outDir = new Path("./failjob/output");
+ String input = "Test failing job.\n One more line";
+ FileSystem inFs = inDir.getFileSystem(jobConf);
+ FileSystem outFs = outDir.getFileSystem(jobConf);
+ outFs.delete(outDir, true);
+ if (!inFs.mkdirs(inDir)) {
+ throw new IOException("create directory failed: " + inDir.toString());
+ }
+
+ DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
+ file.writeBytes(input);
+ file.close();
+ jobConf.setJobName("failmaptask");
+ jobConf.setInputFormat(TextInputFormat.class);
+ jobConf.setOutputKeyClass(Text.class);
+ jobConf.setOutputValueClass(Text.class);
+ jobConf.setMapperClass(mapClass);
+ jobConf.setCombinerClass(reduceClass);
+ jobConf.setReducerClass(reduceClass);
+ FileInputFormat.setInputPaths(jobConf, inDir);
+ FileOutputFormat.setOutputPath(jobConf, outDir);
+ jobConf.setNumMapTasks(10);
+ jobConf.setNumReduceTasks(5);
+ RunningJob job = null;
+ try {
+ job = JobClient.runJob(jobConf);
+ } catch (IOException e) {
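+ // Expected: the job is built to fail, so ignore the IOException from runJob().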
+ }
+
+ }
+
+ void checkTaskCounts() {
+ JobStatus[] status = jt.getAllJobs();
+ for (JobStatus js : status) {
+ JobInProgress jip = jt.getJob(js.getJobID());
+ Counters counter = jip.getJobCounters();
+ long totalTaskCount = counter
+ .getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_MAPS)
+ + counter.getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_REDUCES);
+ while (jip.getNumTaskCompletionEvents() < totalTaskCount) {
+ assertTrue(jip.runningMaps() >= 0);
+ assertTrue(jip.pendingMaps() >= 0);
+ assertTrue(jip.runningReduces() >= 0);
+ assertTrue(jip.pendingReduces() >= 0);
+ }
+ }
+ }
+
+}
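
A hedged usage note, not part of the commit: on a checkout of this revision the new test should run standalone through the usual single-testcase target, assuming the build.xml of this era still honours the "testcase" property:

  ant test -Dtestcase=TestJobInProgress

Since setUp() starts both a MiniDFSCluster and a MiniMRCluster, expect the two test methods to take several minutes and to need free local ports for the mini clusters.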