You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by am...@apache.org on 2011/05/23 19:06:09 UTC

svn commit: r1126591 - in /hadoop/mapreduce/trunk: ./ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/ src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduc...

Author: amarrk
Date: Mon May 23 17:06:08 2011
New Revision: 1126591

URL: http://svn.apache.org/viewvc?rev=1126591&view=rev
Log:
MAPREDUCE-2492. The new MapReduce API should make the task's progress available to the task. (amarrk)

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReporter.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReporter.java
    hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/MockReporter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/StatusReporter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestTaskContext.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon May 23 17:06:08 2011
@@ -14,6 +14,9 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    MAPREDUCE-2492. The new MapReduce API should make available task's
+    progress to the task. (amarrk)
+
     MAPREDUCE-2153. Bring in more job configuration properties in to the trace 
     file. (Rajesh Balamohan via amarrk)
 

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReporter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReporter.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReporter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mapreduce/mock/MockReporter.java Mon May 23 17:06:08 2011
@@ -58,5 +58,10 @@ public class MockReporter extends Status
 
     return counter;
   }
+  
+  @Override
+  public float getProgress() {
+    return 0;
+  }
 }
 

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/MockReporter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/MockReporter.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/MockReporter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/src/java/org/apache/hadoop/mrunit/mock/MockReporter.java Mon May 23 17:06:08 2011
@@ -92,5 +92,10 @@ public class MockReporter implements Rep
 
     return counter;
   }
+
+  /** A mock reporter has no real task behind it, so progress is always 0. */
+  @Override
+  public float getProgress() {
+    return 0;
+  }
 }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Mon May 23 17:06:08 2011
@@ -59,7 +59,6 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.task.MapContextImpl;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
@@ -294,8 +293,16 @@ class MapTask extends Task {
     this.umbilical = umbilical;
 
     if (isMapTask()) {
-      mapPhase = getProgress().addPhase("map", 0.667f);
-      sortPhase  = getProgress().addPhase("sort", 0.333f);
+      // If there are no reducers then there won't be any sort. Hence the map 
+      // phase will govern the entire attempt's progress.
+      if (conf.getNumReduceTasks() == 0) {
+        mapPhase = getProgress().addPhase("map", 1.0f);
+      } else {
+        // If there are reducers then the entire attempt's progress will be 
+        // split between the map phase (67%) and the sort phase (33%).
+        mapPhase = getProgress().addPhase("map", 0.667f);
+        sortPhase  = getProgress().addPhase("sort", 0.333f);
+      }
     }
     TaskReporter reporter = startReporter(umbilical);
  
@@ -388,7 +395,10 @@ class MapTask extends Task {
     try {
       runner.run(in, new OldOutputCollector(collector, conf), reporter);
       mapPhase.complete();
-      setPhase(TaskStatus.Phase.SORT);
+      // start the sort phase only if there are reducers
+      if (numReduceTasks > 0) {
+        setPhase(TaskStatus.Phase.SORT);
+      }
       statusUpdate(umbilical);
       collector.flush();
     } finally {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Mon May 23 17:06:08 2011
@@ -361,6 +361,8 @@ public class ReduceTask extends Task {
                     taskStatus, copyPhase, sortPhase, this);
       rIter = shuffle.run();
     } else {
+      // local job runner doesn't have a copy phase
+      copyPhase.complete();
       final FileSystem rfs = FileSystem.getLocal(job).getRaw();
       rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(),
                            job.getMapOutputValueClass(), codec, 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Reporter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Reporter.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Reporter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Reporter.java Mon May 23 17:06:08 2011
@@ -64,6 +64,10 @@ public interface Reporter extends Progre
       public InputSplit getInputSplit() throws UnsupportedOperationException {
         throw new UnsupportedOperationException("NULL reporter has no input");
       }
+      @Override
+      public float getProgress() {
+        return 0;
+      }
     };
 
   /**
@@ -120,4 +124,10 @@ public interface Reporter extends Progre
    */
   public abstract InputSplit getInputSplit() 
     throws UnsupportedOperationException;
+  
+  /**
+   * Get the progress of the task. Progress is represented as a number between
+   * 0 and 1 (inclusive).
+   */
+  public float getProgress();
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Task.java Mon May 23 17:06:08 2011
@@ -569,6 +569,11 @@ abstract public class Task implements Wr
       // indicate that progress update needs to be sent
       setProgressFlag();
     }
+    
+    public float getProgress() {
+      return taskProgress.getProgress();
+    }
+    
     public void progress() {
       // indicate that progress update needs to be sent
       setProgressFlag();

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskAttemptContextImpl.java Mon May 23 17:06:08 2011
@@ -60,6 +60,11 @@ public class TaskAttemptContextImpl
   public JobConf getJobConf() {
     return (JobConf) getConfiguration();
   }
+  
+  @Override
+  public float getProgress() {
+    return reporter.getProgress();
+  }
 
   @Override
   public Counter getCounter(Enum<?> counterName) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/StatusReporter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/StatusReporter.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/StatusReporter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/StatusReporter.java Mon May 23 17:06:08 2011
@@ -24,5 +24,11 @@ public abstract class StatusReporter {
   public abstract Counter getCounter(Enum<?> name);
   public abstract Counter getCounter(String group, String name);
   public abstract void progress();
+  /**
+   * Get the current progress.
+   * @return a number between 0.0 and 1.0 (inclusive) indicating the attempt's 
+   * progress.
+   */
+  public abstract float getProgress();
   public abstract void setStatus(String status);
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java Mon May 23 17:06:08 2011
@@ -44,6 +44,13 @@ public interface TaskAttemptContext exte
    * @return the current status message
    */
   public String getStatus();
+  
+  /**
+   * The current progress of the task attempt.
+   * @return a number between 0.0 and 1.0 (inclusive) indicating the attempt's
+   * progress.
+   */
+  public float getProgress();
 
   /**
    * Get the {@link Counter} for the given <code>counterName</code>.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainMapContextImpl.java Mon May 23 17:06:08 2011
@@ -316,4 +316,8 @@ class ChainMapContextImpl<KEYIN, VALUEIN
     return base.getCredentials();
   }
 
+  @Override
+  public float getProgress() {
+    return base.getProgress();
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/chain/ChainReduceContextImpl.java Mon May 23 17:06:08 2011
@@ -308,4 +308,9 @@ class ChainReduceContextImpl<KEYIN, VALU
   public Credentials getCredentials() {
     return base.getCredentials();
   }
+  
+  @Override
+  public float getProgress() {
+    return base.getProgress();
+  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/join/Parser.java Mon May 23 17:06:08 2011
@@ -386,6 +386,11 @@ public abstract static class Node extend
     }
 
     @Override
+    public float getProgress() {
+      return context.getProgress();
+    }
+    
+    @Override
     public void setStatus(String status) {
       context.setStatus(status);
     }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/MultithreadedMapper.java Mon May 23 17:06:08 2011
@@ -240,6 +240,10 @@ public class MultithreadedMapper<K1, V1,
       outer.setStatus(status);
     }
     
+    @Override
+    public float getProgress() {
+      return outer.getProgress();
+    }
   }
 
   private class MapRunner extends Thread {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/map/WrappedMapper.java Mon May 23 17:06:08 2011
@@ -317,5 +317,10 @@ public class WrappedMapper<KEYIN, VALUEI
     public Credentials getCredentials() {
       return mapContext.getCredentials();
     }
+    
+    @Override
+    public float getProgress() {
+      return mapContext.getProgress();
+    }
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.java Mon May 23 17:06:08 2011
@@ -471,6 +471,11 @@ public class MultipleOutputs<KEYOUT, VAL
     }
 
     @Override
+    public float getProgress() {
+      return context.getProgress();
+    }
+    
+    @Override
     public void setStatus(String status) {
       context.setStatus(status);
     }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/reduce/WrappedReducer.java Mon May 23 17:06:08 2011
@@ -321,5 +321,10 @@ public class WrappedReducer<KEYIN, VALUE
     public Credentials getCredentials() {
       return reduceContext.getCredentials();
     }
+    
+    @Override
+    public float getProgress() {
+      return reduceContext.getProgress();
+    }
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java Mon May 23 17:06:08 2011
@@ -107,5 +107,13 @@ public class TaskAttemptContextImpl exte
     public Counter getCounter(String group, String name) {
       return new Counters().findCounter(group, name);
     }
+    public float getProgress() {
+      return 0f;
+    }
+  }
+  
+  @Override
+  public float getProgress() {
+    return reporter.getProgress();
   }
 }
\ No newline at end of file

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestMapProgress.java Mon May 23 17:06:08 2011
@@ -94,7 +94,7 @@ public class TestMapProgress extends Tes
       }
       // validate map task progress when the map task is in map phase
       assertTrue("Map progress is not the expected value.",
-                 Math.abs(mapTaskProgress - ((0.667/3)*recordNum)) < 0.001);
+                 Math.abs(mapTaskProgress - ((float)recordNum/3)) < 0.001);
     }
   }
 

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReporter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReporter.java?rev=1126591&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReporter.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestReporter.java Mon May 23 17:06:08 2011
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+/**
+ * Tests the old mapred APIs with {@link Reporter#getProgress()}.
+ */
+public class TestReporter {
+  private static final Path rootTempDir =
+    new Path(System.getProperty("test.build.data", "/tmp"));
+  private static final Path testRootTempDir =
+    new Path(rootTempDir, "TestReporter");
+
+  private static FileSystem fs = null;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    fs = FileSystem.getLocal(new Configuration());
+    fs.delete(testRootTempDir, true);
+    fs.mkdirs(testRootTempDir);
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    fs.delete(testRootTempDir, true);
+  }
+
+  // an input with 4 lines
+  private static final String INPUT = "Hi\nHi\nHi\nHi\n";
+  private static final int INPUT_LINES = INPUT.split("\n").length;
+
+  /**
+   * A {@link Mapper} that checks the attempt's progress on every call to
+   * {@link Mapper#map(Object, Object, OutputCollector, Reporter)} and again
+   * when the task is closed.
+   */
+  @SuppressWarnings("deprecation")
+  static class ProgressTesterMapper extends MapReduceBase
+  implements Mapper<LongWritable, Text, Text, Text> {
+    // Fraction of the attempt's progress owned by the map phase.
+    private float progressRange = 0;
+    private int numRecords = 0;
+    private Reporter reporter = null;
+
+    @Override
+    public void configure(JobConf job) {
+      super.configure(job);
+      // A map-only job has no sort phase, so the map phase owns the whole
+      // attempt's progress; otherwise it owns 66.7% and sort owns the rest.
+      if (job.getNumReduceTasks() == 0) {
+        progressRange = 1f;
+      } else {
+        progressRange = 0.667f;
+      }
+    }
+
+    @Override
+    public void map(LongWritable key, Text value,
+                    OutputCollector<Text, Text> output, Reporter reporter)
+    throws IOException {
+      this.reporter = reporter;
+
+      // calculate the actual map progress
+      float mapProgress = ((float)++numRecords)/INPUT_LINES;
+      // calculate the attempt progress based on the progress range
+      float attemptProgress = progressRange * mapProgress;
+      assertEquals("Invalid progress in map",
+                   attemptProgress, reporter.getProgress(), 0f);
+      output.collect(new Text(value.toString() + numRecords), value);
+    }
+
+    @Override
+    public void close() throws IOException {
+      super.close();
+      // Fail with a clear message rather than an NPE if map() never ran.
+      assertNotNull("map() was never called", reporter);
+      assertEquals("Invalid progress in map cleanup",
+                   progressRange, reporter.getProgress(), 0f);
+    }
+  }
+
+  /**
+   * Test {@link Reporter}'s progress for a map-only job.
+   * This will make sure that only the map phase decides the attempt's progress.
+   */
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testReporterProgressForMapOnlyJob() throws IOException {
+    Path test = new Path(testRootTempDir, "testReporterProgressForMapOnlyJob");
+
+    JobConf conf = new JobConf();
+    conf.setMapperClass(ProgressTesterMapper.class);
+    conf.setMapOutputKeyClass(Text.class);
+    // fail early
+    conf.setMaxMapAttempts(1);
+    conf.setMaxReduceAttempts(0);
+
+    RunningJob job =
+      UtilsForTests.runJob(conf, new Path(test, "in"), new Path(test, "out"),
+                           1, 0, INPUT);
+    job.waitForCompletion();
+
+    assertTrue("Job failed", job.isSuccessful());
+  }
+
+  /**
+   * A {@link Reducer} implementation that checks the progress on every call
+   * to {@link Reducer#reduce(Object, Iterator, OutputCollector, Reporter)}.
+   */
+  @SuppressWarnings("deprecation")
+  static class ProgressTestingReducer extends MapReduceBase
+  implements Reducer<Text, Text, Text, Text> {
+    // This test assumes the reduce task splits its progress into fixed
+    // ranges: copy + sort own the first 2/3, the reduce phase the last 1/3.
+    private static final float REDUCE_PROGRESS_RANGE = 1.0f/3;
+    private static final float SHUFFLE_PROGRESS_RANGE =
+      1 - REDUCE_PROGRESS_RANGE;
+    private int recordCount = 0;
+    private Reporter reporter = null;
+
+    @Override
+    public void reduce(Text key, Iterator<Text> values,
+        OutputCollector<Text, Text> output, Reporter reporter)
+    throws IOException {
+      // remember the reporter first so close() can check it
+      this.reporter = reporter;
+      float reducePhaseProgress = ((float)++recordCount)/INPUT_LINES;
+      float weightedReducePhaseProgress =
+              reducePhaseProgress * REDUCE_PROGRESS_RANGE;
+      assertEquals("Invalid progress in reduce",
+                   SHUFFLE_PROGRESS_RANGE + weightedReducePhaseProgress,
+                   reporter.getProgress(), 0.02f);
+    }
+
+    @Override
+    public void close() throws IOException {
+      super.close();
+      // Fail with a clear message rather than an NPE if reduce() never ran.
+      assertNotNull("reduce() was never called", reporter);
+      assertEquals("Invalid progress in reduce cleanup",
+                   1.0f, reporter.getProgress(), 0f);
+    }
+  }
+
+  /**
+   * Test {@link Reporter}'s progress for map-reduce job.
+   */
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testReporterProgressForMRJob() throws IOException {
+    Path test = new Path(testRootTempDir, "testReporterProgressForMRJob");
+
+    JobConf conf = new JobConf();
+    conf.setMapperClass(ProgressTesterMapper.class);
+    conf.setReducerClass(ProgressTestingReducer.class);
+    conf.setMapOutputKeyClass(Text.class);
+    // fail early
+    conf.setMaxMapAttempts(1);
+    conf.setMaxReduceAttempts(1);
+
+    RunningJob job =
+      UtilsForTests.runJob(conf, new Path(test, "in"), new Path(test, "out"),
+                           1, 1, INPUT);
+    job.waitForCompletion();
+
+    assertTrue("Job failed", job.isSuccessful());
+  }
+}
\ No newline at end of file

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Mon May 23 17:06:08 2011
@@ -559,6 +559,16 @@ public class UtilsForTests {
   static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps, 
                            int numReds) throws IOException {
 
+    String input = "The quick brown fox\n" + "has many silly\n"
+                   + "red fox sox\n";
+    
+    // submit the job and wait for it to complete
+    return runJob(conf, inDir, outDir, numMaps, numReds, input);
+  }
+  
+  // Start a job with the specified input and return its RunningJob object
+  static RunningJob runJob(JobConf conf, Path inDir, Path outDir, int numMaps, 
+                           int numReds, String input) throws IOException {
     FileSystem fs = FileSystem.get(conf);
     if (fs.exists(outDir)) {
       fs.delete(outDir, true);
@@ -566,8 +576,7 @@ public class UtilsForTests {
     if (!fs.exists(inDir)) {
       fs.mkdirs(inDir);
     }
-    String input = "The quick brown fox\n" + "has many silly\n"
-        + "red fox sox\n";
+    
     for (int i = 0; i < numMaps; ++i) {
       DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
       file.writeBytes(input);

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Mon May 23 17:06:08 2011
@@ -388,6 +388,10 @@ public class MapReduceTestUtil {
       }
       public void progress() {
       }
+      @Override
+      public float getProgress() {
+        return 0;
+      }
       public Counter getCounter(Enum<?> name) {
         return new Counters().findCounter(name);
       }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestTaskContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestTaskContext.java?rev=1126591&r1=1126590&r2=1126591&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestTaskContext.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestTaskContext.java Mon May 23 17:06:08 2011
@@ -18,16 +18,53 @@
 package org.apache.hadoop.mapreduce;
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil.DataCopyMapper;
+import org.apache.hadoop.mapreduce.MapReduceTestUtil.DataCopyReducer;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 /**
- * Tests context api. 
+ * Tests the task context API, including {@link StatusReporter#getProgress()}
+ * as exposed through {@link TaskAttemptContext#getProgress()}.
  */
 public class TestTaskContext extends HadoopTestCase {
+  /** Base scratch directory; overridable via the test.build.data property. */
+  private static final Path rootTempDir =
+    new Path(System.getProperty("test.build.data", "/tmp"));
+  /** Scratch directory for this class; each test uses its own subdirectory. */
+  private static final Path testRootTempDir =
+    new Path(rootTempDir, "TestTaskContext");
+
+  private static FileSystem fs = null;
+
+  /**
+   * Recreates an empty scratch directory on the local file system.
+   * NOTE(review): HadoopTestCase appears to be a JUnit 3 TestCase; if so,
+   * JUnit ignores @BeforeClass/@AfterClass and neither this method nor
+   * {@link #cleanup()} runs. Consider overriding setUp()/tearDown().
+   */
+  @BeforeClass
+  public static void setup() throws Exception {
+    fs = FileSystem.getLocal(new Configuration());
+    fs.delete(testRootTempDir, true);
+    fs.mkdirs(testRootTempDir);
+  }
+
+  /** Deletes the scratch directory created by {@link #setup()}. */
+  @AfterClass
+  public static void cleanup() throws Exception {
+    fs.delete(testRootTempDir, true);
+  }
+
   public TestTaskContext() throws IOException {
     super(HadoopTestCase.CLUSTER_MR , HadoopTestCase.LOCAL_FS, 1, 1);
   }
@@ -48,16 +85,219 @@ public class TestTaskContext extends Had
    * @throws InterruptedException
    * @throws ClassNotFoundException
    */
+  @Test
   public void testContextStatus()
       throws IOException, InterruptedException, ClassNotFoundException {
+    Path test = new Path(testRootTempDir, "testContextStatus");
+
+    // test with 1 map and 0 reducers
+    // test with custom task status
     int numMaps = 1;
-    Job job = MapReduceTestUtil.createJob(createJobConf(), new Path("in"),
-        new Path("out"), numMaps, 0);
+    Job job = MapReduceTestUtil.createJob(createJobConf(),
+                new Path(test, "in"), new Path(test, "out"), numMaps, 0);
     job.setMapperClass(MyMapper.class);
     job.waitForCompletion(true);
     assertTrue("Job failed", job.isSuccessful());
     TaskReport[] reports = job.getTaskReports(TaskType.MAP);
     assertEquals(numMaps, reports.length);
-    assertEquals(myStatus + " > sort", reports[0].getState());
+    // with no reducers the mapper's custom status is expected unchanged
+    assertEquals(myStatus, reports[0].getState());
+
+    // test with 1 map and 1 reducer
+    // test with default task status
+    int numReduces = 1;
+    job = MapReduceTestUtil.createJob(createJobConf(),
+            new Path(test, "in"), new Path(test, "out"), numMaps, numReduces);
+    job.setMapperClass(DataCopyMapper.class);
+    job.setReducerClass(DataCopyReducer.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+
+    // fail early: allow a single attempt per task
+    job.setMaxMapAttempts(1);
+    job.setMaxReduceAttempts(1);
+
+    // run the job and wait for completion
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
+
+    // check map task reports
+    reports = job.getTaskReports(TaskType.MAP);
+    assertEquals(numMaps, reports.length);
+    assertEquals("map > sort", reports[0].getState());
+
+    // check reduce task reports
+    reports = job.getTaskReports(TaskType.REDUCE);
+    assertEquals(numReduces, reports.length);
+    assertEquals("reduce > reduce", reports[0].getState());
+  }
+
+  // an input with 4 lines
+  private static final String INPUT = "Hi\nHi\nHi\nHi\n";
+  private static final int INPUT_LINES = INPUT.split("\n").length;
+
+  /**
+   * Mapper that checks {@code context.getProgress()} against the number of
+   * records consumed so far and emits each line keyed by its text plus its
+   * 1-based record number.
+   */
+  static class ProgressCheckerMapper
+  extends Mapper<LongWritable, Text, Text, Text> {
+    private int recordCount = 0;
+    // share of the attempt's progress owned by the map phase
+    private float progressRange = 0;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+      // check if the map task attempt progress is 0
+      assertEquals("Invalid progress in map setup",
+                   0.0f, context.getProgress(), 0f);
+
+      // define the progress boundaries. NOTE(review): 0.667f is presumably
+      // the map phase weight used by MapTask when there are reducers; the
+      // checks below compare with zero tolerance, so keep the two in sync.
+      if (context.getNumReduceTasks() == 0) {
+        progressRange = 1f;
+      } else {
+        progressRange = 0.667f;
+      }
+    }
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context)
+    throws IOException, InterruptedException {
+      // get the map phase progress
+      float mapPhaseProgress = ((float)++recordCount)/INPUT_LINES;
+      // get the weighted map phase progress
+      float weightedMapProgress = progressRange * mapPhaseProgress;
+      // check the map progress
+      assertEquals("Invalid progress in map",
+                   weightedMapProgress, context.getProgress(), 0f);
+
+      context.write(new Text(value.toString() + recordCount), value);
+    }
+
+    @Override
+    protected void cleanup(Context context)
+    throws IOException, InterruptedException {
+      // check if the attempt progress is at the progress boundary
+      assertEquals("Invalid progress in map cleanup",
+                   progressRange, context.getProgress(), 0f);
+    }
+  }
+
+  /**
+   * Tests new MapReduce map task's context.getProgress() method.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  @Test
+  public void testMapContextProgress()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    int numMaps = 1;
+
+    Path test = new Path(testRootTempDir, "testMapContextProgress");
+
+    Job job = MapReduceTestUtil.createJob(createJobConf(),
+                new Path(test, "in"), new Path(test, "out"), numMaps, 0, INPUT);
+    job.setMapperClass(ProgressCheckerMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+
+    // fail early
+    job.setMaxMapAttempts(1);
+
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
+  }
+
+  /**
+   * Reducer that checks {@code context.getProgress()} against a completed
+   * shuffle plus the fraction of input records read, and passes every value
+   * through unchanged.
+   */
+  static class ProgressCheckerReducer extends Reducer<Text, Text,
+                                                      Text, Text> {
+    // the reduce phase owns the last third of the attempt's progress
+    private static final float REDUCE_PROGRESS_RANGE = 1.0f/3;
+    private static final float SHUFFLE_PROGRESS_RANGE =
+      1 - REDUCE_PROGRESS_RANGE;
+    private int recordCount = 0;
+
+    /**
+     * Expected attempt progress once {@code records} input records have
+     * been read. setup() and each reduce() call bump the count, and every
+     * key is unique, so the last reduce() sees INPUT_LINES + 1; the read
+     * fraction is therefore capped at 1.
+     */
+    private static float expectedProgress(int records) {
+      float reducePhaseProgress =
+        Math.min(1f, ((float) records) / INPUT_LINES);
+      return SHUFFLE_PROGRESS_RANGE
+             + REDUCE_PROGRESS_RANGE * reducePhaseProgress;
+    }
+
+    @Override
+    protected void setup(Context context)
+    throws IOException, InterruptedException {
+      // Note that the reduce will read some segments before calling setup()
+      // check that the shuffle phase progress is accounted for
+      assertEquals("Invalid progress in reduce setup",
+                   expectedProgress(++recordCount),
+                   context.getProgress(), 0.01f);
+    }
+
+    // Must take an Iterable to override Reducer#reduce: an Iterator overload
+    // is never called by the framework, so its checks would silently not run.
+    @Override
+    public void reduce(Text key, Iterable<Text> values, Context context)
+    throws IOException, InterruptedException {
+      assertEquals("Invalid progress in reduce",
+                   expectedProgress(++recordCount),
+                   context.getProgress(), 0.01f);
+      // pass values through, like the default identity reduce
+      for (Text value : values) {
+        context.write(key, value);
+      }
+    }
+
+    @Override
+    protected void cleanup(Context context)
+    throws IOException, InterruptedException {
+      // check if the reduce task has progress of 1 in the end
+      assertEquals("Invalid progress in reduce cleanup",
+                   1.0f, context.getProgress(), 0f);
+    }
+  }
+
+  /**
+   * Tests new MapReduce reduce task's context.getProgress() method.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  @Test
+  public void testReduceContextProgress()
+      throws IOException, InterruptedException, ClassNotFoundException {
+    int numTasks = 1;
+    Path test = new Path(testRootTempDir, "testReduceContextProgress");
+
+    Job job = MapReduceTestUtil.createJob(createJobConf(),
+                new Path(test, "in"), new Path(test, "out"), numTasks, numTasks,
+                INPUT);
+    job.setMapperClass(ProgressCheckerMapper.class);
+    job.setReducerClass(ProgressCheckerReducer.class);
+    job.setMapOutputKeyClass(Text.class);
+
+    // fail early
+    job.setMaxMapAttempts(1);
+    job.setMaxReduceAttempts(1);
+
+    job.waitForCompletion(true);
+    assertTrue("Job failed", job.isSuccessful());
   }
 }