You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/08/16 05:25:07 UTC

git commit: TEZ-1423. Ability to pass custom properties to keySerializer for OnFileUnorderedPartitionedKVOutput (Siddharth Seth via bikas)

Repository: tez
Updated Branches:
  refs/heads/master 469bf9052 -> 98875db09


TEZ-1423. Ability to pass custom properties to keySerializer for OnFileUnorderedPartitionedKVOutput (Siddharth Seth via bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/98875db0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/98875db0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/98875db0

Branch: refs/heads/master
Commit: 98875db09ece09acee71190489d40ce17ba239c3
Parents: 469bf90
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Aug 15 20:25:12 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Aug 15 20:25:12 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../HadoopKeyValuesBasedBaseEdgeConfigurer.java | 10 +++++--
 .../conf/OrderedGroupedKVInputConfigurer.java   | 29 ++++++++++++++++--
 .../OrderedPartitionedKVEdgeConfigurer.java     | 26 ++++++++++------
 .../OrderedPartitionedKVOutputConfigurer.java   | 29 ++++++++++++++++--
 .../library/conf/UnorderedKVEdgeConfigurer.java | 27 +++++++++++------
 .../conf/UnorderedKVInputConfigurer.java        | 30 +++++++++++++++++--
 .../conf/UnorderedKVOutputConfigurer.java       | 30 +++++++++++++++++--
 .../UnorderedPartitionedKVEdgeConfigurer.java   | 25 ++++++++++------
 .../UnorderedPartitionedKVOutputConfigurer.java | 31 +++++++++++++++++---
 .../conf/TestOnFileSortedOutputConfigurer.java  |  2 +-
 .../TestOnFileUnorderedKVOutputConfigurer.java  |  2 +-
 ...eUnorderedPartitionedKVOutputConfigurer.java |  6 ++--
 .../TestOrderedPartitionedKVEdgeConfigurer.java |  8 ++---
 .../conf/TestShuffledMergedInputConfigurer.java |  2 +-
 .../TestShuffledUnorderedKVInputConfigurer.java |  2 +-
 ...estUnorderedPartitionedKVEdgeConfigurer.java |  4 +--
 ...tUnorderedUnpartitionedKVEdgeConfigurer.java |  2 +-
 18 files changed, 207 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2ef940b..c96e3fe 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -67,6 +67,8 @@ INCOMPATIBLE CHANGES
   TEZ-1438. Annotate add java doc for tez-runtime-library and tez-mapreduce
   TEZ-1055. Rename tez-mapreduce-examples to tez-examples
   TEZ-1132. Consistent naming of Input and Outputs
+  TEZ-1423. Ability to pass custom properties to keySerializer for
+  OnFileUnorderedPartitionedKVOutput
 
 Release 0.4.0-incubating: 2014-04-05
 

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
index 81c1185..15be335 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
@@ -20,6 +20,8 @@ package org.apache.tez.runtime.library.conf;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.tez.dag.api.UserPayload;
 
@@ -58,11 +60,13 @@ abstract class HadoopKeyValuesBasedBaseEdgeConfigurer {
      * @param enabled          whether to enable compression or not
      * @param compressionCodec the codec to be used if compression is enabled. null implies using
      *                         the default
+     * @param codecConf        the codec configuration. This can be null, and is a {@link
+     *                         java.util.Map} of key-value pairs. The keys should be limited to
+     *                         the ones required by the comparator.
      * @return instance of the current builder
      */
-    public T setCompression(boolean enabled, @Nullable String compressionCodec) {
-      return (T) this;
-    }
+    public abstract T setCompression(boolean enabled, @Nullable String compressionCodec,
+                            @Nullable Map<String, String> codecConf);
 
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java
index 016cbfd..cc6e44b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedGroupedKVInputConfigurer.java
@@ -435,12 +435,18 @@ public class OrderedGroupedKVInputConfigurer {
       return this;
     }
 
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+                                  @Nullable Map<String, String> codecConf) {
       this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
       if (enabled && compressionCodec != null) {
         this.conf
             .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
       }
+      if (codecConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
       return this;
     }
 
@@ -451,10 +457,13 @@ public class OrderedGroupedKVInputConfigurer {
      *
      * @param serializationClassName
      * @param comparatorClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
     public Builder setKeySerializationClass(String serializationClassName,
-        String comparatorClassName) {
+        String comparatorClassName, @Nullable Map<String, String> serializerConf) {
       Preconditions.checkArgument(serializationClassName != null,
           "serializationClassName cannot be null");
       Preconditions.checkArgument(comparatorClassName != null,
@@ -462,6 +471,11 @@ public class OrderedGroupedKVInputConfigurer {
       this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
           + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
       setKeyComparatorClass(comparatorClassName, null);
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
       return this;
     }
 
@@ -469,13 +483,22 @@ public class OrderedGroupedKVInputConfigurer {
      * Serialization class to be used for serializing values.
      *
      * @param serializationClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
-    public Builder setValueSerializationClass(String serializationClassName) {
+    public Builder setValueSerializationClass(String serializationClassName,
+                                              @Nullable Map<String, String> serializerConf) {
       Preconditions.checkArgument(serializationClassName != null,
           "serializationClassName cannot be null");
       this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
           + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
index a9c18b1..3abaf47 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
@@ -206,12 +206,15 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
      *
      * @param serializationClassName
      * @param comparatorClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
     public Builder setKeySerializationClass(String serializationClassName,
-        String comparatorClassName) {
-      outputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName);
-      inputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName);
+        String comparatorClassName, @Nullable Map<String, String> serializerConf) {
+      outputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName, serializerConf);
+      inputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName, serializerConf);
       return this;
     }
 
@@ -219,19 +222,24 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
      * Set serialization class responsible for providing serializer/deserializer for values.
      *
      * @param serializationClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
-    public Builder setValueSerializationClass(String serializationClassName) {
-      outputBuilder.setValueSerializationClass(serializationClassName);
-      inputBuilder.setValueSerializationClass(serializationClassName);
+    public Builder setValueSerializationClass(String serializationClassName,
+                                              @Nullable Map<String, String> serializerConf) {
+      outputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
+      inputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
       return this;
     }
 
 
     @Override
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
-      outputBuilder.setCompression(enabled, compressionCodec);
-      inputBuilder.setCompression(enabled, compressionCodec);
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+                                  @Nullable Map<String, String> codecConf) {
+      outputBuilder.setCompression(enabled, compressionCodec, codecConf);
+      inputBuilder.setCompression(enabled, compressionCodec, codecConf);
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java
index 72063e0..dd73c29 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVOutputConfigurer.java
@@ -361,12 +361,18 @@ public class OrderedPartitionedKVOutputConfigurer {
       return this;
     }
 
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+                                  @Nullable Map<String, String> codecConf) {
       this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
       if (enabled && compressionCodec != null) {
         this.conf
             .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
       }
+      if (codecConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
       return this;
     }
 
@@ -377,10 +383,13 @@ public class OrderedPartitionedKVOutputConfigurer {
      *
      * @param serializationClassName
      * @param comparatorClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
     public Builder setKeySerializationClass(String serializationClassName,
-        String comparatorClassName) {
+        String comparatorClassName, @Nullable Map<String, String> serializerConf) {
       Preconditions.checkArgument(serializationClassName != null,
           "serializationClassName cannot be null");
       Preconditions.checkArgument(comparatorClassName != null,
@@ -388,6 +397,11 @@ public class OrderedPartitionedKVOutputConfigurer {
       this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
           + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
       setKeyComparatorClass(comparatorClassName, null);
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
       return this;
     }
 
@@ -395,13 +409,22 @@ public class OrderedPartitionedKVOutputConfigurer {
      * Set serialization class responsible for providing serializer/deserializer for values.
      *
      * @param serializationClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
-    public Builder setValueSerializationClass(String serializationClassName) {
+    public Builder setValueSerializationClass(String serializationClassName,
+                                              @Nullable Map<String, String> serializerConf) {
       Preconditions.checkArgument(serializationClassName != null,
           "serializationClassName cannot be null");
       this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
           + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java
index 1e0e56c..e96e0ef 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVEdgeConfigurer.java
@@ -171,9 +171,10 @@ public class UnorderedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfi
     }
 
     @Override
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
-      outputBuilder.setCompression(enabled, compressionCodec);
-      inputBuilder.setCompression(enabled, compressionCodec);
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+                                  @Nullable Map<String, String> codecConf) {
+      outputBuilder.setCompression(enabled, compressionCodec, codecConf);
+      inputBuilder.setCompression(enabled, compressionCodec, codecConf);
       return this;
     }
 
@@ -203,11 +204,15 @@ public class UnorderedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfi
      * the corresponding comparator class to be used as key comparator.
      *
      * @param serializationClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
-    public Builder setKeySerializationClass(String serializationClassName) {
-      outputBuilder.setKeySerializationClass(serializationClassName);
-      inputBuilder.setKeySerializationClass(serializationClassName);
+    public Builder setKeySerializationClass(String serializationClassName,
+                                            @Nullable Map<String, String> serializerConf) {
+      outputBuilder.setKeySerializationClass(serializationClassName, serializerConf);
+      inputBuilder.setKeySerializationClass(serializationClassName, serializerConf);
       return this;
     }
 
@@ -215,11 +220,15 @@ public class UnorderedKVEdgeConfigurer extends HadoopKeyValuesBasedBaseEdgeConfi
      * Set serialization class responsible for providing serializer/deserializer for values.
      *
      * @param serializationClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
-    public Builder setValueSerializationClass(String serializationClassName) {
-      outputBuilder.setValueSerializationClass(serializationClassName);
-      inputBuilder.setValueSerializationClass(serializationClassName);
+    public Builder setValueSerializationClass(String serializationClassName,
+                                              @Nullable Map<String, String> serializerConf) {
+      outputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
+      inputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java
index 2203928..8771c97 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVInputConfigurer.java
@@ -284,12 +284,18 @@ public class UnorderedKVInputConfigurer {
       return this;
     }
 
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+                                  @Nullable Map<String, String> codecConf) {
       this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
       if (enabled && compressionCodec != null) {
         this.conf
             .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
       }
+      if (codecConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
       return this;
     }
 
@@ -298,13 +304,22 @@ public class UnorderedKVInputConfigurer {
      * the corresponding comparator class to be used as key comparator.
      *
      * @param serializationClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
-    public Builder setKeySerializationClass(String serializationClassName) {
+    public Builder setKeySerializationClass(String serializationClassName,
+                                            @Nullable Map<String, String> serializerConf) {
       Preconditions.checkArgument(serializationClassName != null,
           "serializationClassName cannot be null");
       this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
           + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
       return this;
     }
 
@@ -312,13 +327,22 @@ public class UnorderedKVInputConfigurer {
      * Set serialization class responsible for providing serializer/deserializer for values.
      *
      * @param serializationClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
-    public Builder setValueSerializationClass(String serializationClassName) {
+    public Builder setValueSerializationClass(String serializationClassName,
+                                              @Nullable Map<String, String> serializerConf) {
       Preconditions.checkArgument(serializationClassName != null,
           "serializationClassName cannot be null");
       this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
           + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfigurer.java
index 986ee45..2ef4350 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedKVOutputConfigurer.java
@@ -214,13 +214,22 @@ public class UnorderedKVOutputConfigurer {
      * Set serialization class responsible for providing serializer/deserializer for keys.
      *
      * @param serializationClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
-    public Builder setKeySerializationClass(String serializationClassName) {
+    public Builder setKeySerializationClass(String serializationClassName,
+                                            @Nullable Map<String, String> serializerConf) {
       Preconditions.checkArgument(serializationClassName != null,
           "serializationClassName cannot be null");
       this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
           + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
       return this;
     }
 
@@ -228,22 +237,37 @@ public class UnorderedKVOutputConfigurer {
      * Set serialization class responsible for providing serializer/deserializer for values.
      *
      * @param serializationClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
-    public Builder setValueSerializationClass(String serializationClassName) {
+    public Builder setValueSerializationClass(String serializationClassName,
+                                              @Nullable Map<String, String> serializerConf) {
       Preconditions.checkArgument(serializationClassName != null,
           "serializationClassName cannot be null");
       this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
           + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
       return this;
     }
 
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+                                  @Nullable Map<String, String> codecConf) {
       this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
       if (enabled && compressionCodec != null) {
         this.conf
             .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
       }
+      if (codecConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
index 30e141c..e56a927 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java
@@ -177,9 +177,9 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
     }
 
     @Override
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
-      outputBuilder.setCompression(enabled, compressionCodec);
-      inputBuilder.setCompression(enabled, compressionCodec);
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec, @Nullable Map<String, String> codecConf) {
+      outputBuilder.setCompression(enabled, compressionCodec, codecConf);
+      inputBuilder.setCompression(enabled, compressionCodec, codecConf);
       return this;
     }
 
@@ -208,11 +208,15 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
      * Set serialization class responsible for providing serializer/deserializer for keys.
      *
      * @param serializationClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
-    public Builder setKeySerializationClass(String serializationClassName) {
-      outputBuilder.setKeySerializationClass(serializationClassName);
-      inputBuilder.setKeySerializationClass(serializationClassName);
+    public Builder setKeySerializationClass(String serializationClassName,
+                                            @Nullable Map<String, String> serializerConf) {
+      outputBuilder.setKeySerializationClass(serializationClassName, serializerConf);
+      inputBuilder.setKeySerializationClass(serializationClassName, serializerConf);
       return this;
     }
 
@@ -220,11 +224,14 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa
      * Set serialization class responsible for providing serializer/deserializer for values.
      *
      * @param serializationClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
-    public Builder setValueSerializationClass(String serializationClassName) {
-      outputBuilder.setValueSerializationClass(serializationClassName);
-      inputBuilder.setValueSerializationClass(serializationClassName);
+    public Builder setValueSerializationClass(String serializationClassName, @Nullable Map<String, String> serializerConf) {
+      outputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
+      inputBuilder.setValueSerializationClass(serializationClassName, serializerConf);
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfigurer.java
index 8033c95..9417fbd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVOutputConfigurer.java
@@ -255,12 +255,18 @@ public class UnorderedPartitionedKVOutputConfigurer {
       return this;
     }
 
-    public Builder setCompression(boolean enabled, @Nullable String compressionCodec) {
+    public Builder setCompression(boolean enabled, @Nullable String compressionCodec,
+                                  @Nullable Map<String, String> codecConf) {
       this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, enabled);
       if (enabled && compressionCodec != null) {
         this.conf
             .set(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, compressionCodec);
       }
+      if (codecConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, codecConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
       return this;
     }
 
@@ -268,27 +274,44 @@ public class UnorderedPartitionedKVOutputConfigurer {
      * Set serialization class responsible for providing serializer/deserializer for keys.
      *
      * @param serializationClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
-    public Builder setKeySerializationClass(String serializationClassName) {
+    public Builder setKeySerializationClass(String serializationClassName,
+                                            @Nullable Map<String, String> serializerConf) {
       Preconditions.checkArgument(serializationClassName != null,
           "serializationClassName cannot be null");
       this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
           + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
-      return this;
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }      return this;
     }
 
     /**
      * Set serialization class responsible for providing serializer/deserializer for values.
      *
      * @param serializationClassName
+     * @param serializerConf         the serializer configuration. This can be null, and is a
+     *                               {@link java.util.Map} of key-value pairs. The keys should be limited
+     *                               to the ones required by the comparator.
      * @return
      */
-    public Builder setValueSerializationClass(String serializationClassName) {
+    public Builder setValueSerializationClass(String serializationClassName,
+                                              @Nullable Map<String, String> serializerConf) {
       Preconditions.checkArgument(serializationClassName != null,
           "serializationClassName cannot be null");
       this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + ","
           + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY));
+      if (serializerConf != null) {
+        // Merging the confs for now. Change to be specific in the future.
+        ConfigUtils.mergeConfsWithExclusions(this.conf, serializerConf,
+            TezRuntimeConfiguration.getRuntimeConfigKeySet());
+      }
       return this;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileSortedOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileSortedOutputConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileSortedOutputConfigurer.java
index 970f500..f0a6bf8 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileSortedOutputConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileSortedOutputConfigurer.java
@@ -75,7 +75,7 @@ public class TestOnFileSortedOutputConfigurer {
     OrderedPartitionedKVOutputConfigurer.Builder builder =
         OrderedPartitionedKVOutputConfigurer.newBuilder("KEY", "VALUE", "PARTITIONER", null)
             .setKeyComparatorClass("KEY_COMPARATOR", null)
-            .setCompression(true, "CustomCodec")
+            .setCompression(true, "CustomCodec", null)
             .setSortBufferSize(2048)
             .setAdditionalConfiguration("test.key.1", "key1")
             .setAdditionalConfiguration("file.shouldExist", "file")

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedKVOutputConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedKVOutputConfigurer.java
index 2422203..bd182ec 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedKVOutputConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedKVOutputConfigurer.java
@@ -65,7 +65,7 @@ public class TestOnFileUnorderedKVOutputConfigurer {
     additionalConf.put("file.shouldExist", "file");
     UnorderedKVOutputConfigurer.Builder builder =
         UnorderedKVOutputConfigurer.newBuilder("KEY", "VALUE")
-            .setCompression(true, "CustomCodec")
+            .setCompression(true, "CustomCodec", null)
             .setAdditionalConfiguration("fs.shouldExist", "fs")
             .setAdditionalConfiguration("test.key.1", "key1")
             .setAdditionalConfiguration(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD,

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutputConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutputConfigurer.java
index 73cab11..0ce96eb 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutputConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutputConfigurer.java
@@ -76,7 +76,7 @@ public class TestOnFileUnorderedPartitionedKVOutputConfigurer {
     UnorderedPartitionedKVOutputConfigurer.Builder builder =
         UnorderedPartitionedKVOutputConfigurer.newBuilder("KEY", "VALUE", "PARTITIONER",
             null)
-            .setCompression(true, "CustomCodec")
+            .setCompression(true, "CustomCodec", null)
             .setAvailableBufferSize(1111)
             .setAdditionalConfiguration("fs.shouldExist", "fs")
             .setAdditionalConfiguration("test.key.1", "key1")
@@ -123,8 +123,8 @@ public class TestOnFileUnorderedPartitionedKVOutputConfigurer {
     UnorderedPartitionedKVOutputConfigurer.Builder builder =
         UnorderedPartitionedKVOutputConfigurer
             .newBuilder("KEY", "VALUE", "PARTITIONER", null)
-            .setKeySerializationClass("SerClass1")
-            .setValueSerializationClass("SerClass2");
+            .setKeySerializationClass("SerClass1", null)
+            .setValueSerializationClass("SerClass2", null);
     UnorderedPartitionedKVOutputConfigurer configuration = builder.build();
 
     UnorderedPartitionedKVOutputConfigurer rebuilt =

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
index 6f91cbc..924b988 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java
@@ -186,7 +186,7 @@ public class TestOrderedPartitionedKVEdgeConfigurer {
         .configureOutput().setSortBufferSize(1111).setSorterNumThreads(2).done()
         .configureInput().setMaxSingleMemorySegmentFraction(0.11f).setMergeFraction(0.22f)
         .setPostMergeBufferFraction(0.33f).setShuffleBufferFraction(0.44f).done()
-        .setCompression(true, "CustomCodec");
+        .setCompression(true, "CustomCodec", null);
 
     OrderedPartitionedKVEdgeConfigurer configuration = builder.build();
 
@@ -244,9 +244,9 @@ public class TestOrderedPartitionedKVEdgeConfigurer {
   public void testSerialization() {
     OrderedPartitionedKVEdgeConfigurer.Builder builder = OrderedPartitionedKVEdgeConfigurer
         .newBuilder("KEY", "VALUE", "PARTITIONER")
-        .setCompression(true, "CustomCodec")
-        .setKeySerializationClass("serClass1", "SomeComparator1")
-        .setValueSerializationClass("serClass2");
+        .setCompression(true, "CustomCodec", null)
+        .setKeySerializationClass("serClass1", "SomeComparator1", null)
+        .setValueSerializationClass("serClass2", null);
 
     OrderedPartitionedKVEdgeConfigurer configuration = builder.build();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfigurer.java
index 4560556..a805601 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledMergedInputConfigurer.java
@@ -65,7 +65,7 @@ public class TestShuffledMergedInputConfigurer {
     OrderedGroupedKVInputConfigurer.Builder builder =
         OrderedGroupedKVInputConfigurer.newBuilder("KEY", "VALUE")
             .setKeyComparatorClass("KEY_COMPARATOR", null)
-            .setCompression(true, "CustomCodec")
+            .setCompression(true, "CustomCodec", null)
             .setMaxSingleMemorySegmentFraction(0.11f)
             .setMergeFraction(0.22f)
             .setPostMergeBufferFraction(0.33f)

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfigurer.java
index 3f980a4..d4324d3 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestShuffledUnorderedKVInputConfigurer.java
@@ -63,7 +63,7 @@ public class TestShuffledUnorderedKVInputConfigurer {
     additionalConf.put("file.shouldExist", "file");
     UnorderedKVInputConfigurer.Builder builder =
         UnorderedKVInputConfigurer.newBuilder("KEY", "VALUE")
-            .setCompression(true, "CustomCodec")
+            .setCompression(true, "CustomCodec", null)
             .setMaxSingleMemorySegmentFraction(0.11f)
             .setMergeFraction(0.22f)
             .setShuffleBufferFraction(0.33f)

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
index 03b218e..796a5b0 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java
@@ -63,8 +63,8 @@ public class TestUnorderedPartitionedKVEdgeConfigurer {
   public void testDefaultConfigsUsed() {
     UnorderedPartitionedKVEdgeConfigurer.Builder builder =
         UnorderedPartitionedKVEdgeConfigurer.newBuilder("KEY", "VALUE", "PARTITIONER");
-    builder.setKeySerializationClass("SerClass1");
-    builder.setValueSerializationClass("SerClass2");
+    builder.setKeySerializationClass("SerClass1", null);
+    builder.setValueSerializationClass("SerClass2", null);
 
     UnorderedPartitionedKVEdgeConfigurer configuration = builder.build();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/98875db0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
index caa2364..2215557 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java
@@ -56,7 +56,7 @@ public class TestUnorderedUnpartitionedKVEdgeConfigurer {
   public void testDefaultConfigsUsed() {
     UnorderedKVEdgeConfigurer.Builder builder =
         UnorderedKVEdgeConfigurer.newBuilder("KEY", "VALUE");
-    builder.setKeySerializationClass("SerClass1").setValueSerializationClass("SerClass2");
+    builder.setKeySerializationClass("SerClass1", null).setValueSerializationClass("SerClass2", null);
 
     UnorderedKVEdgeConfigurer configuration = builder.build();