You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/05/07 22:39:03 UTC
git commit: TEZ-103. Configured Combiner is not being used.
Updated Branches:
refs/heads/TEZ-1 b102eb1c6 -> 18f0ebd8e
TEZ-103. Configured Combiner is not being used.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/18f0ebd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/18f0ebd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/18f0ebd8
Branch: refs/heads/TEZ-1
Commit: 18f0ebd8e562a01fce6e39d7a0977ce2f8b9d3a1
Parents: b102eb1
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon May 6 18:16:20 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue May 7 13:38:41 2013 -0700
----------------------------------------------------------------------
.../tez/engine/common/combine/CombineInput.java | 6 +-
.../engine/common/shuffle/impl/MergeManager.java | 55 ++-
.../engine/common/sort/impl/ExternalSorter.java | 15 +-
.../apache/tez/engine/common/sort/impl/IFile.java | 5 +
.../engine/common/sort/impl/IFileOutputStream.java | 3 +
.../common/sort/impl/dflt/DefaultSorter.java | 12 +-
.../task/local/output/TezTaskOutputFiles.java | 11 +-
.../apache/tez/mapreduce/combine/MRCombiner.java | 348 +++++++++++++++
.../org/apache/tez/mapreduce/processor/MRTask.java | 3 +
9 files changed, 423 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
index 707e54c..bf504bb 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
@@ -121,7 +121,11 @@ public class CombineInput implements Input {
public void close() throws IOException {
input.close();
}
-
+
+  /**
+   * Exposes the raw key/value iterator wrapped by this input.
+   *
+   * @return the same iterator instance that {@code close()} closes
+   */
+  public TezRawKeyValueIterator getIterator() {
+    return input;
+  }
+
protected class ValueIterator implements Iterator<Object> {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
index 55860ea..9156f28 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
@@ -373,12 +373,17 @@ public class MergeManager {
CombineOutput combineOut = new CombineOutput(writer);
combineOut.initialize(conf, reporter);
-
- combineProcessor.process(new Input[] {combineIn},
- new Output[] {combineOut});
-
- combineIn.close();
- combineOut.close();
+
+ try {
+ combineProcessor.process(new Input[] {combineIn},
+ new Output[] {combineOut});
+ } catch (IOException ioe) {
+ try {
+ combineProcessor.close();
+ } catch (IOException ignoredException) {}
+
+ throw ioe;
+ }
}
@@ -471,31 +476,33 @@ public class MergeManager {
mergeOutputSize).suffix(
Constants.MERGED_OUTPUT_PREFIX);
- Writer writer =
- new Writer(conf, rfs, outputPath,
- (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
- (Class)ConfigUtils.getIntermediateInputValueClass(conf),
- codec, null);
-
- TezRawKeyValueIterator rIter = null;
+ Writer writer = null;
try {
+ writer =
+ new Writer(conf, rfs, outputPath,
+ (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
+ (Class)ConfigUtils.getIntermediateInputValueClass(conf),
+ codec, null);
+
+ TezRawKeyValueIterator rIter = null;
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
- " segments...");
-
+ " segments...");
+
rIter = TezMerger.merge(conf, rfs,
- (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
- (Class)ConfigUtils.getIntermediateInputValueClass(conf),
- inMemorySegments, inMemorySegments.size(),
- new Path(taskAttemptId.toString()),
- (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
- reporter, spilledRecordsCounter, null, null);
-
+ (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
+ (Class)ConfigUtils.getIntermediateInputValueClass(conf),
+ inMemorySegments, inMemorySegments.size(),
+ new Path(taskAttemptId.toString()),
+ (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
+ reporter, spilledRecordsCounter, null, null);
+
if (null == combineProcessor) {
TezMerger.writeFile(rIter, writer, reporter, conf);
} else {
runCombineProcessor(rIter, writer);
}
writer.close();
+ writer = null;
LOG.info(taskAttemptId +
" Merge of the " + noInMemorySegments +
@@ -507,6 +514,10 @@ public class MergeManager {
//earlier when we invoked cloneFileAttributes
localFS.delete(outputPath, true);
throw e;
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
}
// Note the output of the merge
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
index 6d07a1c..f6af426 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
@@ -193,11 +193,18 @@ public abstract class ExternalSorter {
CombineOutput combineOut = new CombineOutput(writer);
combineOut.initialize(job, runningTaskContext.getTaskReporter());
- combineProcessor.process(new Input[] {combineIn},
- new Output[] {combineOut});
+ try {
+ combineProcessor.process(new Input[] {combineIn},
+ new Output[] {combineOut});
+ } catch (IOException ioe) {
+ try {
+ combineProcessor.close();
+ } catch (IOException ignored) {}
- combineIn.close();
- combineOut.close();
+ // Do not close output here as the sorter should close the combine output
+
+ throw ioe;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
index 161cc5a..db59a13 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFile.java
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -73,6 +74,7 @@ public class IFile {
boolean ownOutputStream = false;
long start = 0;
FSDataOutputStream rawOut;
+ AtomicBoolean closed = new AtomicBoolean(false);
CompressionOutputStream compressedOut;
Compressor compressor;
@@ -153,6 +155,9 @@ public class IFile {
}
public void close() throws IOException {
+ if (closed.getAndSet(true)) {
+ throw new IOException("Writer was already closed earlier");
+ }
// When IFile writer is created by BackupStore, we do not have
// Key and Value classes set. So, check before closing the
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
index 45cf917..75fcd68 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
@@ -101,12 +101,15 @@ public class IFileOutputStream extends FilterOutputStream {
sum.update(buffer, 0, offset);
offset = 0;
}
+ /*
+ // FIXME if needed re-enable this in debug mode
if (LOG.isDebugEnabled()) {
LOG.debug("XXX checksum" +
" b=" + b + " off=" + off +
" buffer=" + " offset=" + offset +
" len=" + len);
}
+ */
/* now we should have len < buffer.length */
System.arraycopy(b, off, buffer, offset, len);
offset += len;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
index 75c7de3..0c35760 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
@@ -673,6 +673,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
spillLock.unlock();
sortAndSpill();
} catch (Throwable t) {
+ LOG.warn("ZZZZ: Got an exception in sortAndSpill", t);
sortSpillException = t;
} finally {
spillLock.lock();
@@ -794,6 +795,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
if (spstart != spindex) {
TezRawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
+ LOG.info("DEBUG: Running combine processor");
runCombineProcessor(kvIter, writer);
}
}
@@ -1052,7 +1054,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
sortPhase.complete();
return;
}
- {
+ else {
sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
TezMerger.considerFinalMergeForProgress();
@@ -1096,16 +1098,16 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
long segmentStart = finalOut.getPos();
Writer writer =
new Writer(job, finalOut, keyClass, valClass, codec,
- spilledRecordsCounter);
+ spilledRecordsCounter);
if (combineProcessor == null || numSpills < minSpillsForCombine) {
- TezMerger.writeFile(kvIter, writer, runningTaskContext.getTaskReporter(), job);
- writer.close();
+ TezMerger.writeFile(kvIter, writer,
+ runningTaskContext.getTaskReporter(), job);
} else {
runCombineProcessor(kvIter, writer);
}
+ writer.close();
sortPhase.startNextPhase();
-
// record offsets
final TezIndexRecord rec =
new TezIndexRecord(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
index c925367..eeca130 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
@@ -20,6 +20,8 @@ package org.apache.tez.engine.common.task.local.output;
import java.io.IOException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -42,6 +44,7 @@ import org.apache.tez.engine.records.TezTaskID;
@InterfaceStability.Unstable
public class TezTaskOutputFiles extends TezTaskOutput {
+ private static final Log LOG = LogFactory.getLog(TezTaskOutputFiles.class);
private Configuration conf;
private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
@@ -56,9 +59,11 @@ public class TezTaskOutputFiles extends TezTaskOutput {
new LocalDirAllocator(TezJobConfig.LOCAL_DIR);
private Path getAttemptOutputDir() {
- System.err.println("getAttemptOutputDir: " +
- Constants.TASK_OUTPUT_DIR + "/" + conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
- return new Path(Constants.TASK_OUTPUT_DIR, conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+ LOG.info("DEBUG: getAttemptOutputDir: "
+ + Constants.TASK_OUTPUT_DIR + "/"
+ + conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+ return new Path(Constants.TASK_OUTPUT_DIR,
+ conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
new file mode 100644
index 0000000..42eddee
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.combine;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
+import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.combine.CombineInput;
+import org.apache.tez.engine.common.combine.CombineOutput;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+/**
+ * {@link Processor} that runs the combiner configured for a MapReduce job.
+ *
+ * <p>The combiner reads sorted key/value pairs from a {@link CombineInput}
+ * and writes the combined records to a {@link CombineOutput}. Both the old
+ * ({@code org.apache.hadoop.mapred}) and the new
+ * ({@code org.apache.hadoop.mapreduce}) combiner APIs are supported; the API
+ * is chosen from {@link JobConf#getUseNewMapper()} during
+ * {@link #initialize(Configuration, Master)}.
+ */
+public class MRCombiner implements Processor {
+
+  private static final Log LOG = LogFactory.getLog(MRCombiner.class);
+
+  JobConf jobConf;
+  boolean useNewApi;
+
+  private final MRTask task;
+
+  // Incremented once for every value (i.e. every record) fed to the combiner.
+  private Counter combineInputRecordsCounter;
+  // Incremented once for every record the combiner emits.
+  private Counter combineOutputRecordsCounter;
+  private Progress combinePhase;
+
+  /**
+   * @param task the running task that supplies progress, reporter, umbilical
+   *             and task attempt id to this combiner
+   */
+  public MRCombiner(MRTask task) {
+    this.task = task;
+  }
+
+  /**
+   * Captures the job configuration and decides which combiner API to use.
+   *
+   * @param conf   job configuration; reused directly when it already is a
+   *               {@link JobConf}, otherwise wrapped in a new one
+   * @param master not used by this processor
+   */
+  @Override
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+    if (conf instanceof JobConf) {
+      jobConf = (JobConf)conf;
+    } else {
+      jobConf = new JobConf(conf);
+    }
+    useNewApi = jobConf.getUseNewMapper();
+  }
+
+  /**
+   * Runs the configured combiner over {@code in[0]} and writes the combined
+   * records to {@code out[0]}.
+   *
+   * @param in  inputs; element 0 must be a {@link CombineInput}
+   * @param out outputs; element 0 must be a {@link CombineOutput}
+   * @throws IOException if reading, combining or writing fails, or if the
+   *                     new-API combiner class cannot be loaded
+   */
+  @Override
+  public void process(Input[] in, Output[] out) throws IOException,
+      InterruptedException {
+    LOG.info("DEBUG: Running MRCombiner"
+        + ", usingNewAPI=" + useNewApi);
+
+    CombineInput input = (CombineInput)in[0];
+    CombineOutput output = (CombineOutput)out[0];
+
+    // NOTE(review): a new "combine" phase is added on every invocation;
+    // confirm this is intended when the combiner runs once per spill/merge.
+    combinePhase = task.getProgress().addPhase("combine");
+
+    Class<?> keyClass = ConfigUtils.getIntermediateOutputKeyClass(jobConf);
+    Class<?> valueClass = ConfigUtils.getIntermediateOutputValueClass(jobConf);
+    LOG.info("Using combineKeyClass: " + keyClass);
+    LOG.info("Using combineValueClass: " + valueClass);
+    RawComparator<?> comparator =
+        ConfigUtils.getIntermediateOutputKeyComparator(jobConf);
+    LOG.info("Using combineComparator: " + comparator);
+
+    // Input records are counted per value consumed; output records are
+    // counted per record emitted. Previously the value counter was bound to
+    // COMBINE_OUTPUT_RECORDS, which reported inputs as outputs.
+    combineInputRecordsCounter =
+        task.getMRReporter().getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+    combineOutputRecordsCounter =
+        task.getMRReporter().getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
+
+    if (useNewApi) {
+      try {
+        runNewCombiner(this.jobConf,
+            task.getUmbilical(),
+            task.getMRReporter(),
+            input, comparator, keyClass, valueClass, output);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+    } else {
+      runOldCombiner(this.jobConf,
+          task.getUmbilical(),
+          task.getMRReporter(),
+          input,
+          comparator, keyClass, valueClass,
+          output);
+    }
+  }
+
+  /**
+   * Runs an old-API ({@code org.apache.hadoop.mapred.Reducer}) combiner,
+   * invoking {@code reduce} once per key group.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private void runOldCombiner(JobConf job,
+      TezTaskUmbilicalProtocol umbilical,
+      final MRTaskReporter reporter,
+      CombineInput input,
+      RawComparator comparator,
+      Class keyClass,
+      Class valueClass,
+      final Output output) throws IOException, InterruptedException {
+
+    Reducer combiner =
+        ReflectionUtils.newInstance(job.getCombinerClass(), job);
+
+    // make output collector
+
+    OutputCollector collector =
+        new OutputCollector() {
+          public void collect(Object key, Object value)
+              throws IOException {
+            try {
+              output.write(key, value);
+              // NOTE(review): assumes the combine output does not itself
+              // update COMBINE_OUTPUT_RECORDS - confirm to avoid double counts.
+              combineOutputRecordsCounter.increment(1);
+            } catch (InterruptedException ie) {
+              // collect() cannot throw InterruptedException; keep the
+              // interrupt visible to callers before wrapping it.
+              Thread.currentThread().interrupt();
+              throw new IOException(ie);
+            }
+          }
+        };
+
+    // apply combiner function
+    CombinerValuesIterator values =
+        new CombinerValuesIterator(input,
+            comparator, keyClass, valueClass, job, reporter,
+            combineInputRecordsCounter, combinePhase);
+
+    values.informReduceProgress();
+    while (values.more()) {
+      combiner.reduce(values.getKey(), values, collector, reporter);
+      values.nextKey();
+      values.informReduceProgress();
+    }
+  }
+
+  /**
+   * Values iterator that counts every value handed to the combiner and
+   * publishes the underlying iterator's progress to the combine phase.
+   */
+  private static final class CombinerValuesIterator<KEY,VALUE>
+      extends org.apache.tez.engine.common.task.impl.ValuesIterator<KEY,VALUE> {
+    private Counter combineInputValueCounter;
+    private Progress combinePhase;
+
+    public CombinerValuesIterator (CombineInput in,
+        RawComparator<KEY> comparator,
+        Class<KEY> keyClass,
+        Class<VALUE> valClass,
+        Configuration conf, Progressable reporter,
+        Counter combineInputValueCounter,
+        Progress combinePhase)
+        throws IOException {
+      super(in.getIterator(), comparator, keyClass, valClass, conf, reporter);
+      this.combineInputValueCounter = combineInputValueCounter;
+      this.combinePhase = combinePhase;
+    }
+
+    /** Counts the value, then returns it. */
+    @Override
+    public VALUE next() {
+      combineInputValueCounter.increment(1);
+      return moveToNext();
+    }
+
+    /** Returns the next value without touching the counter. */
+    protected VALUE moveToNext() {
+      return super.next();
+    }
+
+    /** Copies the input's progress into the combine phase and pings the reporter. */
+    public void informReduceProgress() {
+      combinePhase.set(super.in.getProgress().getProgress()); // update progress
+      reporter.progress();
+    }
+  }
+
+
+  /**
+   * Runs a new-API ({@code org.apache.hadoop.mapreduce.Reducer}) combiner.
+   *
+   * @throws ClassNotFoundException if the configured combiner class cannot be
+   *                                resolved from the task context
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private void runNewCombiner(JobConf job,
+      final TezTaskUmbilicalProtocol umbilical,
+      final MRTaskReporter reporter,
+      CombineInput input,
+      RawComparator comparator,
+      Class keyClass,
+      Class valueClass,
+      final Output out
+      ) throws IOException,InterruptedException,
+      ClassNotFoundException {
+    // wrap value iterator to report progress.
+    final TezRawKeyValueIterator rawIter = input.getIterator();
+    TezRawKeyValueIterator rIter = new TezRawKeyValueIterator() {
+      public void close() throws IOException {
+        rawIter.close();
+      }
+      public DataInputBuffer getKey() throws IOException {
+        return rawIter.getKey();
+      }
+      public Progress getProgress() {
+        return rawIter.getProgress();
+      }
+      public DataInputBuffer getValue() throws IOException {
+        return rawIter.getValue();
+      }
+      public boolean next() throws IOException {
+        boolean ret = rawIter.next();
+        // FIXME progress updates for combiner
+        // reporter.setProgress(rawIter.getProgress().getProgress());
+        return ret;
+      }
+    };
+
+    // make a task context so we can get the classes
+    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+        new TaskAttemptContextImpl(job, task.getTaskAttemptId(), reporter);
+
+    // Instantiate the configured *combiner*. Using getReducerClass() here
+    // would run the job's reducer in place of its combiner.
+    org.apache.hadoop.mapreduce.Reducer reducer =
+        (org.apache.hadoop.mapreduce.Reducer)
+            ReflectionUtils.newInstance(taskContext.getCombinerClass(), job);
+
+    org.apache.hadoop.mapreduce.RecordWriter trackedRW =
+        new org.apache.hadoop.mapreduce.RecordWriter() {
+
+          @Override
+          public void write(Object key, Object value) throws IOException,
+              InterruptedException {
+            out.write(key, value);
+            // NOTE(review): assumes the combine output does not itself
+            // update COMBINE_OUTPUT_RECORDS - confirm to avoid double counts.
+            combineOutputRecordsCounter.increment(1);
+          }
+
+          @Override
+          public void close(TaskAttemptContext context) throws IOException,
+              InterruptedException {
+            // Should not close this here as the sorter will close the
+            // combine output
+          }
+        };
+
+    // No key counter: combine input is accounted per record (value) only.
+    org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
+        createReduceContext(
+            reducer, job, task.getTaskAttemptId(),
+            rIter, null,
+            combineInputRecordsCounter,
+            trackedRW,
+            null,
+            reporter, comparator, keyClass,
+            valueClass);
+    reducer.run(reducerContext);
+    trackedRW.close(reducerContext);
+  }
+
+  /** No resources are held by this processor, so there is nothing to release. */
+  @Override
+  public void close() throws IOException, InterruptedException {
+  }
+
+  /**
+   * Builds the new-API reducer context that the combiner runs against.
+   *
+   * <p>The Tez iterator is adapted to Hadoop's {@link RawKeyValueIterator} by
+   * plain delegation, wrapped in a {@link ReduceContextImpl}, and exposed
+   * through {@link WrappedReducer#getReducerContext}.
+   *
+   * @param reducer           combiner instance the context is created for
+   * @param job               job configuration
+   * @param taskId            Tez task attempt id, converted to its MR equivalent
+   * @param rIter             sorted raw key/value input
+   * @param inputKeyCounter   counter handed to the context for input keys
+   * @param inputValueCounter counter handed to the context for input values
+   * @param output            writer receiving the combiner's output
+   * @param committer         output committer handed to the context
+   * @param reporter          status reporter handed to the context
+   * @param comparator        comparator used to group keys
+   * @param keyClass          intermediate key class
+   * @param valueClass        intermediate value class
+   * @return the context to pass to the combiner's {@code run} method
+   */
+  protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
+  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+  createReduceContext(org.apache.hadoop.mapreduce.Reducer
+                        <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
+                      Configuration job,
+                      TezTaskAttemptID taskId,
+                      final TezRawKeyValueIterator rIter,
+                      org.apache.hadoop.mapreduce.Counter inputKeyCounter,
+                      org.apache.hadoop.mapreduce.Counter inputValueCounter,
+                      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
+                      org.apache.hadoop.mapreduce.OutputCommitter committer,
+                      org.apache.hadoop.mapreduce.StatusReporter reporter,
+                      RawComparator<INKEY> comparator,
+                      Class<INKEY> keyClass, Class<INVALUE> valueClass
+  ) throws IOException, InterruptedException {
+    RawKeyValueIterator r =
+        new RawKeyValueIterator() {
+
+          @Override
+          public boolean next() throws IOException {
+            return rIter.next();
+          }
+
+          @Override
+          public DataInputBuffer getValue() throws IOException {
+            return rIter.getValue();
+          }
+
+          @Override
+          public Progress getProgress() {
+            return rIter.getProgress();
+          }
+
+          @Override
+          public DataInputBuffer getKey() throws IOException {
+            return rIter.getKey();
+          }
+
+          @Override
+          public void close() throws IOException {
+            rIter.close();
+          }
+        };
+    org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
+    reduceContext =
+        new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
+            job,
+            IDConverter.toMRTaskAttemptId(taskId),
+            r,
+            inputKeyCounter,
+            inputValueCounter,
+            output,
+            committer,
+            reporter,
+            comparator,
+            keyClass,
+            valueClass);
+    LOG.info("DEBUG: Using combineKeyClass: "
+        + keyClass + ", combineValueClass: " + valueClass);
+
+    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
+    reducerContext = new
+        WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
+            reduceContext);
+
+    return reducerContext;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/18f0ebd8/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 17fab1b..d17e477 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -68,6 +68,7 @@ import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.engine.records.OutputContext;
import org.apache.tez.engine.records.TezDAGID;
import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.mapreduce.combine.MRCombiner;
import org.apache.tez.mapreduce.hadoop.IDConverter;
import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.hadoop.MRTaskStatus;
@@ -216,6 +217,8 @@ extends RunningTaskContext {
partitioner = new MRPartitioner(this);
((MRPartitioner)partitioner).initialize(job, getTaskReporter());
+ combineProcessor = new MRCombiner(this);
+ combineProcessor.initialize(job, getTaskReporter());
localizeConfiguration(jobConf);
}