You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/04/25 01:33:24 UTC
svn commit: r1471778 - in /incubator/tez/branches/TEZ-1:
tez-common/src/main/java/org/apache/tez/common/
tez-engine/src/main/java/org/apache/tez/engine/common/
tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/
tez-engine/src/main/java...
Author: sseth
Date: Wed Apr 24 23:33:22 2013
New Revision: 1471778
URL: http://svn.apache.org/r1471778
Log:
TEZ-79. Remove mapreduce config references from TezEngine. Added explicit configs for parameters which were earlier implicit across stages. (sseth)
Modified:
incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
Modified: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java (original)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java Wed Apr 24 23:33:22 2013
@@ -250,6 +250,39 @@ public class TezJobConfig {
public static final String TEZ_ENGINE_INPUT_BUFFER_PERCENT =
"tez.engine.task.input.buffer.percent";
public static final float DEFAULT_TEZ_ENGINE_INPUT_BUFFER_PERCENT = 0.0f;
+
+ // TODO Rename.
+ public static final String TEZ_ENGINE_GROUP_COMPARATOR_CLASS =
+ "tez.engine.group.comparator.class";
+
+ // TODO Better name.
+ public static final String TEZ_ENGINE_INTERNAL_SORTER_CLASS =
+ "tez.engine.internal.sorter.class";
+
+ public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS =
+ "tez.engine.intermediate-output.key.comparator.class";
+ public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS =
+ "tez.engine.intermediate-input.key.comparator.class";
+
+ public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS =
+ "tez.engine.intermediate-output.key.class";
+ public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS =
+ "tez.engine.intermediate-input.key.class";
+
+ public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS =
+ "tez.engine.intermediate-output.value.class";
+ public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS =
+ "tez.engine.intermediate-input.value.class";
+
+ public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS =
+ "tez.engine.intermediate-output.should-compress";
+ public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED =
+ "tez.engine.intermdiate-input.is-compressed";
+
+ public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC =
+ "tez.engine.intermediate-output.compress.codec";
+ public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC =
+ "tez.engine.intermediate-input.compress.codec";
// TODO This should be in DAGConfiguration
/* config for tracking the local file where all the credentials for the job
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java Wed Apr 24 23:33:22 2013
@@ -27,24 +27,73 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
@SuppressWarnings({"unchecked", "rawtypes"})
public class ConfigUtils {
- public static Class<? extends CompressionCodec> getMapOutputCompressorClass(
- Configuration conf, Class<DefaultCodec> class1) {
- // TODO Auto-generated method stub
- return null;
+
+ public static Class<? extends CompressionCodec> getIntermediateOutputCompressorClass(
+ Configuration conf, Class<DefaultCodec> defaultValue) {
+ Class<? extends CompressionCodec> codecClass = defaultValue;
+ String name = conf
+ .get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC);
+ if (name != null) {
+ try {
+ codecClass = conf.getClassByName(name).asSubclass(
+ CompressionCodec.class);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Compression codec " + name
+ + " was not found.", e);
+ }
+ }
+ return codecClass;
+ }
+
+ public static Class<? extends CompressionCodec> getIntermediateInputCompressorClass(
+ Configuration conf, Class<DefaultCodec> defaultValue) {
+ Class<? extends CompressionCodec> codecClass = defaultValue;
+ String name = conf
+ .get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC);
+ if (name != null) {
+ try {
+ codecClass = conf.getClassByName(name).asSubclass(
+ CompressionCodec.class);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalArgumentException("Compression codec " + name
+ + " was not found.", e);
+ }
+ }
+ return codecClass;
}
- public static boolean getCompressMapOutput(Configuration conf) {
- // TODO Auto-generated method stub
- return false;
+
+ // TODO Move defaults over to a constants file.
+
+ public static boolean shouldCompressIntermediateOutput(Configuration conf) {
+ return conf.getBoolean(
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false);
}
- public static <V> Class<V> getMapOutputValueClass(Configuration conf) {
- Class<V> retv =
- (Class<V>)
- conf.getClass("mapreduce.map.output.value.class", null, Object.class);
+ public static boolean isIntermediateInputCompressed(Configuration conf) {
+ return conf.getBoolean(
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false);
+ }
+
+ // TODO Is it possible to simplify the 3-level lookup (Comparator, Map-key, Job-key)
+ public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) {
+ Class<V> retv = (Class<V>) conf.getClass(
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS, null,
+ Object.class);
+ if (retv == null) {
+ retv = getOutputValueClass(conf);
+ }
+ return retv;
+ }
+
+ public static <V> Class<V> getIntermediateInputValueClass(Configuration conf) {
+ Class<V> retv = (Class<V>) conf.getClass(
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS, null,
+ Object.class);
if (retv == null) {
retv = getOutputValueClass(conf);
}
@@ -56,16 +105,26 @@ public class ConfigUtils {
"mapreduce.job.output.value.class", Text.class, Object.class);
}
- public static <K> Class<K> getMapOutputKeyClass(Configuration conf) {
- Class<K> retv =
- (Class<K>) conf.getClass("mapreduce.map.output.key.class", null, Object.class);
+ public static <K> Class<K> getIntermediateOutputKeyClass(Configuration conf) {
+ Class<K> retv = (Class<K>) conf.getClass(
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS, null,
+ Object.class);
if (retv == null) {
retv = getOutputKeyClass(conf);
}
- return
- retv;
+ return retv;
}
+ public static <K> Class<K> getIntermediateInputKeyClass(Configuration conf) {
+ Class<K> retv = (Class<K>) conf.getClass(
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS, null,
+ Object.class);
+ if (retv == null) {
+ retv = getOutputKeyClass(conf);
+ }
+ return retv;
+ }
+
public static <K> Class<K> getOutputKeyClass(Configuration conf) {
return
(Class<K>)
@@ -74,17 +133,28 @@ public class ConfigUtils {
LongWritable.class, Object.class);
}
- public static <K> RawComparator<K> getOutputKeyComparator(Configuration conf) {
- Class<? extends RawComparator> theClass =
- conf.getClass(
- "mapreduce.job.output.key.comparator.class", null,
- RawComparator.class);
- if (theClass != null)
- return ReflectionUtils.newInstance(theClass, conf);
- return WritableComparator.get(
- getMapOutputKeyClass(conf).asSubclass(WritableComparable.class));
- }
+ public static <K> RawComparator<K> getIntermediateOutputKeyComparator(Configuration conf) {
+ Class<? extends RawComparator> theClass = conf.getClass(
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, null,
+ RawComparator.class);
+ if (theClass != null)
+ return ReflectionUtils.newInstance(theClass, conf);
+ return WritableComparator.get(getIntermediateOutputKeyClass(conf).asSubclass(
+ WritableComparable.class));
+ }
+
+ public static <K> RawComparator<K> getIntermediateInputKeyComparator(Configuration conf) {
+ Class<? extends RawComparator> theClass = conf.getClass(
+ TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, null,
+ RawComparator.class);
+ if (theClass != null)
+ return ReflectionUtils.newInstance(theClass, conf);
+ return WritableComparator.get(getIntermediateInputKeyClass(conf).asSubclass(
+ WritableComparable.class));
+ }
+
+
public static <V> RawComparator<V> getOutputValueGroupingComparator(
Configuration conf) {
Class<? extends RawComparator> theClass =
@@ -92,7 +162,7 @@ public class ConfigUtils {
"mapreduce.job.output.group.comparator.class",
null, RawComparator.class);
if (theClass == null) {
- return getOutputKeyComparator(conf);
+ return getIntermediateOutputKeyComparator(conf);
}
return ReflectionUtils.newInstance(theClass, conf);
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java Wed Apr 24 23:33:22 2013
@@ -68,9 +68,9 @@ public class LocalShuffle {
this.taskContext = taskContext;
this.runningTaskContext = runningTaskContext;
this.conf = conf;
- this.keyClass = ConfigUtils.getMapOutputKeyClass(conf);
- this.valClass = ConfigUtils.getMapOutputValueClass(conf);
- this.comparator = ConfigUtils.getOutputKeyComparator(conf);
+ this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+ this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+ this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
this.sortFactor =
conf.getInt(
@@ -83,9 +83,9 @@ public class LocalShuffle {
reporter.getCounter(TaskCounter.SPILLED_RECORDS);
// compression
- if (ConfigUtils.getCompressMapOutput(conf)) {
+ if (ConfigUtils.isIntermediateInputCompressed(conf)) {
Class<? extends CompressionCodec> codecClass =
- ConfigUtils.getMapOutputCompressorClass(conf, DefaultCodec.class);
+ ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
this.codec = ReflectionUtils.newInstance(codecClass, conf);
} else {
this.codec = null;
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java Wed Apr 24 23:33:22 2013
@@ -124,9 +124,9 @@ class Fetcher extends Thread {
wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.WRONG_REDUCE.toString());
- if (ConfigUtils.getCompressMapOutput(job)) {
+ if (ConfigUtils.isIntermediateInputCompressed(job)) {
Class<? extends CompressionCodec> codecClass =
- ConfigUtils.getMapOutputCompressorClass(job, DefaultCodec.class);
+ ConfigUtils.getIntermediateInputCompressorClass(job, DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
decompressor = CodecPool.getDecompressor(codec);
} else {
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java Wed Apr 24 23:33:22 2013
@@ -142,9 +142,9 @@ public class MergeManager {
this.localFS = localFS;
this.rfs = ((LocalFileSystem)localFS).getRaw();
- if (ConfigUtils.getCompressMapOutput(conf)) {
+ if (ConfigUtils.isIntermediateInputCompressed(conf)) {
Class<? extends CompressionCodec> codecClass =
- ConfigUtils.getMapOutputCompressorClass(conf, DefaultCodec.class);
+ ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, conf);
} else {
codec = null;
@@ -416,11 +416,11 @@ public class MergeManager {
TezRawKeyValueIterator rIter =
TezMerger.merge(conf, rfs,
- ConfigUtils.getMapOutputKeyClass(conf),
- ConfigUtils.getMapOutputValueClass(conf),
+ ConfigUtils.getIntermediateInputKeyClass(conf),
+ ConfigUtils.getIntermediateInputValueClass(conf),
inMemorySegments, inMemorySegments.size(),
new Path(taskAttemptId.toString()),
- (RawComparator)ConfigUtils.getOutputKeyComparator(conf),
+ (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
reporter, null, null, null);
TezMerger.writeFile(rIter, writer, reporter, conf);
writer.close();
@@ -473,8 +473,8 @@ public class MergeManager {
Writer writer =
new Writer(conf, rfs, outputPath,
- (Class)ConfigUtils.getMapOutputKeyClass(conf),
- (Class)ConfigUtils.getMapOutputValueClass(conf),
+ (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
+ (Class)ConfigUtils.getIntermediateInputValueClass(conf),
codec, null);
TezRawKeyValueIterator rIter = null;
@@ -483,11 +483,11 @@ public class MergeManager {
" segments...");
rIter = TezMerger.merge(conf, rfs,
- (Class)ConfigUtils.getMapOutputKeyClass(conf),
- (Class)ConfigUtils.getMapOutputValueClass(conf),
+ (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
+ (Class)ConfigUtils.getIntermediateInputValueClass(conf),
inMemorySegments, inMemorySegments.size(),
new Path(taskAttemptId.toString()),
- (RawComparator)ConfigUtils.getOutputKeyComparator(conf),
+ (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
reporter, spilledRecordsCounter, null, null);
if (null == combineProcessor) {
@@ -553,18 +553,18 @@ public class MergeManager {
approxOutputSize, conf).suffix(Constants.MERGED_OUTPUT_PREFIX);
Writer writer =
new Writer(conf, rfs, outputPath,
- (Class)ConfigUtils.getMapOutputKeyClass(conf),
- (Class)ConfigUtils.getMapOutputValueClass(conf),
+ (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
+ (Class)ConfigUtils.getIntermediateInputValueClass(conf),
codec, null);
TezRawKeyValueIterator iter = null;
Path tmpDir = new Path(taskAttemptId.toString());
try {
iter = TezMerger.merge(conf, rfs,
- (Class)ConfigUtils.getMapOutputKeyClass(conf),
- (Class)ConfigUtils.getMapOutputValueClass(conf),
+ (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
+ (Class)ConfigUtils.getIntermediateInputValueClass(conf),
codec, inputs.toArray(new Path[inputs.size()]),
true, ioSortFactor, tmpDir,
- (RawComparator)ConfigUtils.getOutputKeyComparator(conf),
+ (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
reporter, spilledRecordsCounter, null,
mergedMapOutputsCounter, null);
@@ -670,11 +670,11 @@ public class MergeManager {
// merge config params
- Class keyClass = (Class)ConfigUtils.getMapOutputKeyClass(job);
- Class valueClass = (Class)ConfigUtils.getMapOutputValueClass(job);
+ Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job);
+ Class valueClass = (Class)ConfigUtils.getIntermediateInputValueClass(job);
final Path tmpDir = new Path(taskAttemptId.toString());
final RawComparator comparator =
- (RawComparator)ConfigUtils.getOutputKeyComparator(job);
+ (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(job);
// segments required to vacate memory
List<Segment> memDiskSegments = new ArrayList<Segment>();
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java Wed Apr 24 23:33:22 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.util.Reflection
import org.apache.tez.common.Constants;
import org.apache.tez.common.RunningTaskContext;
import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.engine.api.Input;
@@ -113,14 +114,15 @@ public abstract class ExternalSorter {
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
// sorter
- sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
- QuickSort.class, IndexedSorter.class), job);
+ sorter = ReflectionUtils.newInstance(job.getClass(
+ TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS, QuickSort.class,
+ IndexedSorter.class), job);
- comparator = ConfigUtils.getOutputKeyComparator(job);
+ comparator = ConfigUtils.getIntermediateOutputKeyComparator(job);
// k/v serialization
- keyClass = ConfigUtils.getMapOutputKeyClass(job);
- valClass = ConfigUtils.getMapOutputValueClass(job);
+ keyClass = ConfigUtils.getIntermediateOutputKeyClass(job);
+ valClass = ConfigUtils.getIntermediateOutputValueClass(job);
serializationFactory = new SerializationFactory(job);
keySerializer = serializationFactory.getSerializer(keyClass);
valSerializer = serializationFactory.getSerializer(valClass);
@@ -136,9 +138,9 @@ public abstract class ExternalSorter {
spilledRecordsCounter =
runningTaskContext.getTaskReporter().getCounter(TaskCounter.SPILLED_RECORDS);
// compression
- if (ConfigUtils.getCompressMapOutput(job)) {
+ if (ConfigUtils.shouldCompressIntermediateOutput(job)) {
Class<? extends CompressionCodec> codecClass =
- ConfigUtils.getMapOutputCompressorClass(job, DefaultCodec.class);
+ ConfigUtils.getIntermediateOutputCompressorClass(job, DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
} else {
codec = null;
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java Wed Apr 24 23:33:22 2013
@@ -392,7 +392,7 @@ public class PipelinedSorter extends Ext
keyClass, valClass, codec,
segmentList, mergeFactor,
new Path(mapId.toString()),
- (RawComparator)ConfigUtils.getOutputKeyComparator(job),
+ (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(job),
runningTaskContext.getTaskReporter(), sortSegments,
null, spilledRecordsCounter, sortPhase.phase());
Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java Wed Apr 24 23:33:22 2013
@@ -1093,7 +1093,7 @@ public class DefaultSorter extends Exter
keyClass, valClass, codec,
segmentList, mergeFactor,
new Path(mapId.toString()),
- (RawComparator)ConfigUtils.getOutputKeyComparator(job),
+ (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(job),
runningTaskContext.getTaskReporter(), sortSegments,
null, spilledRecordsCounter,
sortPhase.phase());
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java Wed Apr 24 23:33:22 2013
@@ -128,6 +128,18 @@ public class DeprecatedKeys {
_(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT);
_(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, TezJobConfig.DAG_CREDENTIALS_BINARY);
+
+ _("map.sort.class", TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS);
+
+ _(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_ENGINE_GROUP_COMPARATOR_CLASS);
+
+ // TODO Parameters which cannot be handled via deprecation. Have to be habdled via another translation layer.
+ //_(MRJobConfig.KEY_COMPARATOR, TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS)
+ //_(MRJobConfig.MAP_OUTPUT_KEY_CLASS, TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS, TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS)
+ //_(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS, TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS)
+ //_(MRJobConfig.MAP_OUTPUT_COMPRESS, TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED
+ //_(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC
+
}
private static void _(String oldKey, String newKey) {
Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java Wed Apr 24 23:33:22 2013
@@ -133,8 +133,8 @@ implements Processor {
this.statusUpdate();
- Class keyClass = ConfigUtils.getMapOutputKeyClass(jobConf);
- Class valueClass = ConfigUtils.getMapOutputValueClass(jobConf);
+ Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
+ Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
RawComparator comparator =
ConfigUtils.getOutputValueGroupingComparator(jobConf);