You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/02/23 08:00:33 UTC

git commit: TEZ-880. Support sending deserialized data in RootInputDataInformationEvents. (sseth)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 25cb89305 -> 1c112c103


TEZ-880. Support sending deserialized data in
RootInputDataInformationEvents. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1c112c10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1c112c10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1c112c10

Branch: refs/heads/master
Commit: 1c112c1034a3da21bb709c3708d358323c1a80eb
Parents: 25cb893
Author: Siddharth Seth <ss...@apache.org>
Authored: Sat Feb 22 23:00:15 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sat Feb 22 23:00:15 2014 -0800

----------------------------------------------------------------------
 .../events/RootInputDataInformationEvent.java   | 35 +++++++-
 .../common/MRInputAMSplitGenerator.java         | 55 +++++++-----
 .../common/MRInputSplitDistributor.java         | 43 +++++++--
 .../tez/mapreduce/hadoop/InputSplitInfoMem.java | 95 ++++++++++++++++++--
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  | 78 +++++++++-------
 .../tez/mapreduce/hadoop/MRJobConfig.java       |  4 +
 .../org/apache/tez/mapreduce/input/MRInput.java | 21 +++--
 7 files changed, 247 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c112c10/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
index 0b0d32f..25fd9de 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/RootInputDataInformationEvent.java
@@ -18,23 +18,47 @@
 
 package org.apache.tez.runtime.api.events;
 
+import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 
 /**
  * Events used by {@link TezRootInputInitializer} implementations to provide the
- * byte payload for individual tasks running as part of the Vertex for which an
+ * user payload for individual tasks running as part of the Vertex for which an
  * Initial Input has been configured.
+ * 
+ * This event is used by InputInitializers to configure tasks belonging to a
+ * Vertex. The event may be processed by a {@link VertexManagerPlugin}
+ * before being sent to tasks.
+ * 
+ * A {@link TezRootInputInitializer} may send Events with or without a
+ * serialized user payload.
+ * 
+ * Events, after being processed by a {@link VertexManagerPlugin}, must
+ * contain the payload in a serialized form.
  */
 public final class RootInputDataInformationEvent extends Event {
 
   private final int sourceIndex;
   private int targetIndex; // TODO Likely to be multiple at a later point.
   private final byte[] userPayload;
+  private final Object userPayloadObject;
   
+  /**
+   * Provide a serialized form of the payload
+   * @param srcIndex the src index
+   * @param userPayload the serialized payload
+   */
   public RootInputDataInformationEvent(int srcIndex, byte[] userPayload) {
     this.sourceIndex = srcIndex;
     this.userPayload = userPayload;
+    this.userPayloadObject = null;
+  }
+  
+  public RootInputDataInformationEvent(int srcIndex, Object userPayloadDeserialized) {
+    this.sourceIndex = srcIndex;
+    this.userPayloadObject = userPayloadDeserialized;
+    this.userPayload = null;
   }
 
   public int getSourceIndex() {
@@ -52,10 +76,15 @@ public final class RootInputDataInformationEvent extends Event {
   public byte[] getUserPayload() {
     return this.userPayload;
   }
+  
+  public Object getDeserializedUserPayload() {
+    return this.userPayloadObject;
+  }
 
   @Override
   public String toString() {
     return "RootInputDataInformationEvent [sourceIndex=" + sourceIndex + ", targetIndex="
-        + targetIndex + "]";
-  }  
+        + targetIndex + ", serializedUserPayloadExists=" + (userPayload != null)
+        + ", deserializedUserPayloadExists=" + (userPayloadObject != null) + "]";
+  } 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c112c10/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index e87feae..414778f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -26,13 +26,13 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
@@ -47,6 +47,8 @@ import com.google.common.collect.Lists;
 
 public class MRInputAMSplitGenerator implements TezRootInputInitializer {
 
+  private boolean sendSerializedEvents;
+  
   private static final Log LOG = LogFactory
       .getLog(MRInputAMSplitGenerator.class);
 
@@ -72,6 +74,10 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
     }
     Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
         .getConfigurationBytes());
+    sendSerializedEvents = conf.getBoolean(
+        MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD,
+        MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD_DEFAULT);
+    LOG.info("Emitting serialized splits: " + sendSerializedEvents);
     if (LOG.isDebugEnabled()) {
       sw.stop();
       LOG.debug("Time converting ByteString to configuration: " + sw.elapsedMillis());
@@ -104,16 +110,11 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
         Job job = Job.getInstance(conf);
         org.apache.hadoop.mapreduce.InputSplit[] splits = MRHelpers
             .generateNewSplits(job, realInputFormatName, numTasks);
-        SerializationFactory serializationFactory = new SerializationFactory(
-            job.getConfiguration());
-
-        MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
 
+        // Move all this into a function
         List<TaskLocationHint> locationHints = Lists
             .newArrayListWithCapacity(splits.length);
         for (org.apache.hadoop.mapreduce.InputSplit split : splits) {
-          splitsBuilder.addSplits(MRHelpers.createSplitProto(split,
-              serializationFactory));
           String rack = 
               ((org.apache.hadoop.mapreduce.split.TezGroupedSplit) split).getRack();
           if (rack == null) {
@@ -128,17 +129,14 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
                 Collections.singleton(rack)));
           }
         }
-        inputSplitInfo = new InputSplitInfoMem(splitsBuilder.build(),
-            locationHints, splits.length, null);
+        inputSplitInfo = new InputSplitInfoMem(splits, locationHints, splits.length, null, conf);
       } else {
         LOG.info("Grouping mapred api input splits");
         org.apache.hadoop.mapred.InputSplit[] splits = MRHelpers
             .generateOldSplits(jobConf, realInputFormatName, numTasks);
         List<TaskLocationHint> locationHints = Lists
             .newArrayListWithCapacity(splits.length);
-        MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
         for (org.apache.hadoop.mapred.InputSplit split : splits) {
-          splitsBuilder.addSplits(MRHelpers.createSplitProto(split));
           String rack = 
               ((org.apache.hadoop.mapred.split.TezGroupedSplit) split).getRack();
           if (rack == null) {
@@ -153,8 +151,7 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
                 Collections.singleton(rack)));
           }
         }
-        inputSplitInfo = new InputSplitInfoMem(splitsBuilder.build(),
-            locationHints, splits.length, null);
+        inputSplitInfo = new InputSplitInfoMem(splits, locationHints, splits.length, null, conf);
       }
     } else {
       inputSplitInfo = MRHelpers.generateInputSplitsToMem(conf);
@@ -171,16 +168,30 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
         inputSplitInfo.getNumTasks(), inputSplitInfo.getTaskLocationHints());
     events.add(configureVertexEvent);
 
-    MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
-
-    int count = 0;
-    for (MRSplitProto mrSplit : splitsProto.getSplitsList()) {
-      // Unnecessary array copy, can be avoided by using ByteBuffer instead of a
-      // raw array.
-      RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(
-          count++, mrSplit.toByteArray());
-      events.add(diEvent);
+    if (sendSerializedEvents) {
+      MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
+      int count = 0;
+      for (MRSplitProto mrSplit : splitsProto.getSplitsList()) {
+        // Unnecessary array copy, can be avoided by using ByteBuffer instead of a raw array.
+        RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++,
+            mrSplit.toByteArray());
+        events.add(diEvent);
+      }
+    } else {
+      int count = 0;
+      if (inputSplitInfo.holdsNewFormatSplits()) {
+        for (org.apache.hadoop.mapreduce.InputSplit split : inputSplitInfo.getNewFormatSplits()) {
+          RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++, split);
+          events.add(diEvent);
+        }
+      } else {
+        for (org.apache.hadoop.mapred.InputSplit split : inputSplitInfo.getOldFormatSplits()) {
+          RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(count++, split);
+          events.add(diEvent);
+        }
+      }
     }
+    
     return events;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c112c10/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index b6bdcfc..b1a0a4d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -23,7 +23,11 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
@@ -38,8 +42,9 @@ import com.google.common.collect.Lists;
 
 public class MRInputSplitDistributor implements TezRootInputInitializer {
 
-  private static final Log LOG = LogFactory
-      .getLog(MRInputSplitDistributor.class);
+  private static final Log LOG = LogFactory.getLog(MRInputSplitDistributor.class);
+  
+  private boolean sendSerializedEvents;
 
   public MRInputSplitDistributor() {
   }
@@ -59,7 +64,14 @@ public class MRInputSplitDistributor implements TezRootInputInitializer {
       LOG.debug("Time to parse MRInput payload into prot: "
           + sw.elapsedMillis());  
     }
-    
+    Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
+        .getConfigurationBytes());
+    JobConf jobConf = new JobConf(conf);
+    boolean useNewApi = jobConf.getUseNewMapper();
+    sendSerializedEvents = conf.getBoolean(
+        MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD,
+        MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD_DEFAULT);
+    LOG.info("Emitting serialized splits: " + sendSerializedEvents);
 
     this.splitsProto = userPayloadProto.getSplits();
     
@@ -72,12 +84,27 @@ public class MRInputSplitDistributor implements TezRootInputInitializer {
 
     events.add(updatePayloadEvent);
     int count = 0;
+
     for (MRSplitProto mrSplit : this.splitsProto.getSplitsList()) {
-      // Unnecessary array copy, can be avoided by using ByteBuffer instead of a
-      // raw array.
-      RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(
-          count++, mrSplit.toByteArray());
-      events.add(diEvent);
+
+      RootInputDataInformationEvent diEvent;
+
+      if (sendSerializedEvents) {
+        // Unnecessary array copy, can be avoided by using ByteBuffer instead of
+        // a raw array.
+        diEvent = new RootInputDataInformationEvent(count++, mrSplit.toByteArray());
+      } else {
+        if (useNewApi) {
+          org.apache.hadoop.mapreduce.InputSplit newInputSplit = MRInput
+              .getNewSplitDetailsFromEvent(mrSplit, conf);
+          diEvent = new RootInputDataInformationEvent(count++, newInputSplit);
+        } else {
+          org.apache.hadoop.mapred.InputSplit oldInputSplit = MRInput.getOldSplitDetailsFromEvent(
+              mrSplit, conf);
+          diEvent = new RootInputDataInformationEvent(count++, oldInputSplit);
+        }
+        events.add(diEvent);
+      }
     }
 
     return events;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c112c10/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
index b77f668..6cdbaaa 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
@@ -18,13 +18,18 @@
 
 package org.apache.tez.mapreduce.hadoop;
 
+import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.security.Credentials;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Represents InputSplitInfo for splits generated to memory. </p>
  * 
@@ -34,17 +39,40 @@ import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
  */
 public class InputSplitInfoMem implements InputSplitInfo {
 
-  private final MRSplitsProto splitsProto;
-  private final List<TaskLocationHint> taskLocationHints;
+//  private final MRSplitsProto splitsProto;
+  private final boolean isNewSplit;
   private final int numTasks;
   private final Credentials credentials;
+  private final Configuration conf;
+  private final List<TaskLocationHint> taskLocationHints;
+  
+  private org.apache.hadoop.mapreduce.InputSplit[] newFormatSplits;
+  private org.apache.hadoop.mapred.InputSplit[] oldFormatSplits;
 
-  public InputSplitInfoMem(MRSplitsProto splitsProto,
-      List<TaskLocationHint> taskLocationHints, int numTasks, Credentials credentials) {
-    this.splitsProto = splitsProto;
+  // TaskLocationHints accepted as a parameter since InputSplits don't have rack
+  // info, and it can't always be derived.
+  public InputSplitInfoMem(org.apache.hadoop.mapreduce.InputSplit[] newSplits,
+      List<TaskLocationHint> taskLocationHints, int numTasks, Credentials credentials,
+      Configuration conf) {
+    this.isNewSplit = true;
+    this.newFormatSplits = newSplits;
     this.taskLocationHints = taskLocationHints;
     this.numTasks = numTasks;
     this.credentials = credentials;
+    this.conf = conf;
+  }
+
+  // TaskLocationHints accepted as a parameter since InputSplits don't have rack
+  // info, and it can't always be derived.
+  public InputSplitInfoMem(org.apache.hadoop.mapred.InputSplit[] oldSplits,
+      List<TaskLocationHint> taskLocationHints, int numTasks, Credentials credentials,
+      Configuration conf) {
+    this.isNewSplit = false;
+    this.oldFormatSplits = oldSplits;
+    this.taskLocationHints = taskLocationHints;
+    this.numTasks = numTasks;
+    this.credentials = credentials;
+    this.conf = conf;
   }
 
   @Override
@@ -76,11 +104,66 @@ public class InputSplitInfoMem implements InputSplitInfo {
 
   @Override
   public MRSplitsProto getSplitsProto() {
-    return this.splitsProto;
+    if (isNewSplit) {
+      try {
+        return createSplitsProto(newFormatSplits, new SerializationFactory(conf));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      try {
+        return createSplitsProto(oldFormatSplits);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
   }
 
   @Override
   public Credentials getCredentials() {
     return this.credentials;
   }
+  
+  public boolean holdsNewFormatSplits() {
+    return this.isNewSplit;  
+  }
+
+  public org.apache.hadoop.mapreduce.InputSplit[] getNewFormatSplits() {
+    Preconditions
+        .checkState(
+            isNewSplit == true,
+            "Cannot fetch newSplits for an instance handling oldFormatSplits. Use holdsNewFormatSplits() to check type");
+    return newFormatSplits;
+  }
+
+  public org.apache.hadoop.mapred.InputSplit[] getOldFormatSplits() {
+    Preconditions
+        .checkState(
+            isNewSplit == false,
+            "Cannot fetch newSplits for an instance handling newFormatSplits. Use holdsNewFormatSplits() to check type");
+    return oldFormatSplits;
+  }
+
+  private static MRSplitsProto createSplitsProto(
+      org.apache.hadoop.mapreduce.InputSplit[] newSplits,
+      SerializationFactory serializationFactory) throws IOException,
+      InterruptedException {
+    MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
+
+    for (org.apache.hadoop.mapreduce.InputSplit newSplit : newSplits) {
+      splitsBuilder.addSplits(MRHelpers.createSplitProto(newSplit, serializationFactory));
+    }
+    return splitsBuilder.build();
+  }
+
+  private static MRSplitsProto createSplitsProto(
+      org.apache.hadoop.mapred.InputSplit[] oldSplits) throws IOException {
+    MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
+    for (org.apache.hadoop.mapred.InputSplit oldSplit : oldSplits) {
+      splitsBuilder.addSplits(MRHelpers.createSplitProto(oldSplit));
+    }
+    return splitsBuilder.build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c112c10/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index a01503c..91fa4ae 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.split.JobSplitWriter;
 import org.apache.hadoop.mapreduce.v2.proto.MRProtos;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.ContainerLogAppender;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -81,7 +80,9 @@ import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
@@ -369,34 +370,21 @@ public class MRHelpers {
       Job job = Job.getInstance(conf);
       org.apache.hadoop.mapreduce.InputSplit[] splits = 
           generateNewSplits(job, null, 0);
-      splitInfoMem = createSplitsProto(splits, new SerializationFactory(job.getConfiguration()), job.getCredentials());
+      splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
+          splits.length, job.getCredentials(), job.getConfiguration());
     } else {
       LOG.info("Generating mapred api input splits");
       org.apache.hadoop.mapred.InputSplit[] splits = 
           generateOldSplits(jobConf, null, 0);
-      splitInfoMem = createSplitsProto(splits, jobConf.getCredentials());
+      splitInfoMem = new InputSplitInfoMem(splits, createTaskLocationHintsFromSplits(splits),
+          splits.length, jobConf.getCredentials(), jobConf);
     }
     LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
         + splitInfoMem.getSplitsProto().getSerializedSize());
     return splitInfoMem;
   }
 
-  private static InputSplitInfoMem createSplitsProto(
-      org.apache.hadoop.mapreduce.InputSplit[] newSplits,
-      SerializationFactory serializationFactory, Credentials credentials) throws IOException,
-      InterruptedException {
-    MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
-
-    List<TaskLocationHint> locationHints = Lists
-        .newArrayListWithCapacity(newSplits.length);
-    for (org.apache.hadoop.mapreduce.InputSplit newSplit : newSplits) {
-      splitsBuilder.addSplits(createSplitProto(newSplit, serializationFactory));
-      locationHints.add(new TaskLocationHint(new HashSet<String>(Arrays
-          .asList(newSplit.getLocations())), null));
-    }
-    return new InputSplitInfoMem(splitsBuilder.build(), locationHints,
-        newSplits.length, credentials);
-  }
+  
 
   @Private
   public static <T extends org.apache.hadoop.mapreduce.InputSplit> MRSplitProto createSplitProto(
@@ -421,21 +409,6 @@ public class MRHelpers {
     return builder.build();
   }
 
-  private static InputSplitInfoMem createSplitsProto(
-      org.apache.hadoop.mapred.InputSplit[] oldSplits, Credentials credentials) throws IOException {
-    MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
-
-    List<TaskLocationHint> locationHints = Lists
-        .newArrayListWithCapacity(oldSplits.length);
-    for (org.apache.hadoop.mapred.InputSplit oldSplit : oldSplits) {
-      splitsBuilder.addSplits(createSplitProto(oldSplit));
-      locationHints.add(new TaskLocationHint(new HashSet<String>(Arrays
-          .asList(oldSplit.getLocations())), null));
-    }
-    return new InputSplitInfoMem(splitsBuilder.build(), locationHints,
-        oldSplits.length, credentials);
-  }
-
   @Private
   public static MRSplitProto createSplitProto(
       org.apache.hadoop.mapred.InputSplit oldSplit) throws IOException {
@@ -1008,4 +981,41 @@ public class MRHelpers {
     deserializer.close();
     return inputSplit;
   }
+
+  private static List<TaskLocationHint> createTaskLocationHintsFromSplits(
+      org.apache.hadoop.mapreduce.InputSplit[] newFormatSplits) {
+    Iterable<TaskLocationHint> iterable = Iterables.transform(Arrays.asList(newFormatSplits),
+        new Function<org.apache.hadoop.mapreduce.InputSplit, TaskLocationHint>() {
+          @Override
+          public TaskLocationHint apply(org.apache.hadoop.mapreduce.InputSplit input) {
+            try {
+              return new TaskLocationHint(new HashSet<String>(Arrays.asList(input.getLocations())),
+                  null);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+    return Lists.newArrayList(iterable);
+  }
+
+  private static List<TaskLocationHint> createTaskLocationHintsFromSplits(
+      org.apache.hadoop.mapred.InputSplit[] oldFormatSplits) {
+    Iterable<TaskLocationHint> iterable = Iterables.transform(Arrays.asList(oldFormatSplits),
+        new Function<org.apache.hadoop.mapred.InputSplit, TaskLocationHint>() {
+          @Override
+          public TaskLocationHint apply(org.apache.hadoop.mapred.InputSplit input) {
+            try {
+              return new TaskLocationHint(new HashSet<String>(Arrays.asList(input.getLocations())),
+                  null);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+
+    return Lists.newArrayList(iterable);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c112c10/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 45de43a..eb9b005 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -624,4 +624,8 @@ public interface MRJobConfig {
   public static final String MR_TEZ_SPLITS_VIA_EVENTS = MR_TEZ_PREFIX + "splits.via.events";
   public static final boolean MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT = true;
 
+  public static final String MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD = MR_TEZ_PREFIX
+      + "input.initializer.serialize.event.payload";
+  public static final boolean MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD_DEFAULT = true;
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1c112c10/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 706f4fe..383b952 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -373,12 +373,12 @@ public class MRInput implements LogicalInput {
     MRSplitProto splitProto = MRSplitProto
         .parseFrom(initEvent.getUserPayload());
     if (useNewApi) {
-      newInputSplit = getNewSplitDetailsFromEvent(splitProto);
+      newInputSplit = getNewSplitDetailsFromEvent(splitProto, jobConf);
       LOG.info("Split Details -> SplitClass: "
           + newInputSplit.getClass().getName() + ", NewSplit: " + newInputSplit);
       setupNewRecordReader();
     } else {
-      oldInputSplit = getOldSplitDetailsFromEvent(splitProto);
+      oldInputSplit = getOldSplitDetailsFromEvent(splitProto, jobConf);
       LOG.info("Split Details -> SplitClass: "
           + oldInputSplit.getClass().getName() + ", OldSplit: " + oldInputSplit);
       setupOldRecordReader();
@@ -386,12 +386,11 @@ public class MRInput implements LogicalInput {
     LOG.info("Initialized RecordReader from event");
   }
 
-  private InputSplit getOldSplitDetailsFromEvent(MRSplitProto splitProto)
+  @Private
+  public static InputSplit getOldSplitDetailsFromEvent(MRSplitProto splitProto, Configuration conf)
       throws IOException {
-    SerializationFactory serializationFactory = new SerializationFactory(
-        jobConf);
-    return MRHelpers.createOldFormatSplitFromUserPayload(splitProto,
-        serializationFactory);
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    return MRHelpers.createOldFormatSplitFromUserPayload(splitProto, serializationFactory);
   }
   
   @SuppressWarnings("unchecked")
@@ -430,10 +429,10 @@ public class MRInput implements LogicalInput {
     return split;
   }
 
-  private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromEvent(
-      MRSplitProto splitProto) throws IOException {
-    SerializationFactory serializationFactory = new SerializationFactory(
-        jobConf);
+  @Private
+  public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromEvent(
+      MRSplitProto splitProto, Configuration conf) throws IOException {
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
     return MRHelpers.createNewFormatSplitFromUserPayload(
         splitProto, serializationFactory);
   }