You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2020/08/27 13:00:43 UTC

[tez] branch master updated: TEZ-3645: Reuse SerializationFactory while sorting, merging, and writing IFiles (Jonathan Turner Eagles reviewed by Rajesh Balamohan, Laszlo Bodor)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new d90b104  TEZ-3645: Reuse SerializationFactory while sorting, merging, and writing IFiles (Jonathan Turner Eagles reviewed by Rajesh Balamohan, Laszlo Bodor)
d90b104 is described below

commit d90b1042e97993eb103f161b40e523638e317f9a
Author: Jonathan Turner Eagles <je...@apache.org>
AuthorDate: Thu Aug 27 14:57:59 2020 +0200

    TEZ-3645: Reuse SerializationFactory while sorting, merging, and writing IFiles (Jonathan Turner Eagles reviewed by Rajesh Balamohan, Laszlo Bodor)
    
    Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
 .../common/serializer/SerializationContext.java    | 87 ++++++++++++++++++++++
 .../shuffle/orderedgrouped/MergeManager.java       | 72 +++++++++---------
 .../library/common/sort/impl/ExternalSorter.java   | 19 ++---
 .../runtime/library/common/sort/impl/IFile.java    | 43 +++++------
 .../library/common/sort/impl/PipelinedSorter.java  | 28 ++++---
 .../library/common/sort/impl/TezMerger.java        | 55 +++++++-------
 .../common/sort/impl/dflt/DefaultSorter.java       | 33 ++++----
 .../writers/BaseUnorderedPartitionedKVWriter.java  |  9 ++-
 .../writers/UnorderedPartitionedKVWriter.java      | 15 ++--
 .../runtime/library/common/TestValuesIterator.java | 57 +++++++-------
 .../common/readers/TestUnorderedKVReader.java      |  5 +-
 .../shuffle/orderedgrouped/TestMergeManager.java   | 16 ++--
 .../library/common/sort/impl/TestIFile.java        | 47 ++++++------
 .../library/common/sort/impl/TestTezMerger.java    | 38 +++++-----
 14 files changed, 318 insertions(+), 206 deletions(-)

diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/serializer/SerializationContext.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/serializer/SerializationContext.java
new file mode 100644
index 0000000..2398b8f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/serializer/SerializationContext.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.serializer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+
+/**
+ * SerializationContext is a wrapper class for serialization related fields.
+ */
+public class SerializationContext {
+
+  private Class<?> keyClass;
+  private Class<?> valueClass;
+  private Serialization<?> keySerialization;
+  private Serialization<?> valSerialization;
+
+  public SerializationContext(Configuration conf) {
+    this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
+    this.valueClass = ConfigUtils.getIntermediateInputValueClass(conf);
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    if (keyClass != null) {
+      this.keySerialization = serializationFactory.getSerialization(keyClass);
+    }
+    if (valueClass != null) {
+      this.valSerialization = serializationFactory.getSerialization(valueClass);
+    }
+  }
+
+  public SerializationContext(Class<?> keyClass, Class<?> valueClass,
+      Serialization<?> keySerialization, Serialization<?> valSerialization) {
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
+    this.keySerialization = keySerialization;
+    this.valSerialization = valSerialization;
+  }
+
+  public Class<?> getKeyClass() {
+    return keyClass;
+  }
+
+  public Class<?> getValueClass() {
+    return valueClass;
+  }
+
+  public Serialization<?> getKeySerialization() {
+    return keySerialization;
+  }
+
+  public Serialization<?> getValSerialization() {
+    return valSerialization;
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public Serializer<?> getKeySerializer() {
+    return keySerialization.getSerializer((Class) keyClass);
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public Serializer<?> getValueSerializer() {
+    return valSerialization.getSerializer((Class) valueClass);
+  }
+
+  public void applyToConf(Configuration conf) {
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, keyClass.getName());
+    conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, valueClass.getName());
+  }
+}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index dd23b3b..8565e71 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.FileChunk;
 import org.apache.hadoop.io.RawComparator;
@@ -47,6 +46,7 @@ import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.combine.Combiner;
+import org.apache.tez.runtime.library.common.serializer.SerializationContext;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
@@ -158,6 +158,8 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
 
   private final boolean cleanup;
 
+  private SerializationContext serializationContext;
+
   /**
    * Construct the MergeManager. Must call start before it becomes usable.
    */
@@ -297,21 +299,21 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
           + ", mergeThreshold: " + this.mergeThreshold);
     }
     
-    boolean allowMemToMemMerge = 
-        conf.getBoolean(
-            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, 
+    boolean allowMemToMemMerge =
+        conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM,
             TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM_DEFAULT);
-      if (allowMemToMemMerge) {
-        this.memToMemMerger = 
-          new IntermediateMemoryToMemoryMerger(this,
-                                               memToMemMergeOutputsThreshold);
-      } else {
-        this.memToMemMerger = null;
-      }
-      
-      this.inMemoryMerger = new InMemoryMerger(this);
-      
-      this.onDiskMerger = new OnDiskMerger(this);
+    if (allowMemToMemMerge) {
+      this.memToMemMerger =
+          new IntermediateMemoryToMemoryMerger(this, memToMemMergeOutputsThreshold);
+    } else {
+      this.memToMemMerger = null;
+    }
+
+    this.inMemoryMerger = new InMemoryMerger(this);
+
+    this.onDiskMerger = new OnDiskMerger(this);
+
+    this.serializationContext = new SerializationContext(conf);
   }
 
   void setupParentThread(Thread shuffleSchedulerThread) {
@@ -802,8 +804,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       // TODO Is this doing any combination ?
       TezRawKeyValueIterator rIter = 
         TezMerger.merge(conf, rfs,
-                       ConfigUtils.getIntermediateInputKeyClass(conf),
-                       ConfigUtils.getIntermediateInputValueClass(conf),
+                       serializationContext,
                        inMemorySegments, inMemorySegments.size(),
                        new Path(inputContext.getUniqueIdentifier()),
                        (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
@@ -885,10 +886,10 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       long outFileLen = 0;
       try {
         writer =
-            new Writer(conf, rfs, outputPath,
-                (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
-                (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-                codec, null, null);
+            new Writer(serializationContext.getKeySerialization(),
+                serializationContext.getValSerialization(), rfs, outputPath,
+                serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
+                null, null);
 
         TezRawKeyValueIterator rIter = null;
         LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
@@ -897,8 +898,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
         tmpDir = new Path(inputContext.getUniqueIdentifier());
         // Nothing actually materialized to disk - controlled by setting sort-factor to #segments.
         rIter = TezMerger.merge(conf, rfs,
-            (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
-            (Class)ConfigUtils.getIntermediateInputValueClass(conf),
+            serializationContext,
             inMemorySegments, inMemorySegments.size(),
             tmpDir, (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
             progressable, spilledRecordsCounter, null, additionalBytesRead, null);
@@ -1027,16 +1027,14 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       outputPath = localDirAllocator.getLocalPathForWrite(namePart, approxOutputSize, conf);
       outputPath = outputPath.suffix(Constants.MERGED_OUTPUT_PREFIX + mergeFileSequenceId.getAndIncrement());
 
-      Writer writer =
-        new Writer(conf, rfs, outputPath, 
-                        (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
-                        (Class)ConfigUtils.getIntermediateInputValueClass(conf),
-                        codec, null, null);
+      Writer writer = new Writer(serializationContext.getKeySerialization(),
+          serializationContext.getValSerialization(), rfs, outputPath,
+          serializationContext.getKeyClass(), serializationContext.getValueClass(), codec, null,
+          null);
       tmpDir = new Path(inputContext.getUniqueIdentifier());
       try {
         TezRawKeyValueIterator iter = TezMerger.merge(conf, rfs,
-            (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
-            (Class)ConfigUtils.getIntermediateInputValueClass(conf),
+            serializationContext,
             inputSegments,
             ioSortFactor, tmpDir,
             (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
@@ -1165,8 +1163,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
     inputContext.notifyProgress();
 
     // merge config params
-    Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job);
-    Class valueClass = (Class)ConfigUtils.getIntermediateInputValueClass(job);
+    SerializationContext serContext = new SerializationContext(job);
     final Path tmpDir = new Path(inputContext.getUniqueIdentifier());
     final RawComparator comparator =
       (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(job);
@@ -1198,11 +1195,12 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
         final Path outputPath = 
           mapOutputFile.getInputFileForWrite(srcTaskId, Integer.MAX_VALUE,
               inMemToDiskBytes).suffix(Constants.MERGED_OUTPUT_PREFIX);
-        final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, keyClass, valueClass,
+        final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs, serContext,
             memDiskSegments, numMemDiskSegments, tmpDir, comparator, progressable,
             spilledRecordsCounter, null, additionalBytesRead, null);
-        final Writer writer = new Writer(job, fs, outputPath,
-            keyClass, valueClass, codec, null, null);
+        final Writer writer = new Writer(serContext.getKeySerialization(),
+            serContext.getValSerialization(), fs, outputPath, serContext.getKeyClass(),
+            serContext.getValueClass(), codec, null, null);
         try {
           TezMerger.writeFile(rIter, writer, progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
         } catch (IOException e) {
@@ -1302,7 +1300,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       diskSegments.addAll(0, memDiskSegments);
       memDiskSegments.clear();
       TezRawKeyValueIterator diskMerge = TezMerger.merge(
-          job, fs, keyClass, valueClass, codec, diskSegments,
+          job, fs, serContext, codec, diskSegments,
           ioSortFactor, numInMemSegments, tmpDir, comparator,
           progressable, false, spilledRecordsCounter, null, additionalBytesRead, null);
       diskSegments.clear();
@@ -1316,7 +1314,7 @@ public class MergeManager implements FetchedInputAllocatorOrderedGrouped {
       LOG.info(finalMergeLog.toString());
     }
     // This is doing nothing but creating an iterator over the segments.
-    return TezMerger.merge(job, fs, keyClass, valueClass, codec,
+    return TezMerger.merge(job, fs, serContext, codec,
         finalSegments, finalSegments.size(), tmpDir,
         comparator, progressable, spilledRecordsCounter, null,
         additionalBytesRead, null);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 16d5849..194e899 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progressable;
@@ -60,13 +59,14 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitio
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.combine.Combiner;
+import org.apache.tez.runtime.library.common.serializer.SerializationContext;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
 
 import org.apache.tez.common.Preconditions;
 
-@SuppressWarnings({"unchecked", "rawtypes"})
+@SuppressWarnings({"rawtypes"})
 public abstract class ExternalSorter {
 
   private static final Logger LOG = LoggerFactory.getLogger(ExternalSorter.class);
@@ -106,10 +106,9 @@ public abstract class ExternalSorter {
   protected final FileSystem rfs;
   protected final TezTaskOutput mapOutputFile;
   protected final int partitions;
-  protected final Class keyClass;
-  protected final Class valClass;
   protected final RawComparator comparator;
-  protected final SerializationFactory serializationFactory;
+
+  protected final SerializationContext serializationContext;
   protected final Serializer keySerializer;
   protected final Serializer valSerializer;
   
@@ -201,14 +200,12 @@ public abstract class ExternalSorter {
     comparator = ConfigUtils.getIntermediateOutputKeyComparator(this.conf);
 
     // k/v serialization
-    keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
-    valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
-    serializationFactory = new SerializationFactory(this.conf);
-    keySerializer = serializationFactory.getSerializer(keyClass);
-    valSerializer = serializationFactory.getSerializer(valClass);
+    this.serializationContext = new SerializationContext(this.conf);
+    keySerializer = serializationContext.getKeySerializer();
+    valSerializer = serializationContext.getValueSerializer();
     LOG.info(outputContext.getDestinationVertexName() + " using: "
         + "memoryMb=" + assignedMb
-        + ", keySerializerClass=" + keyClass
+        + ", keySerializerClass=" + serializationContext.getKeyClass()
         + ", valueSerializerClass=" + valSerializer
         + ", comparator=" + (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf)
         + ", partitioner=" + conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS)
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 5d8e944..6aa44e2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serialization;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.tez.common.counters.TezCounter;
 
@@ -110,7 +110,8 @@ public class IFile {
      * Note that we do not allow compression in in-mem stream.
      * When spilled over to file, compression gets enabled.
      *
-     * @param conf
+     * @param keySerialization
+     * @param valSerialization
      * @param fs
      * @param taskOutput
      * @param keyClass
@@ -121,13 +122,11 @@ public class IFile {
      * @param cacheSize
      * @throws IOException
      */
-    public FileBackedInMemIFileWriter(Configuration conf, FileSystem fs,
-        TezTaskOutput taskOutput, Class keyClass, Class valueClass,
-        CompressionCodec codec,
-        TezCounter writesCounter,
-        TezCounter serializedBytesCounter,
-        int cacheSize) throws IOException {
-      super(conf, new FSDataOutputStream(createBoundedBuffer(cacheSize), null),
+    public FileBackedInMemIFileWriter(Serialization<?> keySerialization,
+        Serialization<?> valSerialization, FileSystem fs, TezTaskOutput taskOutput,
+        Class<?> keyClass, Class<?> valueClass, CompressionCodec codec, TezCounter writesCounter,
+        TezCounter serializedBytesCounter, int cacheSize) throws IOException {
+      super(keySerialization, valSerialization, new FSDataOutputStream(createBoundedBuffer(cacheSize), null),
           keyClass, valueClass, null, writesCounter, serializedBytesCounter);
       this.fs = fs;
       this.cacheStream = (BoundedByteArrayOutputStream) this.rawOut.getWrappedStream();
@@ -315,12 +314,12 @@ public class IFile {
     protected final boolean rle;
 
 
-    public Writer(Configuration conf, FileSystem fs, Path file,
+    public Writer(Serialization keySerialization, Serialization valSerialization, FileSystem fs, Path file,
                   Class keyClass, Class valueClass,
                   CompressionCodec codec,
                   TezCounter writesCounter,
                   TezCounter serializedBytesCounter) throws IOException {
-      this(conf, fs.create(file), keyClass, valueClass, codec,
+      this(keySerialization, valSerialization, fs.create(file), keyClass, valueClass, codec,
            writesCounter, serializedBytesCounter);
       ownOutputStream = true;
     }
@@ -331,17 +330,17 @@ public class IFile {
       this.rle = rle;
     }
 
-    public Writer(Configuration conf, FSDataOutputStream outputStream,
+    public Writer(Serialization keySerialization, Serialization valSerialization, FSDataOutputStream outputStream,
         Class keyClass, Class valueClass, CompressionCodec codec, TezCounter writesCounter,
         TezCounter serializedBytesCounter) throws IOException {
-      this(conf, outputStream, keyClass, valueClass, codec, writesCounter,
+      this(keySerialization, valSerialization, outputStream, keyClass, valueClass, codec, writesCounter,
           serializedBytesCounter, false);
     }
 
-    public Writer(Configuration conf, FSDataOutputStream outputStream,
-        Class keyClass, Class valueClass,
-        CompressionCodec codec, TezCounter writesCounter, TezCounter serializedBytesCounter,
-        boolean rle) throws IOException {
+    public Writer(Serialization keySerialization, Serialization valSerialization, FSDataOutputStream outputStream,
+                  Class keyClass, Class valueClass,
+                  CompressionCodec codec, TezCounter writesCounter, TezCounter serializedBytesCounter,
+                  boolean rle) throws IOException {
       this.rawOut = outputStream;
       this.writtenRecordsCounter = writesCounter;
       this.serializedUncompressedBytes = serializedBytesCounter;
@@ -354,11 +353,9 @@ public class IFile {
 
       if (keyClass != null) {
         this.closeSerializers = true;
-        SerializationFactory serializationFactory =
-          new SerializationFactory(conf);
-        this.keySerializer = serializationFactory.getSerializer(keyClass);
+        this.keySerializer = keySerialization.getSerializer(keyClass);
         this.keySerializer.open(buffer);
-        this.valueSerializer = serializationFactory.getSerializer(valueClass);
+        this.valueSerializer = valSerialization.getSerializer(valueClass);
         this.valueSerializer.open(buffer);
       } else {
         this.closeSerializers = false;
@@ -383,8 +380,8 @@ public class IFile {
       }
     }
 
-    public Writer(Configuration conf, FileSystem fs, Path file) throws IOException {
-      this(conf, fs, file, null, null, null, null, null);
+    public Writer(Serialization keySerialization, Serialization valSerialization, FileSystem fs, Path file) throws IOException {
+      this(keySerialization, valSerialization, fs, file, null, null, null, null, null);
     }
 
     protected void writeHeader(OutputStream outputStream) throws IOException {
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 2ace875..b70d6c4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -398,14 +398,14 @@ public class PipelinedSorter extends ExternalSorter {
    */
   synchronized void collect(Object key, Object value, final int partition
                                    ) throws IOException {
-    if (key.getClass() != keyClass) {
+    if (key.getClass() != serializationContext.getKeyClass()) {
       throw new IOException("Type mismatch in key from map: expected "
-                            + keyClass.getName() + ", received "
+                            + serializationContext.getKeyClass().getName() + ", received "
                             + key.getClass().getName());
     }
-    if (value.getClass() != valClass) {
+    if (value.getClass() != serializationContext.getValueClass()) {
       throw new IOException("Type mismatch in value from map: expected "
-                            + valClass.getName() + ", received "
+                            + serializationContext.getValueClass().getName() + ", received "
                             + value.getClass().getName());
     }
     if (partition < 0 || partition >= partitions) {
@@ -506,8 +506,9 @@ public class PipelinedSorter extends ExternalSorter {
         try {
           long segmentStart = out.getPos();
           if (!sendEmptyPartitionDetails || (i == partition)) {
-            writer = new Writer(conf, out, keyClass, valClass, codec,
-                spilledRecordsCounter, null, false);
+            writer = new Writer(serializationContext.getKeySerialization(),
+                serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
+                serializationContext.getValueClass(), codec, spilledRecordsCounter, null, false);
           }
           // we need not check for combiner since its a single record
           if (i == partition) {
@@ -592,8 +593,10 @@ public class PipelinedSorter extends ExternalSorter {
         Writer writer = null;
         boolean hasNext = kvIter.hasNext();
         if (hasNext || !sendEmptyPartitionDetails) {
-          writer = new Writer(conf, out, keyClass, valClass, codec,
-              spilledRecordsCounter, null, merger.needsRLE());
+          writer = new Writer(serializationContext.getKeySerialization(),
+              serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
+              serializationContext.getValueClass(), codec, spilledRecordsCounter, null,
+              merger.needsRLE());
         }
         if (combiner == null) {
           while (kvIter.next()) {
@@ -791,7 +794,7 @@ public class PipelinedSorter extends ExternalSorter {
         boolean sortSegments = segmentList.size() > mergeFactor;
         //merge
         TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
-            keyClass, valClass, codec,
+            serializationContext, codec,
             segmentList, mergeFactor,
             new Path(uniqueIdentifier),
             (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf),
@@ -803,9 +806,10 @@ public class PipelinedSorter extends ExternalSorter {
         long rawLength = 0;
         long partLength = 0;
         if (shouldWrite) {
-          Writer writer =
-              new Writer(conf, finalOut, keyClass, valClass, codec,
-                  spilledRecordsCounter, null, merger.needsRLE());
+          Writer writer = new Writer(serializationContext.getKeySerialization(),
+              serializationContext.getValSerialization(), finalOut,
+              serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
+              spilledRecordsCounter, null, merger.needsRLE());
           if (combiner == null || numSpills < minSpillsForCombine) {
             TezMerger.writeFile(kvIter, writer, progressable,
                 TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 726810b..e83b34e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -47,6 +47,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.serializer.SerializationContext;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader.KeyState;
 import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
@@ -70,7 +71,7 @@ public class TezMerger {
 
   public static
   TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                            Class keyClass, Class valueClass, 
+                            SerializationContext serializationContext,
                             CompressionCodec codec, boolean ifileReadAhead,
                             int ifileReadAheadLength, int ifileBufferSize,
                             Path[] inputs, boolean deleteInputs, 
@@ -84,7 +85,7 @@ public class TezMerger {
     return 
       new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
                            ifileReadAheadLength, ifileBufferSize, false, comparator, 
-                           reporter, null).merge(keyClass, valueClass,
+                           reporter, null).merge(serializationContext,
                                            mergeFactor, tmpDir,
                                            readsCounter, writesCounter,
                                            bytesReadCounter,
@@ -94,7 +95,7 @@ public class TezMerger {
   // Used by the in-memory merger.
   public static
   TezRawKeyValueIterator merge(Configuration conf, FileSystem fs, 
-                            Class keyClass, Class valueClass, 
+                            SerializationContext serializationContext,
                             List<Segment> segments, 
                             int mergeFactor, Path tmpDir,
                             RawComparator comparator, Progressable reporter,
@@ -104,14 +105,14 @@ public class TezMerger {
                             Progress mergePhase)
       throws IOException, InterruptedException {
     // Get rid of this ?
-    return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
+    return merge(conf, fs, serializationContext, segments, mergeFactor, tmpDir,
                  comparator, reporter, false, readsCounter, writesCounter, bytesReadCounter,
                  mergePhase);
   }
 
   public static <K extends Object, V extends Object>
   TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                            Class keyClass, Class valueClass,
+                            SerializationContext serializationContext,
                             List<Segment> segments,
                             int mergeFactor, Path tmpDir,
                             RawComparator comparator, Progressable reporter,
@@ -122,14 +123,14 @@ public class TezMerger {
                             Progress mergePhase)
       throws IOException, InterruptedException {
     return new MergeQueue(conf, fs, segments, comparator, reporter,
-                           sortSegments, false).merge(keyClass, valueClass,
-                                               mergeFactor, tmpDir,
+                           sortSegments, false).merge(serializationContext, mergeFactor, tmpDir,
                                                readsCounter, writesCounter,
                                                bytesReadCounter, mergePhase);
   }
 
   public static TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
-      Class keyClass, Class valueClass, CompressionCodec codec,
+      SerializationContext serializationContext,
+      CompressionCodec codec,
       List<Segment> segments,
       int mergeFactor, Path tmpDir,
       RawComparator comparator, Progressable reporter,
@@ -139,13 +140,13 @@ public class TezMerger {
       Progress mergePhase) throws IOException, InterruptedException {
     return new MergeQueue(conf, fs, segments, comparator, reporter,
         false, codec, false, false)
-        .merge(keyClass, valueClass, mergeFactor, tmpDir,
+        .merge(serializationContext, mergeFactor, tmpDir,
             readsCounter, writesCounter, bytesReadCounter, mergePhase);
   }
 
   public static <K extends Object, V extends Object>
   TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
-      Class keyClass, Class valueClass,
+      SerializationContext serializationContext,
       CompressionCodec codec,
       List<Segment> segments,
       int mergeFactor, Path tmpDir,
@@ -159,7 +160,7 @@ public class TezMerger {
       throws IOException, InterruptedException {
     return new MergeQueue(conf, fs, segments, comparator, reporter,
         sortSegments, codec, considerFinalMergeForProgress, checkForSameKeys).
-        merge(keyClass, valueClass,
+        merge(serializationContext,
             mergeFactor, tmpDir,
             readsCounter, writesCounter,
             bytesReadCounter,
@@ -168,7 +169,7 @@ public class TezMerger {
 
   public static <K extends Object, V extends Object>
   TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                            Class keyClass, Class valueClass,
+                            SerializationContext serializationContext,
                             CompressionCodec codec,
                             List<Segment> segments,
                             int mergeFactor, Path tmpDir,
@@ -182,8 +183,7 @@ public class TezMerger {
       throws IOException, InterruptedException {
     return new MergeQueue(conf, fs, segments, comparator, reporter,
                            sortSegments, codec, considerFinalMergeForProgress).
-                                         merge(keyClass, valueClass,
-                                             mergeFactor, tmpDir,
+                                         merge(serializationContext, mergeFactor, tmpDir,
                                              readsCounter, writesCounter,
                                              bytesReadCounter,
                                              mergePhase);
@@ -191,7 +191,7 @@ public class TezMerger {
 
   public static <K extends Object, V extends Object>
   TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
-                          Class keyClass, Class valueClass,
+                          SerializationContext serializationContext,
                           CompressionCodec codec,
                           List<Segment> segments,
                           int mergeFactor, int inMemSegments, Path tmpDir,
@@ -203,7 +203,7 @@ public class TezMerger {
                           Progress mergePhase)
       throws IOException, InterruptedException {
   return new MergeQueue(conf, fs, segments, comparator, reporter,
-                         sortSegments, codec, false).merge(keyClass, valueClass,
+                         sortSegments, codec, false).merge(serializationContext,
                                              mergeFactor, inMemSegments,
                                              tmpDir,
                                              readsCounter, writesCounter,
@@ -704,18 +704,18 @@ public class TezMerger {
       return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
     }
     
-    public TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
-                                     int factor, Path tmpDir,
-                                     TezCounter readsCounter,
-                                     TezCounter writesCounter,
-                                     TezCounter bytesReadCounter,
-                                     Progress mergePhase)
+    public TezRawKeyValueIterator merge(SerializationContext serializationContext,
+                                        int factor, Path tmpDir,
+                                        TezCounter readsCounter,
+                                        TezCounter writesCounter,
+                                        TezCounter bytesReadCounter,
+                                        Progress mergePhase)
         throws IOException, InterruptedException {
-      return merge(keyClass, valueClass, factor, 0, tmpDir,
+      return merge(serializationContext, factor, 0, tmpDir,
                    readsCounter, writesCounter, bytesReadCounter, mergePhase);
     }
 
-    TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
+    TezRawKeyValueIterator merge(SerializationContext serializationContext,
                                      int factor, int inMem, Path tmpDir,
                                      TezCounter readsCounter,
                                      TezCounter writesCounter,
@@ -866,9 +866,10 @@ public class TezMerger {
 
           // TODO Would it ever make sense to make this an in-memory writer ?
           // Merging because of too many disk segments - might fit in memory.
-          Writer writer = 
-            new Writer(conf, fs, outputFile, keyClass, valueClass, codec,
-                             writesCounter, null);
+          Writer writer = new Writer(serializationContext.getKeySerialization(),
+              serializationContext.getValSerialization(), fs, outputFile,
+              serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
+              writesCounter, null);
 
           writeFile(this, writer, reporter, recordsBeforeProgress);
           writer.close();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index d0a18b4..dd6c083 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -32,7 +32,6 @@ import java.util.zip.Deflater;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
@@ -250,14 +249,14 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
   synchronized void collect(Object key, Object value, final int partition
                                    ) throws IOException {
 
-    if (key.getClass() != keyClass) {
+    if (key.getClass() != serializationContext.getKeyClass()) {
       throw new IOException("Type mismatch in key from map: expected "
-                            + keyClass.getName() + ", received "
+                            + serializationContext.getKeyClass().getName() + ", received "
                             + key.getClass().getName());
     }
-    if (value.getClass() != valClass) {
+    if (value.getClass() != serializationContext.getValueClass()) {
       throw new IOException("Type mismatch in value from map: expected "
-                            + valClass.getName() + ", received "
+                            + serializationContext.getValueClass().getName() + ", received "
                             + value.getClass().getName());
     }
     if (partition < 0 || partition >= partitions) {
@@ -907,8 +906,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
           long segmentStart = out.getPos();
           if (spindex < mend && kvmeta.get(offsetFor(spindex) + PARTITION) == i
               || !sendEmptyPartitionDetails) {
-            writer = new Writer(conf, out, keyClass, valClass, codec,
-                spilledRecordsCounter, null, rle);
+            writer = new Writer(serializationContext.getKeySerialization(),
+                serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
+                serializationContext.getValueClass(), codec, spilledRecordsCounter, null, rle);
           }
           if (combiner == null) {
             // spill directly
@@ -1014,8 +1014,9 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
           long segmentStart = out.getPos();
           // Create a new codec, don't care!
           if (!sendEmptyPartitionDetails || (i == partition)) {
-            writer = new Writer(conf, out, keyClass, valClass, codec,
-                spilledRecordsCounter, null, false);
+            writer = new Writer(serializationContext.getKeySerialization(),
+                serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
+                serializationContext.getValueClass(), codec, spilledRecordsCounter, null, false);
           }
           if (i == partition) {
             final long recordStart = out.getPos();
@@ -1292,7 +1293,10 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
           long segmentStart = finalOut.getPos();
           if (!sendEmptyPartitionDetails) {
             Writer writer =
-                new Writer(conf, finalOut, keyClass, valClass, codec, null, null);
+                new Writer(serializationContext.getKeySerialization(),
+                    serializationContext.getValSerialization(), finalOut,
+                    serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
+                    null, null);
             writer.close();
             rawLength = writer.getRawLength();
             partLength = writer.getCompressedLength();
@@ -1350,7 +1354,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
         boolean sortSegments = segmentList.size() > mergeFactor;
         //merge
         TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
-                       keyClass, valClass, codec,
+                       serializationContext, codec,
                        segmentList, mergeFactor,
                        new Path(taskIdentifier),
                        (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
@@ -1363,9 +1367,10 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab
         long rawLength = 0;
         long partLength = 0;
         if (shouldWrite) {
-        Writer writer =
-            new Writer(conf, finalOut, keyClass, valClass, codec,
-                spilledRecordsCounter, null);
+          Writer writer = new Writer(serializationContext.getKeySerialization(),
+              serializationContext.getValSerialization(), finalOut,
+              serializationContext.getKeyClass(), serializationContext.getValueClass(), codec,
+              spilledRecordsCounter, null);
         if (combiner == null || numSpills < minSpillsForCombine) {
           TezMerger.writeFile(kvIter, writer,
               progressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
index 26da98f..ecc9e03 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.io.serializer.Serialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -57,6 +58,8 @@ public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter {
   protected final Serializer keySerializer;
   protected final Serializer valSerializer;
   protected final SerializationFactory serializationFactory;
+  protected final Serialization keySerialization;
+  protected final Serialization valSerialization;
   protected final int numPartitions;
   protected final CompressionCodec codec;
   protected final TezTaskOutput outputFileHandler;
@@ -124,8 +127,10 @@ public abstract class BaseUnorderedPartitionedKVWriter extends KeyValuesWriter {
     keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
     valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
     serializationFactory = new SerializationFactory(this.conf);
-    keySerializer = serializationFactory.getSerializer(keyClass);
-    valSerializer = serializationFactory.getSerializer(valClass);
+    keySerialization = serializationFactory.getSerialization(keyClass);
+    valSerialization = serializationFactory.getSerialization(valClass);
+    keySerializer = keySerialization.getSerializer(keyClass);
+    valSerializer = valSerialization.getSerializer(valClass);
     
     outputRecordBytesCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);
     outputRecordsCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index d9467af..5ff2944 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -301,12 +300,12 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       //special case, where in only one partition is available.
       skipBuffers = true;
       if (this.useCachedStream) {
-        writer = new IFile.FileBackedInMemIFileWriter(conf, rfs, outputFileHandler, keyClass,
-            valClass, codec, outputRecordsCounter, outputRecordBytesCounter,
-            dataViaEventsMaxSize);
+        writer = new IFile.FileBackedInMemIFileWriter(keySerialization, valSerialization, rfs,
+            outputFileHandler, keyClass, valClass, codec, outputRecordsCounter,
+            outputRecordBytesCounter, dataViaEventsMaxSize);
       } else {
         finalOutPath = outputFileHandler.getOutputFileForWrite();
-        writer = new IFile.Writer(conf, rfs, finalOutPath, keyClass, valClass,
+        writer = new IFile.Writer(keySerialization, valSerialization, rfs, finalOutPath, keyClass, valClass,
             codec, outputRecordsCounter, outputRecordBytesCounter);
         ensureSpillFilePermissions(finalOutPath, rfs);
       }
@@ -643,7 +642,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
               continue;
             }
             if (writer == null) {
-              writer = new Writer(conf, out, keyClass, valClass, codec, null, null);
+              writer = new Writer(keySerialization, valSerialization, out, keyClass, valClass, codec, null, null);
             }
             numRecords += writePartition(buffer.partitionPositions[i], buffer, writer, key, val);
           }
@@ -1086,7 +1085,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
           LOG.info(destNameTrimmed + ": " + "Skipping partition: " + i + " in final merge since it has no records");
           continue;
         }
-        writer = new Writer(conf, out, keyClass, valClass, codec, null, null);
+        writer = new Writer(keySerialization, valSerialization, out, keyClass, valClass, codec, null, null);
         try {
           if (currentBuffer.nextPosition != 0
               && currentBuffer.partitionPositions[i] != WrappedBuffer.PARTITION_ABSENT_POSITION) {
@@ -1177,7 +1176,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
           spilledRecordsCounter.increment(1);
           Writer writer = null;
           try {
-            writer = new IFile.Writer(conf, out, keyClass, valClass, codec, null, null);
+            writer = new IFile.Writer(keySerialization, valSerialization, out, keyClass, valClass, codec, null, null);
             writer.append(key, value);
             outputLargeRecordsCounter.increment(1);
             numRecordsPerPartition[i]++;
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
index 642f02b..eaded18 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
@@ -6,6 +6,7 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 
 import java.nio.ByteBuffer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,7 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.InputContext;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
+import org.apache.tez.runtime.library.common.serializer.SerializationContext;
 import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
@@ -102,8 +104,7 @@ public class TestValuesIterator {
   FileSystem fs;
   static final Random rnd = new Random();
 
-  final Class keyClass;
-  final Class valClass;
+  private SerializationContext serializationContext;
   final RawComparator comparator;
   final RawComparator correctComparator;
   final boolean expectedTestResult;
@@ -129,20 +130,18 @@ public class TestValuesIterator {
    * @param testResult             expected result
    * @throws IOException
    */
-  public TestValuesIterator(String serializationClassName, Class key, Class val,
+  public TestValuesIterator(String serializationClassName, Class<?> key, Class<?> val,
       TestWithComparator comparator, TestWithComparator correctComparator, boolean testResult)
       throws IOException {
-    this.keyClass = key;
-    this.valClass = val;
     this.comparator = getComparator(comparator);
     this.correctComparator =
         (correctComparator == null) ? this.comparator : getComparator(correctComparator);
     this.expectedTestResult = testResult;
     originalData = LinkedListMultimap.create();
-    setupConf(serializationClassName);
+    setupConf(key, val, serializationClassName);
   }
 
-  private void setupConf(String serializationClassName) throws IOException {
+  private void setupConf(Class<?> key, Class<?> val, String serializationClassName) throws IOException {
     mergeFactor = 2;
     conf = new Configuration();
     conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, mergeFactor);
@@ -154,6 +153,11 @@ public class TestValuesIterator {
     String localDirs = baseDir.toString();
     conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
     fs = FileSystem.getLocal(conf);
+
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    serializationContext = new SerializationContext(key, val,
+        serializationFactory.getSerialization(key), serializationFactory.getSerialization(val));
+    serializationContext.applyToConf(conf);
   }
 
   @Before
@@ -231,20 +235,21 @@ public class TestValuesIterator {
       streamPaths = new Path[0];
       //This will return EmptyIterator
       rawKeyValueIterator =
-          TezMerger.merge(conf, fs, keyClass, valClass, null,
+          TezMerger.merge(conf, fs, serializationContext, null,
               false, -1, 1024, streamPaths, false, mergeFactor, tmpDir, comparator,
               new ProgressReporter(), null, null, null, null);
     } else {
       List<TezMerger.Segment> segments = Lists.newLinkedList();
       //This will return EmptyIterator
       rawKeyValueIterator =
-          TezMerger.merge(conf, fs, keyClass, valClass, segments, mergeFactor, tmpDir,
+          TezMerger.merge(conf, fs, serializationContext, segments, mergeFactor, tmpDir,
               comparator, new ProgressReporter(), new GenericCounter("readsCounter", "y"),
               new GenericCounter("writesCounter", "y1"),
               new GenericCounter("bytesReadCounter", "y2"), new Progress());
     }
     return new ValuesIterator(rawKeyValueIterator, comparator,
-        keyClass, valClass, conf, (TezCounter) new GenericCounter("inputKeyCounter", "y3"),
+        serializationContext.getKeyClass(), serializationContext.getValueClass(), conf,
+        (TezCounter) new GenericCounter("inputKeyCounter", "y3"),
         (TezCounter) new GenericCounter("inputValueCounter", "y4"));
   }
 
@@ -332,19 +337,20 @@ public class TestValuesIterator {
       streamPaths = createFiles();
       //Merge all files to get KeyValueIterator
       rawKeyValueIterator =
-          TezMerger.merge(conf, fs, keyClass, valClass, null,
+          TezMerger.merge(conf, fs, serializationContext, null,
               false, -1, 1024, streamPaths, false, mergeFactor, tmpDir, comparator,
               new ProgressReporter(), null, null, null, null);
     } else {
       List<TezMerger.Segment> segments = createInMemStreams();
       rawKeyValueIterator =
-          TezMerger.merge(conf, fs, keyClass, valClass, segments, mergeFactor, tmpDir,
+          TezMerger.merge(conf, fs, serializationContext, segments, mergeFactor, tmpDir,
               comparator, new ProgressReporter(), new GenericCounter("readsCounter", "y"),
               new GenericCounter("writesCounter", "y1"),
               new GenericCounter("bytesReadCounter", "y2"), new Progress());
     }
-    return new ValuesIterator(rawKeyValueIterator, comparator,
-        keyClass, valClass, conf, (TezCounter) new GenericCounter("inputKeyCounter", "y3"),
+    return new ValuesIterator(rawKeyValueIterator, comparator, serializationContext.getKeyClass(),
+        serializationContext.getValueClass(), conf,
+        (TezCounter) new GenericCounter("inputKeyCounter", "y3"),
         (TezCounter) new GenericCounter("inputValueCounter", "y4"));
   }
 
@@ -364,19 +370,19 @@ public class TestValuesIterator {
       streamPaths = createFiles();
       //Merge all files to get KeyValueIterator
       rawKeyValueIterator =
-          TezMerger.merge(conf, fs, keyClass, valClass, null,
+          TezMerger.merge(conf, fs, serializationContext, null,
               false, -1, 1024, streamPaths, false, mergeFactor, tmpDir, comparator,
               new ProgressReporter(), null, null, null, null);
     } else {
       List<TezMerger.Segment> segments = createInMemStreams();
       rawKeyValueIterator =
-          TezMerger.merge(conf, fs, keyClass, valClass, segments, mergeFactor, tmpDir,
+          TezMerger.merge(conf, fs, serializationContext, segments, mergeFactor, tmpDir,
               comparator, new ProgressReporter(), new GenericCounter("readsCounter", "y"),
               new GenericCounter("writesCounter", "y1"),
               new GenericCounter("bytesReadCounter", "y2"), new Progress());
     }
-    return new ValuesIterator(rawKeyValueIterator, comparator,
-        keyClass, valClass, conf, keyCounter, tupleCounter);
+    return new ValuesIterator(rawKeyValueIterator, comparator, serializationContext.getKeyClass(),
+        serializationContext.getValueClass(), conf, keyCounter, tupleCounter);
   }
 
   @Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3} {4} {5} {6}]")
@@ -454,7 +460,9 @@ public class TestValuesIterator {
       paths[i] = new Path(baseDir, "ifile_" + i + ".out");
       FSDataOutputStream out = fs.create(paths[i]);
       //write data with RLE
-      IFile.Writer writer = new IFile.Writer(conf, out, keyClass, valClass, null, null, null, true);
+      IFile.Writer writer = new IFile.Writer(serializationContext.getKeySerialization(),
+          serializationContext.getValSerialization(), out, serializationContext.getKeyClass(),
+          serializationContext.getValueClass(), null, null, null, true);
       Map<Writable, Writable> data = createData();
 
       for (Map.Entry<Writable, Writable> entry : data.entrySet()) {
@@ -487,9 +495,8 @@ public class TestValuesIterator {
     int numberOfStreams = Math.max(2, rnd.nextInt(10));
     LOG.info("No of streams : " + numberOfStreams);
 
-    SerializationFactory serializationFactory = new SerializationFactory(conf);
-    Serializer keySerializer = serializationFactory.getSerializer(keyClass);
-    Serializer valueSerializer = serializationFactory.getSerializer(valClass);
+    Serializer keySerializer = serializationContext.getKeySerializer();
+    Serializer valueSerializer = serializationContext.getValueSerializer();
 
     LocalDirAllocator localDirAllocator =
         new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
@@ -549,8 +556,8 @@ public class TestValuesIterator {
   private Map<Writable, Writable> createData() {
     Map<Writable, Writable> map = new TreeMap<Writable, Writable>(comparator);
     for (int j = 0; j < Math.max(10, rnd.nextInt(50)); j++) {
-      Writable key = createData(keyClass);
-      Writable value = createData(valClass);
+      Writable key = createData(serializationContext.getKeyClass());
+      Writable value = createData(serializationContext.getValueClass());
       map.put(key, value);
       //sortedDataMap.put(key, value);
     }
@@ -558,7 +565,7 @@ public class TestValuesIterator {
   }
 
 
-  private Writable createData(Class c) {
+  private Writable createData(Class<?> c) {
     if (c.getName().equalsIgnoreCase(BytesWritable.class.getName())) {
       return new BytesWritable(new BigInteger(256, rnd).toString().getBytes());
     } else if (c.getName().equalsIgnoreCase(IntWritable.class.getName())) {
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
index 6fef944..42231e9 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.serializer.WritableSerialization;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
@@ -131,8 +132,8 @@ public class TestUnorderedKVReader {
 
   private void createIFile(Path path, int recordCount) throws IOException {
     FSDataOutputStream out = localFs.create(path);
-    IFile.Writer writer =
-        new IFile.Writer(defaultConf, out, Text.class, Text.class, null, null, null, true);
+    IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+        out, Text.class, Text.class, null, null, null, true);
 
     for (int i = 0; i < recordCount; i++) {
       writer.append(new Text("Key_" + i), new Text("Value_" + i));
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 9cffcc7..13f090c 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -38,6 +38,7 @@ import java.util.UUID;
 import com.google.common.collect.Sets;
 
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.serializer.WritableSerialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -630,8 +631,8 @@ public class TestMergeManager {
   private byte[] generateDataBySize(Configuration conf, int rawLen, InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
-    IFile.Writer writer =
-        new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null);
+    IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+        fsdos, IntWritable.class, IntWritable.class, null, null, null);
     int i = 0;
     while(true) {
       writer.append(new IntWritable(i), new IntWritable(i));
@@ -653,8 +654,8 @@ public class TestMergeManager {
                                                InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
-    IFile.Writer writer =
-            new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null);
+    IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+        fsdos, IntWritable.class, IntWritable.class, null, null, null);
     int i = 0;
     while(true) {
       writer.append(new IntWritable(i), new IntWritable(i));
@@ -676,8 +677,8 @@ public class TestMergeManager {
                               InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
-    IFile.Writer writer =
-        new IFile.Writer(conf, fsdos, IntWritable.class, IntWritable.class, null, null, null);
+    IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+        fsdos, IntWritable.class, IntWritable.class, null, null, null);
     for (int i = 0; i < numEntries; ++i) {
       writer.append(new IntWritable(i), new IntWritable(i));
     }
@@ -1015,7 +1016,8 @@ public class TestMergeManager {
     for (int i = 0; i < numPartitions; i++) {
       long pos = outStream.getPos();
       IFile.Writer writer =
-          new IFile.Writer(conf, outStream, IntWritable.class, IntWritable.class, null, null, null);
+          new IFile.Writer(new WritableSerialization(), new WritableSerialization(), outStream,
+              IntWritable.class, IntWritable.class, null, null, null);
       for (int j = 0; j < numKeysPerPartition; j++) {
         writer.append(new IntWritable(currentKey), new IntWritable(currentKey));
         currentKey++;
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
index b9c556d..c74496e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestIFile.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.WritableSerialization;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
@@ -144,7 +145,7 @@ public class TestIFile {
   public void testWritingEmptyKeyValues() throws IOException {
     DataInputBuffer key = new DataInputBuffer();
     DataInputBuffer value = new DataInputBuffer();
-    IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath, null, null, null,
+    IFile.Writer writer = new IFile.Writer(null, null, localFs, outputPath, null, null, null,
         null, null);
     writer.append(key, value);
     writer.append(key, value);
@@ -203,7 +204,7 @@ public class TestIFile {
 
     // Check Key length exceeding MAX_BUFFER_SIZE
     out = localFs.create(outputPath);
-    writer = new IFile.Writer(defaultConf, out,
+    writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
             Text.class, Text.class, null, null, null, false);
     writer.append(longString, shortString);
     writer.close();
@@ -226,7 +227,7 @@ public class TestIFile {
 
     // Check Value length exceeding MAX_BUFFER_SIZE
     out = localFs.create(outputPath);
-    writer = new IFile.Writer(defaultConf, out,
+    writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
             Text.class, Text.class, null, null, null, false);
     writer.append(shortString, longString);
     writer.close();
@@ -250,7 +251,7 @@ public class TestIFile {
 
     // Check Key length not getting doubled
     out = localFs.create(outputPath);
-    writer = new IFile.Writer(defaultConf, out,
+    writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
             Text.class, Text.class, null, null, null, false);
     writer.append(longString, shortString);
     writer.close();
@@ -269,7 +270,7 @@ public class TestIFile {
 
     // Check Value length not getting doubled
     out = localFs.create(outputPath);
-    writer = new IFile.Writer(defaultConf, out,
+    writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
             Text.class, Text.class, null, null, null, false);
     writer.append(shortString, longString);
     writer.close();
@@ -296,7 +297,7 @@ public class TestIFile {
   public void testWithRLEMarker() throws IOException {
     //Test with append(Object, Object)
     FSDataOutputStream out = localFs.create(outputPath);
-    IFile.Writer writer = new IFile.Writer(defaultConf, out,
+    IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
         Text.class, IntWritable.class, codec, null, null, true);
 
     Text key = new Text("key0");
@@ -322,7 +323,7 @@ public class TestIFile {
     int valueLength = 6;
     int pos = 0;
     out = localFs.create(outputPath);
-    writer = new IFile.Writer(defaultConf, out,
+    writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
         Text.class, IntWritable.class, codec, null, null, true);
 
     BoundedByteArrayOutputStream boundedOut = new BoundedByteArrayOutputStream(1024*1024);
@@ -465,8 +466,8 @@ public class TestIFile {
   //Test appendValue feature
   public void testAppendValue() throws IOException {
     List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
-    IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
-        Text.class, IntWritable.class, codec, null, null);
+    IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+        localFs, outputPath, Text.class, IntWritable.class, codec, null, null);
 
     Text previousKey = null;
     for (KVPair kvp : data) {
@@ -496,8 +497,8 @@ public class TestIFile {
       values.add(val);
     }
 
-    IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
-        Text.class, IntWritable.class, codec, null, null);
+    IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+        localFs, outputPath, Text.class, IntWritable.class, codec, null, null);
     writer.append(data.get(0).getKey(), data.get(0).getvalue()); //write first KV pair
     writer.appendValues(values.subList(1, values.size()).iterator()); //add the rest here
 
@@ -524,7 +525,8 @@ public class TestIFile {
     }
 
     TezTaskOutputFiles tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
-    IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(defaultConf, localFs, tezTaskOutput,
+    IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(
+        new WritableSerialization(), new WritableSerialization(), localFs, tezTaskOutput,
         Text.class, IntWritable.class, codec, null, null,
         200);
 
@@ -547,7 +549,8 @@ public class TestIFile {
   public void testFileBackedInMemIFileWriterWithSmallBuffer() throws IOException {
     List<KVPair> data = new ArrayList<>();
     TezTaskOutputFiles tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
-    IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(defaultConf, localFs, tezTaskOutput,
+    IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(
+        new WritableSerialization(), new WritableSerialization(), localFs, tezTaskOutput,
         Text.class, IntWritable.class, codec, null, null,
         2);
 
@@ -581,7 +584,8 @@ public class TestIFile {
 
     // Setting cache limit to 20. Actual data would be around 43 bytes, so it would spill over.
     TezTaskOutputFiles tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
-    IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(defaultConf, localFs, tezTaskOutput,
+    IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(
+        new WritableSerialization(), new WritableSerialization(), localFs, tezTaskOutput,
         Text.class, IntWritable.class, codec, null, null,
         20);
     writer.setOutputPath(outputPath);
@@ -614,7 +618,8 @@ public class TestIFile {
     TezTaskOutputFiles
         tezTaskOutput = new TezTaskOutputFiles(defaultConf, "uniqueId", 1);
 
-    IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(defaultConf, localFs, tezTaskOutput,
+    IFile.FileBackedInMemIFileWriter writer = new IFile.FileBackedInMemIFileWriter(
+        new WritableSerialization(), new WritableSerialization(), localFs, tezTaskOutput,
         Text.class, IntWritable.class, codec, null, null,
         100);
 
@@ -644,8 +649,8 @@ public class TestIFile {
       values.add(val);
     }
 
-    IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
-        Text.class, IntWritable.class, codec, null, null);
+    IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+        localFs, outputPath, Text.class, IntWritable.class, codec, null, null);
     writer.appendKeyValues(data.get(0).getKey(), values.iterator());
 
     Text lastKey = new Text("key3");
@@ -662,8 +667,8 @@ public class TestIFile {
   //Test appendValue with DataInputBuffer
   public void testAppendValueWithDataInputBuffer() throws IOException {
     List<KVPair> data = KVDataGen.generateTestData(false, rnd.nextInt(100));
-    IFile.Writer writer = new IFile.Writer(defaultConf, localFs, outputPath,
-        Text.class, IntWritable.class, codec, null, null);
+    IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+        localFs, outputPath, Text.class, IntWritable.class, codec, null, null);
 
     final DataInputBuffer previousKey = new DataInputBuffer();
     DataInputBuffer key = new DataInputBuffer();
@@ -914,7 +919,7 @@ public class TestIFile {
   private Writer writeTestFile(boolean rle, boolean repeatKeys,
       List<KVPair> data, CompressionCodec codec) throws IOException {
     FSDataOutputStream out = localFs.create(outputPath);
-    IFile.Writer writer = new IFile.Writer(defaultConf, out,
+    IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
         Text.class, IntWritable.class, codec, null, null, rle);
     writeTestFile(writer, repeatKeys, data);
     out.close();
@@ -947,7 +952,7 @@ public class TestIFile {
   private Writer writeTestFileUsingDataBuffer(boolean rle, boolean repeatKeys,
       List<KVPair> data, CompressionCodec codec) throws IOException {
     FSDataOutputStream out = localFs.create(outputPath);
-    IFile.Writer writer = new IFile.Writer(defaultConf, out,
+    IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out,
         Text.class, IntWritable.class, codec, null, null, rle);
     writeTestFileUsingDataBuffer(writer, repeatKeys, data);
     out.close();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
index af10700..c6574d7 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
@@ -36,11 +36,13 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.serializer.WritableSerialization;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.serializer.SerializationContext;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryReader;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager;
@@ -136,7 +138,7 @@ public class TestTezMerger {
   private Path createIFileWithTextData(List<String> data) throws IOException {
     Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out");
     FSDataOutputStream out = localFs.create(path);
-    IFile.Writer writer = new IFile.Writer(defaultConf, out, Text.class,
+    IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out, Text.class,
         Text.class, null, null, null, true);
     for (String key : data) {
       writer.append(new Text(key), new Text(key + "_" + System.nanoTime()));
@@ -568,11 +570,12 @@ public class TestTezMerger {
   private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc)
       throws IOException, InterruptedException {
     TezMerger merger = new TezMerger();
-    TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
-        LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
-        true, 4, new Path(workDir, "tmp_" + System.nanoTime()), ((rc == null) ? comparator : rc),
-        new Reporter(), null, null,
-        null, new Progress());
+    TezRawKeyValueIterator records = merger.merge(defaultConf, localFs,
+        new SerializationContext(IntWritable.class, LongWritable.class, new WritableSerialization(),
+            new WritableSerialization()),
+        null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]), true, 4,
+        new Path(workDir, "tmp_" + System.nanoTime()), ((rc == null) ? comparator : rc),
+        new Reporter(), null, null, null, new Progress());
     return records;
   }
 
@@ -602,12 +605,12 @@ public class TestTezMerger {
   private void merge(List<Path> pathList, int mergeFactor, RawComparator rc) throws Exception {
     //Merge datasets
     TezMerger merger = new TezMerger();
-    TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
-        LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),
-        true, mergeFactor, new Path(workDir, "tmp_" + System.nanoTime()),
-        ((rc == null) ? comparator : rc), new Reporter(), null, null,
-        null,
-        new Progress());
+    TezRawKeyValueIterator records = merger.merge(defaultConf, localFs,
+        new SerializationContext(IntWritable.class, LongWritable.class, new WritableSerialization(),
+            new WritableSerialization()),
+        null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]), true, mergeFactor,
+        new Path(workDir, "tmp_" + System.nanoTime()), ((rc == null) ? comparator : rc),
+        new Reporter(), null, null, null, new Progress());
 
     verifyData(records);
     verificationDataSet.clear();
@@ -701,9 +704,10 @@ public class TestTezMerger {
     TezMerger.MergeQueue mergeQueue = new TezMerger.MergeQueue(defaultConf, localFs, segmentList,
         comparator, new Reporter(), false, false);
 
-    TezRawKeyValueIterator records = mergeQueue.merge(IntWritable.class, LongWritable.class,
-        mergeFactor, new Path(workDir, "tmp_"
-        + System.nanoTime()), null, null, null, null);
+    TezRawKeyValueIterator records = mergeQueue.merge(
+        new SerializationContext(IntWritable.class, LongWritable.class, new WritableSerialization(),
+            new WritableSerialization()),
+        mergeFactor, new Path(workDir, "tmp_" + System.nanoTime()), null, null, null, null);
 
     //Verify the merged data is correct
     verifyData(records);
@@ -770,8 +774,8 @@ public class TestTezMerger {
     Path path = new Path(workDir + "/src", "data_" + System.nanoTime() + ".out");
     FSDataOutputStream out = localFs.create(path);
     //create IFile with RLE
-    IFile.Writer writer = new IFile.Writer(defaultConf, out, IntWritable.class
-        , LongWritable.class, null, null, null, true);
+    IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(),
+        out, IntWritable.class, LongWritable.class, null, null, null, true);
 
     for (Integer key : dataSet.keySet()) {
       for (Long value : dataSet.get(key)) {