You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/11/26 04:22:37 UTC
git commit: TEZ-643. Change getProgress APIs to return some form of
progress and 1.0f once the map or reduce phase completes. (sseth)
Updated Branches:
refs/heads/master 3ec17f6d8 -> 8883ce5dc
TEZ-643. Change getProgress APIs to return some form of progress and
1.0f once the map or reduce phase completes. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8883ce5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8883ce5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8883ce5d
Branch: refs/heads/master
Commit: 8883ce5dcabc40bfdfcad75d6ffc97da74322b85
Parents: 3ec17f6
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Nov 25 19:22:00 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Nov 25 19:22:00 2013 -0800
----------------------------------------------------------------------
.../tez/mapreduce/hadoop/mapred/MRReporter.java | 9 +++++++--
.../mapreduce/hadoop/mapreduce/MapContextImpl.java | 5 +++--
.../hadoop/mapreduce/TaskAttemptContextImpl.java | 15 +++++++++------
.../hadoop/mapreduce/TaskInputOutputContextImpl.java | 5 +++--
.../java/org/apache/tez/mapreduce/input/MRInput.java | 4 ++--
.../org/apache/tez/mapreduce/output/MROutput.java | 2 +-
.../tez/mapreduce/processor/MRTaskReporter.java | 3 ++-
.../tez/mapreduce/processor/map/MapProcessor.java | 8 +++++++-
.../mapreduce/processor/reduce/ReduceProcessor.java | 7 +++++++
9 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
index e52cc6d..11dcb97 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
@@ -29,6 +29,7 @@ public class MRReporter implements Reporter {
private TezTaskContext tezTaskContext;
private InputSplit split;
+ private float progress = 0f;
private boolean isProcessorContext = false;
public MRReporter(TezProcessorContext tezProcContext) {
@@ -85,10 +86,14 @@ public class MRReporter implements Reporter {
}
}
+ public void setProgress(float progress) {
+ this.progress = progress;
+ }
+
@Override
public float getProgress() {
- // TOOD NEWTEZ Does this make a difference to anything ?
- return 0.0f;
+ // TODO NEWTEZ This is likely broken. Only set on task complete in Map/ReduceProcessor
+ return this.progress;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
index 2d27c4b..0014b1d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.MapContext;
import org.apache.hadoop.mapreduce.Mapper;
@@ -52,8 +53,8 @@ public class MapContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
TezTaskContext context,
- InputSplit split) {
- super(conf, taskid, writer, committer, context);
+ InputSplit split, Reporter reporter) {
+ super(conf, taskid, writer, committer, context, reporter);
this.reader = reader;
this.split = split;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
index be65be7..2f6f90d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
@@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.hadoop.mapreduce;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
@@ -35,29 +36,31 @@ import org.apache.tez.runtime.api.TezTaskContext;
public class TaskAttemptContextImpl
extends org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl {
- private TezTaskContext taskContext;
+ private final TezTaskContext taskContext;
+ private final Reporter reporter;
// FIXME we need to use DAG Id but we are using App Id
public TaskAttemptContextImpl(Configuration conf,
- TezTaskContext taskContext, boolean isMap) {
+ TezTaskContext taskContext, boolean isMap, Reporter reporter) {
// TODO NEWTEZ Can the jt Identifier string be taskContext.getUniqueId ?
this(conf, new TaskAttemptID(
new TaskID(String.valueOf(taskContext.getApplicationId()
.getClusterTimestamp()), taskContext.getApplicationId().getId(),
isMap ? TaskType.MAP : TaskType.REDUCE,
taskContext.getTaskIndex()),
- taskContext.getTaskAttemptNumber()), taskContext);
+ taskContext.getTaskAttemptNumber()), taskContext, reporter);
}
- public TaskAttemptContextImpl(Configuration conf, TaskAttemptID taId, TezTaskContext context) {
+ public TaskAttemptContextImpl(Configuration conf, TaskAttemptID taId, TezTaskContext context, Reporter reporter) {
super(conf, taId);
this.taskContext = context;
+ this.reporter = reporter;
}
@Override
public float getProgress() {
- // TODO NEWTEZ Will this break anything ?
- return 0.0f;
+ // TODO NEWTEZ This is broken. Mainly set after all records are processed. Not set for Inputs/Outputs
+ return reporter == null ? 0.0f : reporter.getProgress();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
index 5b5c8ec..9d83435 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -50,8 +51,8 @@ public abstract class TaskInputOutputContextImpl<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
- TezTaskContext context) {
- super(conf, taskid, context);
+ TezTaskContext context, Reporter reporter) {
+ super(conf, taskid, context, reporter);
this.output = output;
this.committer = committer;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 5db41d9..336968c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -100,7 +100,7 @@ public class MRInput implements LogicalInput {
@SuppressWarnings("rawtypes")
private org.apache.hadoop.mapreduce.RecordReader newRecordReader;
protected org.apache.hadoop.mapreduce.InputSplit newInputSplit;
-
+
@SuppressWarnings("rawtypes")
private InputFormat oldInputFormat;
@SuppressWarnings("rawtypes")
@@ -324,7 +324,7 @@ public class MRInput implements LogicalInput {
private TaskAttemptContext createTaskAttemptContext() {
- return new TaskAttemptContextImpl(this.jobConf, inputContext, true);
+ return new TaskAttemptContextImpl(this.jobConf, inputContext, true, null);
}
void processSplitEvent(RootInputDataInformationEvent event)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index fbb155c..b554b6c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -235,7 +235,7 @@ public class MROutput implements LogicalOutput {
private TaskAttemptContext createTaskAttemptContext() {
return new TaskAttemptContextImpl(this.jobConf, outputContext,
- isMapperOutput);
+ isMapperOutput, null);
}
private long getOutputBytes() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
index 74a34af..0869e32 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
@@ -39,7 +39,7 @@ public class MRTaskReporter
private final TezTaskContext context;
private final boolean isProcessorContext;
- private final Reporter reporter;
+ private final MRReporter reporter;
private InputSplit split = null;
@@ -62,6 +62,7 @@ public class MRTaskReporter
}
public void setProgress(float progress) {
+ reporter.setProgress(progress);
if (isProcessorContext) {
((TezProcessorContext)context).setProgress(progress);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 801cfcc..164818c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -150,6 +150,9 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
(MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
runner.run(in, collector, (Reporter)reporter);
+
+ // Set progress to 1.0f if there was no exception,
+ reporter.setProgress(1.0f);
// start the sort phase only if there are reducers
this.statusUpdate();
}
@@ -192,13 +195,16 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
job, taskAttemptId,
input, output,
getCommitter(),
- processorContext, split);
+ processorContext, split, reporter);
org.apache.hadoop.mapreduce.Mapper.Context mapperContext =
new WrappedMapper().getMapContext(mapContext);
input.initialize(split, mapperContext);
mapper.run(mapperContext);
+ // Set progress to 1.0f if there was no exception,
+ reporter.setProgress(1.0f);
+
this.statusUpdate();
input.close();
output.close(mapperContext);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8883ce5d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 0e41b0b..ecda8c6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -194,6 +194,9 @@ implements LogicalIOProcessor {
values.informReduceProgress();
}
+ // Set progress to 1.0f if there was no exception,
+ reporter.setProgress(1.0f);
+
//Clean up: repeated in catch block below
reducer.close();
//End of clean up.
@@ -330,6 +333,10 @@ implements LogicalIOProcessor {
reducer.run(reducerContext);
+
+ // Set progress to 1.0f if there was no exception,
+ reporter.setProgress(1.0f);
+
trackedRW.close(reducerContext);
}