You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/02/13 03:52:37 UTC
git commit: TEZ-637. [MR Support] Add all required info into JobConf
for MR related I/O/P (bikas)
Updated Branches:
refs/heads/master ac471ada3 -> 924ce6611
TEZ-637. [MR Support] Add all required info into JobConf for MR related I/O/P (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/924ce661
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/924ce661
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/924ce661
Branch: refs/heads/master
Commit: 924ce6611b5e6cfc5be6d6fc2da794ded4304180
Parents: ac471ad
Author: Bikas Saha <bi...@apache.org>
Authored: Wed Feb 12 18:52:31 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed Feb 12 18:52:31 2014 -0800
----------------------------------------------------------------------
.../tez/mapreduce/examples/MRRSleepJob.java | 2 +-
.../mapreduce/examples/OrderedWordCount.java | 2 +-
.../apache/tez/mapreduce/client/YARNRunner.java | 2 +-
.../apache/tez/mapreduce/hadoop/MRHelpers.java | 10 +++-
.../org/apache/tez/mapreduce/input/MRInput.java | 2 +-
.../tez/mapreduce/input/MRInputLegacy.java | 7 +++
.../apache/tez/mapreduce/output/MROutput.java | 2 +-
.../tez/mapreduce/output/MROutputLegacy.java | 28 ++++++++++
.../apache/tez/mapreduce/processor/MRTask.java | 49 +++++++++++------
.../mapreduce/processor/map/MapProcessor.java | 57 ++++++++++++++++----
.../processor/reduce/ReduceProcessor.java | 14 ++---
.../processor/reduce/TestReduceProcessor.java | 9 ++--
.../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 2 +-
13 files changed, 144 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 4353fb1..5f3b184 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -650,7 +650,7 @@ public class MRRSleepJob extends Configured implements Tool {
Map<String, String> reduceEnv = new HashMap<String, String>();
MRHelpers.updateEnvironmentForMRTasks(finalReduceConf, reduceEnv, false);
finalReduceVertex.setTaskEnvironment(reduceEnv);
- MRHelpers.addMROutput(finalReduceVertex, reducePayload);
+ MRHelpers.addMROutputLegacy(finalReduceVertex, reducePayload);
vertices.add(finalReduceVertex);
} else {
// Map only job
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 2000668..3b5d573 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -273,7 +273,7 @@ public class OrderedWordCount {
Map<String, String> reduceEnv = new HashMap<String, String>();
MRHelpers.updateEnvironmentForMRTasks(finalReduceConf, reduceEnv, false);
finalReduceVertex.setTaskEnvironment(reduceEnv);
- MRHelpers.addMROutput(finalReduceVertex, finalReducePayload);
+ MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload);
vertices.add(finalReduceVertex);
DAG dag = new DAG("OrderedWordCount" + dagIndex);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index 2102a1c..49057d3 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -414,7 +414,7 @@ public class YARNRunner implements ClientProtocol {
}
// Map only jobs.
if (stageNum == totalStages -1) {
- MRHelpers.addMROutput(vertex, vertexUserPayload);
+ MRHelpers.addMROutputLegacy(vertex, vertexUserPayload);
}
Map<String, String> taskEnv = new HashMap<String, String>();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index eaa597a..a01503c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -74,6 +74,7 @@ import org.apache.tez.mapreduce.combine.MRCombiner;
import org.apache.tez.mapreduce.committer.MROutputCommitter;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
@@ -952,7 +953,14 @@ public class MRHelpers {
.setUserPayload(userPayload);
vertex.addOutput("MROutput", od, MROutputCommitter.class);
}
-
+
+ @Private
+ public static void addMROutputLegacy(Vertex vertex, byte[] userPayload) {
+ OutputDescriptor od = new OutputDescriptor(MROutputLegacy.class.getName())
+ .setUserPayload(userPayload);
+ vertex.addOutput("MROutput", od, MROutputCommitter.class);
+ }
+
@SuppressWarnings("unchecked")
public static InputSplit createOldFormatSplitFromUserPayload(
MRSplitProto splitProto, SerializationFactory serializationFactory)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 70003ef..f8f99b7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -105,7 +105,7 @@ public class MRInput implements LogicalInput {
private InputFormat oldInputFormat;
@SuppressWarnings("rawtypes")
protected RecordReader oldRecordReader;
- private InputSplit oldInputSplit;
+ protected InputSplit oldInputSplit;
protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index 5c2e515..9419aae 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
@@ -49,6 +50,12 @@ public class MRInputLegacy extends MRInput {
return this.newInputSplit;
}
+
+ @Private
+ public InputSplit getOldInputSplit() {
+ return this.oldInputSplit;
+ }
+
@SuppressWarnings("rawtypes")
@Private
public RecordReader getOldRecordReader() {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index efa18b4..66d4566 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -84,7 +84,7 @@ public class MROutput implements LogicalOutput {
private boolean isMapperOutput;
- private OutputCommitter committer;
+ protected OutputCommitter committer;
@Override
public List<Event> initialize(TezOutputContext outputContext)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java
new file mode 100644
index 0000000..7c9f804
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutputLegacy.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.output;
+
+import org.apache.hadoop.mapreduce.OutputCommitter;
+
+public class MROutputLegacy extends MROutput {
+
+ public OutputCommitter getOutputCommitter() {
+ return committer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 87c7912..242b798 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.RawKeyValueIterator;
@@ -77,7 +79,8 @@ import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
-import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
+import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.library.common.Constants;
@@ -102,6 +105,9 @@ public abstract class MRTask {
protected TaskAttemptID taskAttemptId;
protected Progress progress = new Progress();
protected SecretKey jobTokenSecret;
+
+ LogicalInput input;
+ LogicalOutput output;
boolean isMap;
@@ -334,8 +340,13 @@ public abstract class MRTask {
return this.processorContext;
}
- public void initTask() throws IOException,
+ public void initTask(LogicalOutput output) throws IOException,
InterruptedException {
+ // By this time output has been initialized
+ this.output = output;
+ if (output instanceof MROutputLegacy) {
+ committer = ((MROutputLegacy)output).getOutputCommitter();
+ }
this.mrReporter = new MRTaskReporter(processorContext);
this.useNewApi = jobConf.getUseNewMapper();
TezDAGID dagId = IDConverter.fromMRTaskAttemptId(taskAttemptId).getTaskID()
@@ -366,14 +377,6 @@ public abstract class MRTask {
return null;
}
- public OutputCommitter getCommitter() {
- return committer;
- }
-
- public void setCommitter(OutputCommitter committer) {
- this.committer = committer;
- }
-
public TezCounters getCounters() { return counters; }
public void setConf(JobConf jobConf) {
@@ -415,15 +418,15 @@ public abstract class MRTask {
InterruptedException {
}
- public void done(LogicalOutput output) throws IOException, InterruptedException {
+ public void done() throws IOException, InterruptedException {
updateCounters();
LOG.info("Task:" + taskAttemptId + " is done."
+ " And is in the process of committing");
// TODO change this to use the new context
// TODO TEZ Interaciton between Commit and OutputReady. Merge ?
- if (output instanceof MROutput) {
- MROutput sOut = (MROutput)output;
+ if (output instanceof MROutputLegacy) {
+ MROutputLegacy sOut = (MROutputLegacy)output;
if (sOut.isCommitRequired()) {
//wait for commit approval and commit
// TODO EVENTUALLY - Commit is not required for map tasks.
@@ -456,7 +459,7 @@ public abstract class MRTask {
statusUpdate();
}
- private void commit(MROutput output) throws IOException {
+ private void commit(MROutputLegacy output) throws IOException {
int retries = 3;
while (true) {
// This will loop till the AM asks for the task to be killed. As
@@ -493,7 +496,7 @@ public abstract class MRTask {
}
private
- void discardOutput(MROutput output) {
+ void discardOutput(MROutputLegacy output) {
try {
output.abort();
} catch (IOException ioe) {
@@ -657,7 +660,9 @@ public abstract class MRTask {
statusUpdate();
LOG.info("Runnning cleanup for the task");
// do the cleanup
- committer.abortTask(taskAttemptContext);
+ if (output instanceof MROutputLegacy) {
+ ((MROutputLegacy) output).abort();
+ }
}
public void localizeConfiguration(JobConf jobConf)
@@ -667,6 +672,18 @@ public abstract class MRTask {
jobConf.setInt(JobContext.TASK_PARTITION,
taskAttemptId.getTaskID().getId());
jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
+
+ jobConf.setBoolean(MRJobConfig.TASK_ISMAP, isMap);
+
+ Path outputPath = FileOutputFormat.getOutputPath(jobConf);
+ if (outputPath != null) {
+ if ((committer instanceof FileOutputCommitter)) {
+ FileOutputFormat.setWorkOutputPath(jobConf,
+ ((FileOutputCommitter)committer).getTaskAttemptPath(taskAttemptContext));
+ } else {
+ FileOutputFormat.setWorkOutputPath(jobConf, outputPath);
+ }
+ }
}
public abstract TezCounter getOutputRecordsCounter();
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 164818c..00a054e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -25,12 +25,15 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.split.TezGroupedSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.util.ReflectionUtils;
@@ -40,7 +43,7 @@ import org.apache.tez.dag.api.TezException;
import org.apache.tez.mapreduce.hadoop.mapreduce.MapContextImpl;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.Event;
@@ -89,8 +92,6 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
LOG.info("Running map: " + processorContext.getUniqueIdentifier());
- initTask();
-
if (inputs.size() != 1
|| outputs.size() != 1) {
throw new IOException("Cannot handle multiple inputs or outputs"
@@ -100,10 +101,12 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
LogicalInput in = inputs.values().iterator().next();
LogicalOutput out = outputs.values().iterator().next();
+ initTask(out);
+
// Sanity check
if (!(in instanceof MRInputLegacy)) {
throw new IOException(new TezException(
- "Only Simple Input supported. Input: " + in.getClass()));
+ "Only MRInputLegacy supported. Input: " + in.getClass()));
}
MRInputLegacy input = (MRInputLegacy)in;
input.init();
@@ -115,10 +118,12 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
}
KeyValueWriter kvWriter = null;
- if (!(out instanceof OnFileSortedOutput)) {
- kvWriter = ((MROutput)out).getWriter();
- } else {
+ if ((out instanceof MROutputLegacy)) {
+ kvWriter = ((MROutputLegacy)out).getWriter();
+ } else if ((out instanceof OnFileSortedOutput)){
kvWriter = ((OnFileSortedOutput)out).getWriter();
+ } else {
+ throw new IOException("Illegal output to map: " + out.getClass());
}
if (useNewApi) {
@@ -127,7 +132,35 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
runOldMapper(jobConf, mrReporter, input, kvWriter);
}
- done(out);
+ done();
+ }
+
+
+ /**
+ * Update the job with details about the file split
+ * @param job the job configuration to update
+ * @param inputSplit the file split
+ */
+ private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) {
+ if (inputSplit instanceof FileSplit) {
+ FileSplit fileSplit = (FileSplit) inputSplit;
+ job.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString());
+ job.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
+ job.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength());
+ }
+ LOG.info("Processing mapred split: " + inputSplit);
+ }
+
+ private void updateJobWithSplit(
+ final JobConf job, org.apache.hadoop.mapreduce.InputSplit inputSplit) {
+ if (inputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+ org.apache.hadoop.mapreduce.lib.input.FileSplit fileSplit =
+ (org.apache.hadoop.mapreduce.lib.input.FileSplit) inputSplit;
+ job.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString());
+ job.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
+ job.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength());
+ }
+ LOG.info("Processing mapreduce split: " + inputSplit);
}
void runOldMapper(
@@ -141,6 +174,10 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
// Done only for MRInput.
// TODO use new method in MRInput to get required info
//input.initialize(job, master);
+
+ InputSplit inputSplit = input.getOldInputSplit();
+
+ updateJobWithSplit(job, inputSplit);
RecordReader in = new OldRecordReader(input);
@@ -188,13 +225,15 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
new NewOutputCollector(out);
org.apache.hadoop.mapreduce.InputSplit split = in.getNewInputSplit();
+
+ updateJobWithSplit(job, split);
org.apache.hadoop.mapreduce.MapContext
mapContext =
new MapContextImpl(
job, taskAttemptId,
input, output,
- getCommitter(),
+ committer,
processorContext, split, reporter);
org.apache.hadoop.mapreduce.Mapper.Context mapperContext =
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index ecda8c6..bf4f660 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.Event;
@@ -95,8 +95,6 @@ implements LogicalIOProcessor {
LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
- initTask();
-
if (outputs.size() <= 0 || outputs.size() > 1) {
throw new IOException("Invalid number of outputs"
+ ", outputCount=" + outputs.size());
@@ -110,6 +108,8 @@ implements LogicalIOProcessor {
LogicalInput in = inputs.values().iterator().next();
LogicalOutput out = outputs.values().iterator().next();
+ initTask(out);
+
this.statusUpdate();
Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
@@ -133,12 +133,12 @@ implements LogicalIOProcessor {
KeyValuesReader kvReader = shuffleInput.getReader();
KeyValueWriter kvWriter = null;
- if((out instanceof MROutput)) {
- kvWriter = ((MROutput) out).getWriter();
+ if((out instanceof MROutputLegacy)) {
+ kvWriter = ((MROutputLegacy) out).getWriter();
} else if ((out instanceof OnFileSortedOutput)) {
kvWriter = ((OnFileSortedOutput) out).getWriter();
} else {
- throw new IOException("Illegal input to reduce: " + in.getClass());
+ throw new IOException("Illegal output to reduce: " + out.getClass());
}
if (useNewApi) {
@@ -157,7 +157,7 @@ implements LogicalIOProcessor {
kvReader, comparator, keyClass, valueClass, kvWriter);
}
- done(out);
+ done();
}
void runOldReducer(JobConf job,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 2f2ec16..7868ecc 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -52,6 +52,7 @@ import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.mapreduce.processor.MapUtils;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
@@ -161,9 +162,11 @@ public class TestReduceProcessor {
ProcessorDescriptor reduceProcessorDesc = new ProcessorDescriptor(
ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(reduceConf));
- InputSpec reduceInputSpec = new InputSpec(mapVertexName, new InputDescriptor(LocalMergedInput.class.getName()), 1);
- OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex", new OutputDescriptor(MROutput.class.getName()), 1);
-
+ InputSpec reduceInputSpec = new InputSpec(mapVertexName,
+ new InputDescriptor(LocalMergedInput.class.getName()), 1);
+ OutputSpec reduceOutputSpec = new OutputSpec("NullDestinationVertex",
+ new OutputDescriptor(MROutputLegacy.class.getName()), 1);
+
// Now run a reduce
TaskSpec taskSpec = new TaskSpec(
TezTestUtils.getMockTaskAttemptId(0, 1, 0, 0),
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/924ce661/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 990dbe6..bb71dfe 100644
--- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -371,7 +371,7 @@ public class TestMRRJobsDAGApi {
Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
1, Resource.newInstance(256, 1));
- MRHelpers.addMROutput(stage3Vertex, stage3Payload);
+ MRHelpers.addMROutputLegacy(stage3Vertex, stage3Payload);
Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
Map<String, String> commonEnv = createCommonEnv();