You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/04/25 01:33:24 UTC

svn commit: r1471778 - in /incubator/tez/branches/TEZ-1: tez-common/src/main/java/org/apache/tez/common/ tez-engine/src/main/java/org/apache/tez/engine/common/ tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/ tez-engine/src/main/java...

Author: sseth
Date: Wed Apr 24 23:33:22 2013
New Revision: 1471778

URL: http://svn.apache.org/r1471778
Log:
TEZ-79. Remove mapreduce config references from TezEngine. Added explicit configs for parameters which were earlier implicit across stages. (sseth)

Modified:
    incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
    incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java

Modified: incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java (original)
+++ incubator/tez/branches/TEZ-1/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java Wed Apr 24 23:33:22 2013
@@ -250,6 +250,39 @@ public class TezJobConfig {
   public static final String TEZ_ENGINE_INPUT_BUFFER_PERCENT = 
       "tez.engine.task.input.buffer.percent";
   public static final float DEFAULT_TEZ_ENGINE_INPUT_BUFFER_PERCENT = 0.0f;
+
+  // TODO Rename. 
+  public static final String TEZ_ENGINE_GROUP_COMPARATOR_CLASS = 
+      "tez.engine.group.comparator.class";
+  
+  // TODO Better name.
+  public static final String TEZ_ENGINE_INTERNAL_SORTER_CLASS = 
+      "tez.engine.internal.sorter.class";
+  
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS = 
+      "tez.engine.intermediate-output.key.comparator.class";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS = 
+      "tez.engine.intermediate-input.key.comparator.class";
+  
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS = 
+      "tez.engine.intermediate-output.key.class";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS = 
+      "tez.engine.intermediate-input.key.class";
+  
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS = 
+      "tez.engine.intermediate-output.value.class";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS = 
+      "tez.engine.intermediate-input.value.class";
+  
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS = 
+      "tez.engine.intermediate-output.should-compress";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED = 
+      "tez.engine.intermediate-input.is-compressed"; // spelled like the sibling intermediate-* keys
+  
+  public static final String TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC = 
+      "tez.engine.intermediate-output.compress.codec";
+  public static final String TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC = 
+      "tez.engine.intermediate-input.compress.codec";
   
   // TODO This should be in DAGConfiguration
   /* config for tracking the local file where all the credentials for the job

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java Wed Apr 24 23:33:22 2013
@@ -27,24 +27,73 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class ConfigUtils {
-  public static  Class<? extends CompressionCodec> getMapOutputCompressorClass(
-      Configuration conf, Class<DefaultCodec> class1) {
-    // TODO Auto-generated method stub
-    return null;
+
+  public static Class<? extends CompressionCodec> getIntermediateOutputCompressorClass(
+      Configuration conf, Class<DefaultCodec> defaultValue) {
+    Class<? extends CompressionCodec> codecClass = defaultValue;
+    String name = conf
+        .get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC);
+    if (name != null) {
+      try {
+        codecClass = conf.getClassByName(name).asSubclass(
+            CompressionCodec.class);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException("Compression codec " + name
+            + " was not found.", e);
+      }
+    }
+    return codecClass;
+  }
+  
+  public static Class<? extends CompressionCodec> getIntermediateInputCompressorClass(
+      Configuration conf, Class<DefaultCodec> defaultValue) {
+    Class<? extends CompressionCodec> codecClass = defaultValue;
+    String name = conf
+        .get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC);
+    if (name != null) {
+      try {
+        codecClass = conf.getClassByName(name).asSubclass(
+            CompressionCodec.class);
+      } catch (ClassNotFoundException e) {
+        throw new IllegalArgumentException("Compression codec " + name
+            + " was not found.", e);
+      }
+    }
+    return codecClass;
   }
 
-  public static  boolean getCompressMapOutput(Configuration conf) {
-    // TODO Auto-generated method stub
-    return false;
+
+  // TODO Move defaults over to a constants file.
+  
+  public static boolean shouldCompressIntermediateOutput(Configuration conf) {
+    return conf.getBoolean(
+        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, false);
   }
 
-  public static <V> Class<V> getMapOutputValueClass(Configuration conf) {
-    Class<V> retv = 
-        (Class<V>) 
-        conf.getClass("mapreduce.map.output.value.class", null, Object.class);
+  /** Reads the input-side compression flag (not the output-side one); defaults to false. */
+  public static boolean isIntermediateInputCompressed(Configuration conf) {
+    return conf.getBoolean(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED, false);
+  }
+
+  // TODO Is it possible to simplify the 3-level lookup (Comparator, Map-key, Job-key)
+  public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) {
+    Class<V> retv = (Class<V>) conf.getClass(
+        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS, null,
+        Object.class);
+    if (retv == null) {
+      retv = getOutputValueClass(conf);
+    }
+    return retv;
+  }
+  
+  public static <V> Class<V> getIntermediateInputValueClass(Configuration conf) {
+    Class<V> retv = (Class<V>) conf.getClass(
+        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS, null,
+        Object.class);
     if (retv == null) {
       retv = getOutputValueClass(conf);
     }
@@ -56,16 +105,26 @@ public class ConfigUtils {
         "mapreduce.job.output.value.class", Text.class, Object.class);
   }
 
-  public static <K> Class<K> getMapOutputKeyClass(Configuration conf) {
-    Class<K> retv = 
-        (Class<K>) conf.getClass("mapreduce.map.output.key.class", null, Object.class);
+  public static <K> Class<K> getIntermediateOutputKeyClass(Configuration conf) {
+    Class<K> retv = (Class<K>) conf.getClass(
+        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS, null,
+        Object.class);
     if (retv == null) {
       retv = getOutputKeyClass(conf);
     }
-    return 
-        retv;
+    return retv;
   }
 
+  public static <K> Class<K> getIntermediateInputKeyClass(Configuration conf) {
+    Class<K> retv = (Class<K>) conf.getClass(
+        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS, null,
+        Object.class);
+    if (retv == null) {
+      retv = getOutputKeyClass(conf);
+    }
+    return retv;
+  }
+  
   public static <K> Class<K> getOutputKeyClass(Configuration conf) {
     return 
         (Class<K>) 
@@ -74,17 +133,28 @@ public class ConfigUtils {
             LongWritable.class, Object.class);
 }
   
-  public static <K> RawComparator<K> getOutputKeyComparator(Configuration conf) {
-    Class<? extends RawComparator> theClass = 
-        conf.getClass(
-            "mapreduce.job.output.key.comparator.class", null, 
-            RawComparator.class);
-      if (theClass != null)
-        return ReflectionUtils.newInstance(theClass, conf);
-      return WritableComparator.get(
-          getMapOutputKeyClass(conf).asSubclass(WritableComparable.class));
-    }
+  public static <K> RawComparator<K> getIntermediateOutputKeyComparator(Configuration conf) {
+    Class<? extends RawComparator> theClass = conf.getClass(
+        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, null,
+        RawComparator.class);
+    if (theClass != null)
+      return ReflectionUtils.newInstance(theClass, conf);
+    return WritableComparator.get(getIntermediateOutputKeyClass(conf).asSubclass(
+        WritableComparable.class));
+  }
+
+  public static <K> RawComparator<K> getIntermediateInputKeyComparator(Configuration conf) {
+    Class<? extends RawComparator> theClass = conf.getClass(
+        TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, null,
+        RawComparator.class);
+    if (theClass != null)
+      return ReflectionUtils.newInstance(theClass, conf);
+    return WritableComparator.get(getIntermediateInputKeyClass(conf).asSubclass(
+        WritableComparable.class));
+  }
 
+  
+  
   public static <V> RawComparator<V> getOutputValueGroupingComparator(
       Configuration conf) {
     Class<? extends RawComparator> theClass = 
@@ -92,7 +162,7 @@ public class ConfigUtils {
             "mapreduce.job.output.group.comparator.class", 
             null, RawComparator.class);
     if (theClass == null) {
-      return getOutputKeyComparator(conf);
+      return getIntermediateOutputKeyComparator(conf);
     }
 
     return ReflectionUtils.newInstance(theClass, conf);

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java Wed Apr 24 23:33:22 2013
@@ -68,9 +68,9 @@ public class LocalShuffle {
     this.taskContext = taskContext;
     this.runningTaskContext = runningTaskContext;
     this.conf = conf;
-    this.keyClass = ConfigUtils.getMapOutputKeyClass(conf);
-    this.valClass = ConfigUtils.getMapOutputValueClass(conf);
-    this.comparator = ConfigUtils.getOutputKeyComparator(conf);
+    this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+    this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
+    this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
 
     this.sortFactor =
         conf.getInt(
@@ -83,9 +83,9 @@ public class LocalShuffle {
         reporter.getCounter(TaskCounter.SPILLED_RECORDS);
     
     // compression
-    if (ConfigUtils.getCompressMapOutput(conf)) {
+    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
       Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getMapOutputCompressorClass(conf, DefaultCodec.class);
+          ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
       this.codec = ReflectionUtils.newInstance(codecClass, conf);
     } else {
       this.codec = null;

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java Wed Apr 24 23:33:22 2013
@@ -124,9 +124,9 @@ class Fetcher extends Thread {
     wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.WRONG_REDUCE.toString());
 
-    if (ConfigUtils.getCompressMapOutput(job)) {
+    if (ConfigUtils.isIntermediateInputCompressed(job)) {
       Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getMapOutputCompressorClass(job, DefaultCodec.class);
+          ConfigUtils.getIntermediateInputCompressorClass(job, DefaultCodec.class);
       codec = ReflectionUtils.newInstance(codecClass, job);
       decompressor = CodecPool.getDecompressor(codec);
     } else {

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java Wed Apr 24 23:33:22 2013
@@ -142,9 +142,9 @@ public class MergeManager {
     this.localFS = localFS;
     this.rfs = ((LocalFileSystem)localFS).getRaw();
 
-    if (ConfigUtils.getCompressMapOutput(conf)) {
+    if (ConfigUtils.isIntermediateInputCompressed(conf)) {
       Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getMapOutputCompressorClass(conf, DefaultCodec.class);
+          ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
       codec = ReflectionUtils.newInstance(codecClass, conf);
     } else {
       codec = null;
@@ -416,11 +416,11 @@ public class MergeManager {
 
       TezRawKeyValueIterator rIter = 
         TezMerger.merge(conf, rfs,
-                       ConfigUtils.getMapOutputKeyClass(conf),
-                       ConfigUtils.getMapOutputValueClass(conf),
+                       ConfigUtils.getIntermediateInputKeyClass(conf),
+                       ConfigUtils.getIntermediateInputValueClass(conf),
                        inMemorySegments, inMemorySegments.size(),
                        new Path(taskAttemptId.toString()),
-                       (RawComparator)ConfigUtils.getOutputKeyComparator(conf),
+                       (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
                        reporter, null, null, null);
       TezMerger.writeFile(rIter, writer, reporter, conf);
       writer.close();
@@ -473,8 +473,8 @@ public class MergeManager {
 
       Writer writer = 
         new Writer(conf, rfs, outputPath,
-                        (Class)ConfigUtils.getMapOutputKeyClass(conf),
-                        (Class)ConfigUtils.getMapOutputValueClass(conf),
+                        (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
+                        (Class)ConfigUtils.getIntermediateInputValueClass(conf),
                         codec, null);
 
       TezRawKeyValueIterator rIter = null;
@@ -483,11 +483,11 @@ public class MergeManager {
                  " segments...");
         
         rIter = TezMerger.merge(conf, rfs,
-                             (Class)ConfigUtils.getMapOutputKeyClass(conf),
-                             (Class)ConfigUtils.getMapOutputValueClass(conf),
+                             (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
+                             (Class)ConfigUtils.getIntermediateInputValueClass(conf),
                              inMemorySegments, inMemorySegments.size(),
                              new Path(taskAttemptId.toString()),
-                             (RawComparator)ConfigUtils.getOutputKeyComparator(conf),
+                             (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
                              reporter, spilledRecordsCounter, null, null);
         
         if (null == combineProcessor) {
@@ -553,18 +553,18 @@ public class MergeManager {
             approxOutputSize, conf).suffix(Constants.MERGED_OUTPUT_PREFIX);
       Writer writer = 
         new Writer(conf, rfs, outputPath, 
-                        (Class)ConfigUtils.getMapOutputKeyClass(conf), 
-                        (Class)ConfigUtils.getMapOutputValueClass(conf),
+                        (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
+                        (Class)ConfigUtils.getIntermediateInputValueClass(conf),
                         codec, null);
       TezRawKeyValueIterator iter  = null;
       Path tmpDir = new Path(taskAttemptId.toString());
       try {
         iter = TezMerger.merge(conf, rfs,
-                            (Class)ConfigUtils.getMapOutputKeyClass(conf), 
-                            (Class)ConfigUtils.getMapOutputValueClass(conf),
+                            (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
+                            (Class)ConfigUtils.getIntermediateInputValueClass(conf),
                             codec, inputs.toArray(new Path[inputs.size()]), 
                             true, ioSortFactor, tmpDir, 
-                            (RawComparator)ConfigUtils.getOutputKeyComparator(conf), 
+                            (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), 
                             reporter, spilledRecordsCounter, null, 
                             mergedMapOutputsCounter, null);
 
@@ -670,11 +670,11 @@ public class MergeManager {
     
 
     // merge config params
-    Class keyClass = (Class)ConfigUtils.getMapOutputKeyClass(job);
-    Class valueClass = (Class)ConfigUtils.getMapOutputValueClass(job);
+    Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job);
+    Class valueClass = (Class)ConfigUtils.getIntermediateInputValueClass(job);
     final Path tmpDir = new Path(taskAttemptId.toString());
     final RawComparator comparator =
-      (RawComparator)ConfigUtils.getOutputKeyComparator(job);
+      (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(job);
 
     // segments required to vacate memory
     List<Segment> memDiskSegments = new ArrayList<Segment>();

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java Wed Apr 24 23:33:22 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.tez.common.Constants;
 import org.apache.tez.common.RunningTaskContext;
 import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.api.Input;
@@ -113,14 +114,15 @@ public abstract class ExternalSorter {
     rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
     
     // sorter
-    sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
-          QuickSort.class, IndexedSorter.class), job);
+    sorter = ReflectionUtils.newInstance(job.getClass(
+        TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS, QuickSort.class,
+        IndexedSorter.class), job);
     
-    comparator = ConfigUtils.getOutputKeyComparator(job);
+    comparator = ConfigUtils.getIntermediateOutputKeyComparator(job);
     
     // k/v serialization
-    keyClass = ConfigUtils.getMapOutputKeyClass(job);
-    valClass = ConfigUtils.getMapOutputValueClass(job);
+    keyClass = ConfigUtils.getIntermediateOutputKeyClass(job);
+    valClass = ConfigUtils.getIntermediateOutputValueClass(job);
     serializationFactory = new SerializationFactory(job);
     keySerializer = serializationFactory.getSerializer(keyClass);
     valSerializer = serializationFactory.getSerializer(valClass);
@@ -136,9 +138,9 @@ public abstract class ExternalSorter {
     spilledRecordsCounter = 
         runningTaskContext.getTaskReporter().getCounter(TaskCounter.SPILLED_RECORDS);
     // compression
-    if (ConfigUtils.getCompressMapOutput(job)) {
+    if (ConfigUtils.shouldCompressIntermediateOutput(job)) {
       Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getMapOutputCompressorClass(job, DefaultCodec.class);
+          ConfigUtils.getIntermediateOutputCompressorClass(job, DefaultCodec.class);
       codec = ReflectionUtils.newInstance(codecClass, job);
     } else {
       codec = null;

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java Wed Apr 24 23:33:22 2013
@@ -392,7 +392,7 @@ public class PipelinedSorter extends Ext
                      keyClass, valClass, codec,
                      segmentList, mergeFactor,
                      new Path(mapId.toString()),
-                     (RawComparator)ConfigUtils.getOutputKeyComparator(job), 
+                     (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(job), 
                      runningTaskContext.getTaskReporter(), sortSegments,
                      null, spilledRecordsCounter, sortPhase.phase());
 

Modified: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java Wed Apr 24 23:33:22 2013
@@ -1093,7 +1093,7 @@ public class DefaultSorter extends Exter
                        keyClass, valClass, codec,
                        segmentList, mergeFactor,
                        new Path(mapId.toString()),
-                       (RawComparator)ConfigUtils.getOutputKeyComparator(job), 
+                       (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(job), 
                        runningTaskContext.getTaskReporter(), sortSegments,
                        null, spilledRecordsCounter, 
                        sortPhase.phase());

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java Wed Apr 24 23:33:22 2013
@@ -128,6 +128,18 @@ public class DeprecatedKeys {
     _(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT);
     
     _(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, TezJobConfig.DAG_CREDENTIALS_BINARY);
+    
+    _("map.sort.class", TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS);
+    
+    _(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_ENGINE_GROUP_COMPARATOR_CLASS);
+    
+    // TODO Parameters which cannot be handled via deprecation. Have to be handled via another translation layer.
+    //_(MRJobConfig.KEY_COMPARATOR, TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS)
+    //_(MRJobConfig.MAP_OUTPUT_KEY_CLASS, TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS, TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS)
+    //_(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS, TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS)
+    //_(MRJobConfig.MAP_OUTPUT_COMPRESS, TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED)
+    //_(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC, TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC, TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC)
+    
   }
 
   private static void _(String oldKey, String newKey) {

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java?rev=1471778&r1=1471777&r2=1471778&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java Wed Apr 24 23:33:22 2013
@@ -133,8 +133,8 @@ implements Processor {
 
     this.statusUpdate();
     
-    Class keyClass = ConfigUtils.getMapOutputKeyClass(jobConf);
-    Class valueClass = ConfigUtils.getMapOutputValueClass(jobConf);
+    Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
+    Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
     RawComparator comparator = 
         ConfigUtils.getOutputValueGroupingComparator(jobConf);