You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/10/14 07:03:30 UTC

svn commit: r704310 - in /hadoop/core/trunk: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobInProgress.java src/test/org/apache/hadoop/mapred/TestJobInProgress.java

Author: ddas
Date: Mon Oct 13 22:03:29 2008
New Revision: 704310

URL: http://svn.apache.org/viewvc?rev=704310&view=rev
Log:
HADOOP-4287. Fixes an issue to do with maintaining counts of running/pending maps/reduces. Contributed by Sreekanth Ramakrishnan.

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=704310&r1=704309&r2=704310&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Oct 13 22:03:29 2008
@@ -895,6 +895,9 @@
     and org.apache.hadoop.security.AccessControlIOException into a single
     class hadoop.security.AccessControlException. (omalley via acmurthy)
 
+    HADOOP-4287. Fixes an issue to do with maintaining counts of running/pending
+    maps/reduces. (Sreekanth Ramakrishnan via ddas)
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=704310&r1=704309&r2=704310&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Oct 13 22:03:29 2008
@@ -492,11 +492,11 @@
     return finishedReduceTasks;
   }
   public synchronized int pendingMaps() {
-    return numMapTasks - runningMapTasks - failedMapTasks - 
+    return numMapTasks - runningMapTasks - failedMapTIPs - 
     finishedMapTasks + speculativeMapTasks;
   }
   public synchronized int pendingReduces() {
-    return numReduceTasks - runningReduceTasks - failedReduceTasks - 
+    return numReduceTasks - runningReduceTasks - failedReduceTIPs - 
     finishedReduceTasks + speculativeReduceTasks;
   }
   public JobPriority getPriority() {
@@ -1915,8 +1915,6 @@
     if ((status.getRunState() == JobStatus.RUNNING) ||
          (status.getRunState() == JobStatus.PREP)) {
       LOG.info("Killing job '" + this.status.getJobID() + "'");
-      this.runningMapTasks = 0;
-      this.runningReduceTasks = 0;
       //
       // kill all TIPs.
       //

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java?rev=704310&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobInProgress.java Mon Oct 13 22:03:29 2008
@@ -0,0 +1,147 @@
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+import junit.framework.TestCase;
+
+public class TestJobInProgress extends TestCase {
+
+  private MiniMRCluster mrCluster;
+
+  private MiniDFSCluster dfsCluster;
+  JobTracker jt;
+
+  public static class FailMapTaskJob extends MapReduceBase implements
+      Mapper<LongWritable, Text, Text, IntWritable> {
+
+    @Override
+    public void map(LongWritable key, Text value,
+        OutputCollector<Text, IntWritable> output, Reporter reporter)
+        throws IOException {
+      // reporter.incrCounter(TaskCounts.LaunchedTask, 1);
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        throw new IllegalArgumentException("Interrupted MAP task");
+      }
+      throw new IllegalArgumentException("Failing MAP task");
+    }
+  }
+
+  // Suppressing warning as we just need to write a failing reduce task job.
+  // We don't need to bother about the actual key value pairs which are passed.
+  @SuppressWarnings("unchecked")
+  public static class FailReduceTaskJob extends MapReduceBase implements
+      Reducer {
+
+    @Override
+    public void reduce(Object key, Iterator values, OutputCollector output,
+        Reporter reporter) throws IOException {
+      // reporter.incrCounter(TaskCounts.LaunchedTask, 1);
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        throw new IllegalArgumentException("Failing Reduce task");
+      }
+      throw new IllegalArgumentException("Failing Reduce task");
+    }
+
+  }
+
+  @Override
+  protected void setUp() throws Exception {
+    // TODO Auto-generated method stub
+    super.setUp();
+    final int taskTrackers = 4;
+    Configuration conf = new Configuration();
+    dfsCluster = new MiniDFSCluster(conf, 4, true, null);
+    mrCluster = new MiniMRCluster(taskTrackers, dfsCluster.getFileSystem()
+        .getUri().toString(), 1);
+    jt = mrCluster.getJobTrackerRunner().getJobTracker();
+  }
+
+  public void testPendingMapTaskCount() throws Exception {
+    launchTask(FailMapTaskJob.class, IdentityReducer.class);
+    checkTaskCounts();
+  }
+  
+  public void testPendingReduceTaskCount() throws Exception {
+    launchTask(IdentityMapper.class, FailReduceTaskJob.class);
+    checkTaskCounts();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    mrCluster.shutdown();
+    dfsCluster.shutdown();
+    super.tearDown();
+  }
+  
+
+  @SuppressWarnings("unchecked")
+  void launchTask(Class MapClass,Class ReduceClass) throws Exception{
+    JobConf jobConf = mrCluster.createJobConf();
+
+    JobClient jc = new JobClient(jobConf);
+    final Path inDir = new Path("./failjob/input");
+    final Path outDir = new Path("./failjob/output");
+    String input = "Test failing job.\n One more line";
+    FileSystem inFs = inDir.getFileSystem(jobConf);
+    FileSystem outFs = outDir.getFileSystem(jobConf);
+    outFs.delete(outDir, true);
+    if (!inFs.mkdirs(inDir)) {
+      throw new IOException("create directory failed" + inDir.toString());
+    }
+
+    DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
+    file.writeBytes(input);
+    file.close();
+    jobConf.setJobName("failmaptask");
+    jobConf.setInputFormat(TextInputFormat.class);
+    jobConf.setOutputKeyClass(Text.class);
+    jobConf.setOutputValueClass(Text.class);
+    jobConf.setMapperClass(MapClass);
+    jobConf.setCombinerClass(ReduceClass);
+    jobConf.setReducerClass(ReduceClass);
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    FileOutputFormat.setOutputPath(jobConf, outDir);
+    jobConf.setNumMapTasks(10);
+    jobConf.setNumReduceTasks(5);
+    RunningJob job = null;
+    try {
+      job = JobClient.runJob(jobConf);
+    } catch (IOException e) {
+    }
+
+  }
+
+  void checkTaskCounts() {
+    JobStatus[] status = jt.getAllJobs();
+    for (JobStatus js : status) {
+      JobInProgress jip = jt.getJob(js.getJobID());
+      Counters counter = jip.getJobCounters();
+      long totalTaskCount = counter
+          .getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_MAPS)
+          + counter.getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_REDUCES);
+      while (jip.getNumTaskCompletionEvents() < totalTaskCount) {
+        assertEquals(true, (jip.runningMaps() >= 0));
+        assertEquals(true, (jip.pendingMaps() >= 0));
+        assertEquals(true, (jip.runningReduces() >= 0));
+        assertEquals(true, (jip.pendingReduces() >= 0));
+      }
+    }
+  }
+  
+}