You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/11/26 04:22:37 UTC

git commit: TEZ-643. Change getProgress APIs to return some form of progress and 1.0f once the map or reduce phase completes. (sseth)

Updated Branches:
  refs/heads/master 3ec17f6d8 -> 8883ce5dc


TEZ-643. Change getProgress APIs to return some form of progress and
1.0f once the map or reduce phase completes. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8883ce5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8883ce5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8883ce5d

Branch: refs/heads/master
Commit: 8883ce5dcabc40bfdfcad75d6ffc97da74322b85
Parents: 3ec17f6
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Nov 25 19:22:00 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Nov 25 19:22:00 2013 -0800

----------------------------------------------------------------------
 .../tez/mapreduce/hadoop/mapred/MRReporter.java      |  9 +++++++--
 .../mapreduce/hadoop/mapreduce/MapContextImpl.java   |  5 +++--
 .../hadoop/mapreduce/TaskAttemptContextImpl.java     | 15 +++++++++------
 .../hadoop/mapreduce/TaskInputOutputContextImpl.java |  5 +++--
 .../java/org/apache/tez/mapreduce/input/MRInput.java |  4 ++--
 .../org/apache/tez/mapreduce/output/MROutput.java    |  2 +-
 .../tez/mapreduce/processor/MRTaskReporter.java      |  3 ++-
 .../tez/mapreduce/processor/map/MapProcessor.java    |  8 +++++++-
 .../mapreduce/processor/reduce/ReduceProcessor.java  |  7 +++++++
 9 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
index e52cc6d..11dcb97 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
@@ -29,6 +29,7 @@ public class MRReporter implements Reporter {
 
   private TezTaskContext tezTaskContext;
   private InputSplit split;
+  private float progress = 0f;
   private boolean isProcessorContext = false;
   
   public MRReporter(TezProcessorContext tezProcContext) {
@@ -85,10 +86,14 @@ public class MRReporter implements Reporter {
     }
   }
 
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+  
   @Override
   public float getProgress() {
-    // TOOD NEWTEZ Does this make a difference to anything ?
-    return 0.0f;
+    // TODO NEWTEZ This is likely broken. Only set on task complete in Map/ReduceProcessor
+    return this.progress;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
index 2d27c4b..0014b1d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.MapContext;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -52,8 +53,8 @@ public class MapContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
                         RecordWriter<KEYOUT,VALUEOUT> writer,
                         OutputCommitter committer,
                         TezTaskContext context,
-                        InputSplit split) {
-    super(conf, taskid, writer, committer, context);
+                        InputSplit split, Reporter reporter) {
+    super(conf, taskid, writer, committer, context, reporter);
     this.reader = reader;
     this.split = split;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
index be65be7..2f6f90d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
@@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.hadoop.mapreduce;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
@@ -35,29 +36,31 @@ import org.apache.tez.runtime.api.TezTaskContext;
 public class TaskAttemptContextImpl
        extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl {
 
-  private TezTaskContext taskContext;
+  private final TezTaskContext taskContext;
+  private final Reporter reporter;
 
   // FIXME we need to use DAG Id but we are using App Id
   public TaskAttemptContextImpl(Configuration conf,
-      TezTaskContext taskContext, boolean isMap) {
+      TezTaskContext taskContext, boolean isMap, Reporter reporter) {
     // TODO NEWTEZ Can the jt Identifier string be taskContext.getUniqueId ?
     this(conf, new TaskAttemptID(
         new TaskID(String.valueOf(taskContext.getApplicationId()
             .getClusterTimestamp()), taskContext.getApplicationId().getId(),
             isMap ? TaskType.MAP : TaskType.REDUCE,
             taskContext.getTaskIndex()),
-            taskContext.getTaskAttemptNumber()), taskContext);
+            taskContext.getTaskAttemptNumber()), taskContext, reporter);
   }
 
-  public TaskAttemptContextImpl(Configuration conf, TaskAttemptID taId, TezTaskContext context) {
+  public TaskAttemptContextImpl(Configuration conf, TaskAttemptID taId, TezTaskContext context, Reporter reporter) {
     super(conf, taId);
     this.taskContext = context;
+    this.reporter = reporter;
   }
 
   @Override
   public float getProgress() {
-    // TODO NEWTEZ Will this break anything ?
-    return 0.0f;
+    // TODO NEWTEZ This is broken. Mainly set after all records are processed. Not set for Inputs/Outputs
+    return reporter == null ? 0.0f : reporter.getProgress();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
index 5b5c8ec..9d83435 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -50,8 +51,8 @@ public abstract class TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
   public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
                                     RecordWriter<KEYOUT,VALUEOUT> output,
                                     OutputCommitter committer,
-                                    TezTaskContext context) {
-    super(conf, taskid, context);
+                                    TezTaskContext context, Reporter reporter) {
+    super(conf, taskid, context, reporter);
     this.output = output;
     this.committer = committer;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 5db41d9..336968c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -100,7 +100,7 @@ public class MRInput implements LogicalInput {
   @SuppressWarnings("rawtypes")
   private org.apache.hadoop.mapreduce.RecordReader newRecordReader;
   protected org.apache.hadoop.mapreduce.InputSplit newInputSplit;
-  
+
   @SuppressWarnings("rawtypes")
   private InputFormat oldInputFormat;
   @SuppressWarnings("rawtypes")
@@ -324,7 +324,7 @@ public class MRInput implements LogicalInput {
 
   
   private TaskAttemptContext createTaskAttemptContext() {
-    return new TaskAttemptContextImpl(this.jobConf, inputContext, true);
+    return new TaskAttemptContextImpl(this.jobConf, inputContext, true, null);
   }
   
   void processSplitEvent(RootInputDataInformationEvent event)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index fbb155c..b554b6c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -235,7 +235,7 @@ public class MROutput implements LogicalOutput {
 
   private TaskAttemptContext createTaskAttemptContext() {
     return new TaskAttemptContextImpl(this.jobConf, outputContext,
-        isMapperOutput);
+        isMapperOutput, null);
   }
 
   private long getOutputBytes() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
index 74a34af..0869e32 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
@@ -39,7 +39,7 @@ public class MRTaskReporter
 
   private final TezTaskContext context;
   private final boolean isProcessorContext;
-  private final Reporter reporter;
+  private final MRReporter reporter;
 
   private InputSplit split = null;
 
@@ -62,6 +62,7 @@ public class MRTaskReporter
   }
 
   public void setProgress(float progress) {
+    reporter.setProgress(progress);
     if (isProcessorContext) {
       ((TezProcessorContext)context).setProgress(progress);
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 801cfcc..164818c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -150,6 +150,9 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
         (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
     runner.run(in, collector, (Reporter)reporter);
+    
+    // Set progress to 1.0f if there was no exception,
+    reporter.setProgress(1.0f);
     // start the sort phase only if there are reducers
     this.statusUpdate();
   }
@@ -192,13 +195,16 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
         job, taskAttemptId,
         input, output,
         getCommitter(),
-        processorContext, split);
+        processorContext, split, reporter);
 
     org.apache.hadoop.mapreduce.Mapper.Context mapperContext =
         new WrappedMapper().getMapContext(mapContext);
 
     input.initialize(split, mapperContext);
     mapper.run(mapperContext);
+    // Set progress to 1.0f if there was no exception,
+    reporter.setProgress(1.0f);
+    
     this.statusUpdate();
     input.close();
     output.close(mapperContext);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 0e41b0b..ecda8c6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -194,6 +194,9 @@ implements LogicalIOProcessor {
         values.informReduceProgress();
       }
 
+      // Set progress to 1.0f if there was no exception,
+      reporter.setProgress(1.0f);
+      
       //Clean up: repeated in catch block below
       reducer.close();
       //End of clean up.
@@ -330,6 +333,10 @@ implements LogicalIOProcessor {
 
 
     reducer.run(reducerContext);
+
+    // Set progress to 1.0f if there was no exception,
+    reporter.setProgress(1.0f);
+
     trackedRW.close(reducerContext);
   }