Posted to commits@tez.apache.org by ss...@apache.org on 2013/10/09 19:32:56 UTC

[3/3] git commit: TEZ-515. Send MapReduce split information over RPC (to AM and to tasks from the AM). (sseth)

TEZ-515. Send MapReduce split information over RPC (to AM and to tasks
from the AM). (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d41471fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d41471fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d41471fc

Branch: refs/heads/master
Commit: d41471fc6d8910a15c86826cf2bf561b2d2b2f3c
Parents: 74d33b6
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Oct 9 10:32:10 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Oct 9 10:32:10 2013 -0700

----------------------------------------------------------------------
 .../mapreduce/examples/FilterLinesByWord.java   |   6 +-
 .../tez/mapreduce/examples/MRRSleepJob.java     |  76 ++++--
 .../mapreduce/examples/OrderedWordCount.java    |   5 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |   5 +-
 tez-mapreduce/pom.xml                           |  31 +++
 .../mapreduce/split/SplitMetaInfoReaderTez.java |   1 +
 .../common/MRInputSplitDistributor.java         |  75 ++++++
 .../tez/mapreduce/hadoop/InputSplitInfo.java    |  70 +++--
 .../mapreduce/hadoop/InputSplitInfoDisk.java    | 102 +++++++
 .../tez/mapreduce/hadoop/InputSplitInfoMem.java |  79 ++++++
 .../apache/tez/mapreduce/hadoop/MRHelpers.java  | 268 +++++++++++++++++--
 .../tez/mapreduce/hadoop/MRJobConfig.java       |   7 +
 .../hadoop/MultiStageMRConfToTezTranslator.java |   3 +
 .../org/apache/tez/mapreduce/input/MRInput.java | 223 ++++++++++-----
 .../src/main/proto/MRRuntimeProtos.proto        |  40 +++
 .../processor/map/TestMapProcessor.java         |   7 +-
 .../processor/reduce/TestReduceProcessor.java   |   7 +-
 .../org/apache/tez/mapreduce/YARNRunner.java    |   3 +-
 18 files changed, 850 insertions(+), 158 deletions(-)
----------------------------------------------------------------------
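
In outline: split information can either be written to the staging
directory and localized to each task (the existing behaviour), or be
serialized into the vertex's input payload, shipped over RPC, and
distributed by the AM as one event per task. A rough sketch of the two
paths, based on the MRRSleepJob changes below (mapStageConf,
remoteStagingDir, mapVertex and writeSplitsToDFS stand in for
caller-provided values; exception handling omitted):

  InputSplitInfo splitInfo;
  byte[] mapInputPayload;
  if (writeSplitsToDFS) {
    // Existing path: job.split / job.splitmetainfo written to DFS and
    // localized to each task via LocalResources.
    splitInfo = MRHelpers.generateInputSplits(mapStageConf, remoteStagingDir);
    mapInputPayload = MRHelpers.createMRInputPayload(mapStageConf, null);
    MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
  } else {
    // New path: splits serialized into the payload; the AM-side
    // MRInputSplitDistributor hands them out as events.
    splitInfo = MRHelpers.generateInputSplitsToMem(mapStageConf);
    mapInputPayload = MRHelpers.createMRInputPayload(mapStageConf,
        splitInfo.getSplitsProto());
    MRHelpers.addMRInput(mapVertex, mapInputPayload,
        MRInputSplitDistributor.class);
  }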


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 996e830..be4534b 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -64,8 +64,8 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
@@ -171,8 +171,8 @@ public class FilterLinesByWord {
     
     // Configure the Input for stage1
     stage1Vertex.addInput("MRInput",
-        new InputDescriptor(MRInputLegacy.class.getName()).setUserPayload(MRHelpers
-            .createUserPayloadFromConf(stage1Conf)));
+        new InputDescriptor(MRInputLegacy.class.getName())
+            .setUserPayload(MRHelpers.createMRInputPayload(stage1Conf, null)), null);
 
     // Setup stage2 Vertex
     Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index c76ec6f..a1fb14a 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -77,8 +77,9 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
@@ -420,7 +421,7 @@ public class MRRSleepJob extends Configured implements Tool {
       int numMapper, int numReducer, int iReduceStagesCount,
       int numIReducer, long mapSleepTime, int mapSleepCount,
       long reduceSleepTime, int reduceSleepCount,
-      long iReduceSleepTime, int iReduceSleepCount)
+      long iReduceSleepTime, int iReduceSleepCount, boolean writeSplitsToDFS)
       throws IOException, YarnException {
 
 
@@ -516,15 +517,26 @@ public class MRRSleepJob extends Configured implements Tool {
       MRHelpers.doJobClientMagic(finalReduceConf);
     }
 
-    InputSplitInfo inputSplitInfo;
-    try {
-      inputSplitInfo = MRHelpers.generateInputSplits(mapStageConf,
-          remoteStagingDir);
-    } catch (InterruptedException e) {
-      // TODO Auto-generated catch block
-      throw new TezUncheckedException("Could not generate input splits", e);
-    } catch (ClassNotFoundException e) {
-      throw new TezUncheckedException("Failed to generate input splits", e);
+    InputSplitInfo inputSplitInfo = null;
+    if (writeSplitsToDFS) {
+      LOG.info("Writing splits to DFS");
+      try {
+        inputSplitInfo = MRHelpers.generateInputSplits(mapStageConf,
+            remoteStagingDir);
+      } catch (InterruptedException e) {
+        throw new TezUncheckedException("Could not generate input splits", e);
+      } catch (ClassNotFoundException e) {
+        throw new TezUncheckedException("Failed to generate input splits", e);
+      }
+    } else {
+      try {
+        LOG.info("Creating in-mem splits");
+        inputSplitInfo = MRHelpers.generateInputSplitsToMem(mapStageConf);
+      } catch (ClassNotFoundException e) {
+        throw new TezUncheckedException("Could not generate input splits", e);
+      } catch (InterruptedException e) {
+        throw new TezUncheckedException("Could not generate input splits", e);
+      }
     }
 
     DAG dag = new DAG("MRRSleepJob");
@@ -550,25 +562,40 @@ public class MRRSleepJob extends Configured implements Tool {
 
     List<Vertex> vertices = new ArrayList<Vertex>();
 
+    
+    byte[] mapInputPayload = null;
+    if (writeSplitsToDFS) {
+      mapInputPayload = MRHelpers.createMRInputPayload(mapStageConf, null);
+    } else {
+      mapInputPayload = MRHelpers.createMRInputPayload(mapStageConf, inputSplitInfo.getSplitsProto());
+    }
+    
     byte[] mapUserPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
     Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
         MapProcessor.class.getName()).setUserPayload(mapUserPayload),
         numMapper, MRHelpers.getMapResource(mapStageConf));
     mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
     mapVertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
-    Map<String, LocalResource> mapLocalResources =
-        new HashMap<String, LocalResource>();
-    mapLocalResources.putAll(commonLocalResources);
-    MRHelpers.updateLocalResourcesForInputSplits(remoteFs, inputSplitInfo,
-        mapLocalResources);
-    mapVertex.setTaskLocalResources(mapLocalResources);
+    
+    if (writeSplitsToDFS) {
+      Map<String, LocalResource> mapLocalResources = new HashMap<String, LocalResource>();
+      mapLocalResources.putAll(commonLocalResources);
+      MRHelpers.updateLocalResourcesForInputSplits(remoteFs, inputSplitInfo,
+          mapLocalResources);
+      mapVertex.setTaskLocalResources(mapLocalResources);
+    } else {
+      mapVertex.setTaskLocalResources(commonLocalResources);
+    }
+
     Map<String, String> mapEnv = new HashMap<String, String>();
     MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
     mapVertex.setTaskEnvironment(mapEnv);
-    MRHelpers.addMRInput(mapVertex, mapUserPayload);
+    if (writeSplitsToDFS) {
+      MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
+    } else {
+      MRHelpers.addMRInput(mapVertex, mapInputPayload, MRInputSplitDistributor.class);
+    }
     vertices.add(mapVertex);
-    
-    
 
     if (iReduceStagesCount > 0
         && numIReducer > 0) {
@@ -692,7 +719,8 @@ public class MRRSleepJob extends Configured implements Tool {
           " [-irs numIntermediateReducerStages]" +
           " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
           " [-irt intermediateReduceSleepTime]" +
-          " [-recordt recordSleepTime (msec)]");
+          " [-recordt recordSleepTime (msec)]" +
+          " [-writeSplitsToDfs (false)/true]");
       ToolRunner.printGenericCommandUsage(System.err);
       return 2;
     }
@@ -702,6 +730,7 @@ public class MRRSleepJob extends Configured implements Tool {
         iReduceSleepTime=1;
     int mapSleepCount = 1, reduceSleepCount = 1, iReduceSleepCount = 1;
     int iReduceStagesCount = 1;
+    boolean writeSplitsToDfs = false;
 
     for(int i=0; i < args.length; i++ ) {
       if(args[i].equals("-m")) {
@@ -728,6 +757,9 @@ public class MRRSleepJob extends Configured implements Tool {
       else if (args[i].equals("-recordt")) {
         recSleepTime = Long.parseLong(args[++i]);
       }
+      else if (args[i].equals("-writeSplitsToDfs")) {
+        writeSplitsToDfs = Boolean.parseBoolean(args[++i]);
+      }
     }
 
     if (numIReducer > 0 && numReducer <= 0) {
@@ -762,7 +794,7 @@ public class MRRSleepJob extends Configured implements Tool {
     DAG dag = createDAG(remoteFs, conf, appId, remoteStagingDir,
         numMapper, numReducer, iReduceStagesCount, numIReducer,
         mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount,
-        iReduceSleepTime, iReduceSleepCount);
+        iReduceSleepTime, iReduceSleepCount, writeSplitsToDfs);
 
     conf.set(TezConfiguration.TEZ_AM_JAVA_OPTS,
         MRHelpers.getMRAMJavaOpts(conf));
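
The new -writeSplitsToDfs switch defaults to false, so the sleep job now
exercises the RPC path unless explicitly asked to write splits out; a
hypothetical invocation of the old behaviour via the examples driver
would look like:

  mrrsleep -m 4 -r 1 -writeSplitsToDfs true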

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 44cf3da..4093b85 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -70,9 +70,9 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
@@ -201,6 +201,7 @@ public class OrderedWordCount {
     List<Vertex> vertices = new ArrayList<Vertex>();
 
     byte[] mapPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
+    byte[] mapInputPayload = MRHelpers.createMRInputPayload(mapStageConf, null);
     Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
         MapProcessor.class.getName()).setUserPayload(mapPayload),
         inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(mapStageConf));
@@ -215,7 +216,7 @@ public class OrderedWordCount {
     Map<String, String> mapEnv = new HashMap<String, String>();
     MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
     mapVertex.setTaskEnvironment(mapEnv);
-    MRHelpers.addMRInput(mapVertex, mapPayload);
+    MRHelpers.addMRInput(mapVertex, mapInputPayload, null);
     vertices.add(mapVertex);
 
     Vertex ivertex = new Vertex("ivertex1", new ProcessorDescriptor(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index d9d56bf..2b001b4 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -76,9 +76,9 @@ import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepInputFormat;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepMapper;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepReducer;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
@@ -399,13 +399,14 @@ public class TestMRRJobsDAGApi {
         remoteStagingDir);
 
     byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+    byte[] stage1InputPayload = MRHelpers.createMRInputPayload(stage1Conf, null);
     byte[] stage3Payload = MRHelpers.createUserPayloadFromConf(stage3Conf);
     
     DAG dag = new DAG("testMRRSleepJobDagSubmit");
     Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
         MapProcessor.class.getName()).setUserPayload(stage1Payload),
         inputSplitInfo.getNumTasks(), Resource.newInstance(256, 1));
-    MRHelpers.addMRInput(stage1Vertex, stage1Payload);
+    MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, null);
     Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
         ReduceProcessor.class.getName()).setUserPayload(
         MRHelpers.createUserPayloadFromConf(stage2Conf)),

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/tez-mapreduce/pom.xml b/tez-mapreduce/pom.xml
index f9755e0..5298195 100644
--- a/tez-mapreduce/pom.xml
+++ b/tez-mapreduce/pom.xml
@@ -91,6 +91,10 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
@@ -99,6 +103,33 @@
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
       </plugin>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>MRRuntimeProtos.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-sources/java</output>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
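
The protobuf dependency and the hadoop-maven-plugins execution above
generate the MRRuntimeProtos classes (MRUserPayloadProto, MRSplitsProto,
MRSplitProto) under org.apache.tez.mapreduce.protos at build time; the
rest of this patch is written against those generated types.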
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
index 0705796..6946001 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
@@ -48,6 +48,7 @@ public class SplitMetaInfoReaderTez {
   public static final int META_SPLIT_VERSION = JobSplit.META_SPLIT_VERSION;
   public static final byte[] META_SPLIT_FILE_HEADER = JobSplit.META_SPLIT_FILE_HEADER;
 
+
   // Forked from the MR variant so that the metaInfo file as well as the split
   // file can be read from local fs - relying on these files being localized.
   public static TaskSplitMetaInfo[] readSplitMetaInfo(Configuration conf,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
new file mode 100644
index 0000000..c35a450
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.common;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRUserPayloadProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.api.TezRootInputInitializerContext;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
+
+import com.google.common.collect.Lists;
+
+public class MRInputSplitDistributor implements TezRootInputInitializer {
+
+  private static final Log LOG = LogFactory
+      .getLog(MRInputSplitDistributor.class);
+
+  public MRInputSplitDistributor() {
+  }
+
+  private MRSplitsProto splitProto;
+
+  @Override
+  public List<Event> initialize(TezRootInputInitializerContext rootInputContext)
+      throws IOException {
+    
+    MRUserPayloadProto userPayloadProto = MRHelpers.parseMRPayload(rootInputContext.getUserPayload());
+
+    this.splitProto = userPayloadProto.getSplits();
+    
+    MRUserPayloadProto.Builder updatedPayloadBuilder = MRUserPayloadProto.newBuilder(userPayloadProto);
+    updatedPayloadBuilder.clearSplits();
+
+    List<Event> events = Lists.newArrayListWithCapacity(this.splitProto.getSplitsCount() + 1);
+    RootInputUpdatePayloadEvent updatePayloadEvent = new RootInputUpdatePayloadEvent(
+        updatedPayloadBuilder.build().toByteArray());
+
+    events.add(updatePayloadEvent);
+    int count = 0;
+    for (MRSplitProto mrSplit : this.splitProto.getSplitsList()) {
+      // Unnecessary array copy, can be avoided by using ByteBuffer instead of a
+      // raw array.
+      RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(
+          count++, mrSplit.toByteArray());
+      events.add(diEvent);
+    }
+
+    return events;
+  }
+}
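
Each RootInputDataInformationEvent above carries a single serialized
MRSplitProto, and the RootInputUpdatePayloadEvent strips the splits out
of the payload so tasks receive only the configuration. On the task
side, decoding one event boils down to something like this hedged
sketch (eventBytes and jobConf are hypothetical stand-ins; the real
event handling lives in MRInput):

  MRSplitProto splitProto = MRSplitProto.parseFrom(eventBytes);
  SerializationFactory factory = new SerializationFactory(jobConf);
  org.apache.hadoop.mapreduce.InputSplit newSplit =
      MRHelpers.createNewFormatSplitFromUserPayload(splitProto, factory);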

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
index 3eef766..34ec56f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
@@ -20,66 +20,56 @@ package org.apache.tez.mapreduce.hadoop;
 
 import java.util.List;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+
+// TODO Fix this to be more usable. Interface is broken since half the methods apply to only a specific type.
 
 /**
- * Information obtained by using the InputFormat class to generate
- * the required InputSplit information files that in turn can be used to
- * setup a DAG.
- *
- * The splitsFile and splitsMetaInfoFile need to be provided as LocalResources
- * to the vertex in question. The numTasks represents the parallelism for
- * the vertex and the taskLocationHints define the possible nodes on which the
- * tasks should be run based on the location of the splits that will be
- * processed by each task.
+ * Provides information about input splits.</p>
+ * 
+ * The get*Path methods are only applicable when generating splits to disk. The
+ * getSplitsProto method is only applicable when generating splits to memory.
+ * 
  */
-public class InputSplitInfo {
+@Private
+@Unstable
+public interface InputSplitInfo {
 
-  /// Splits file
-  private final Path splitsFile;
-  /// Meta info file for all the splits information
-  private final Path splitsMetaInfoFile;
-  /// Location hints to determine where to run the tasks
-  private final List<TaskLocationHint> taskLocationHints;
-  /// The num of tasks - same as number of splits generated.
-  private final int numTasks;
-
-  public InputSplitInfo(Path splitsFile, Path splitsMetaInfoFile, int numTasks,
-      List<TaskLocationHint> taskLocationHints) {
-    this.splitsFile = splitsFile;
-    this.splitsMetaInfoFile = splitsMetaInfoFile;
-    this.taskLocationHints = taskLocationHints;
-    this.numTasks = numTasks;
+  public enum Type {
+    DISK, MEM
   }
-
   /**
    * Get the TaskLocationHints for each task
    */
-  public List<TaskLocationHint> getTaskLocationHints() {
-    return taskLocationHints;
-  }
+  public abstract List<TaskLocationHint> getTaskLocationHints();
 
   /**
    * Get the path to the splits meta info file
    */
-  public Path getSplitsMetaInfoFile() {
-    return splitsMetaInfoFile;
-  }
+  public abstract Path getSplitsMetaInfoFile();
 
   /**
    * Get the path to the splits file
    */
-  public Path getSplitsFile() {
-    return splitsFile;
-  }
+  public abstract Path getSplitsFile();
 
   /**
+   * Get the splits proto
+   */
+  public abstract MRSplitsProto getSplitsProto();
+  
+  /**
    * Get the number of splits that were generated. Same as number of tasks that
    * should be run for the vertex processing these set of splits.
    */
-  public int getNumTasks() {
-    return numTasks;
-  }
-
-}
+  public abstract int getNumTasks();
+  
+  /**
+   * Get the {@link Type} of the InputSplitInfo
+   */
+  public abstract Type getType();
+}
\ No newline at end of file
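
Since half the accessors only apply to one implementation (as the TODO
above notes), callers are expected to branch on getType() first; a
minimal guard, assuming splitInfo came from one of the MRHelpers
generators:

  if (splitInfo.getType() == InputSplitInfo.Type.MEM) {
    MRSplitsProto splits = splitInfo.getSplitsProto();     // MEM only
  } else {
    Path splitsFile = splitInfo.getSplitsFile();           // DISK only
    Path metaFile = splitInfo.getSplitsMetaInfoFile();     // DISK only
  }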

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
new file mode 100644
index 0000000..6bb3448
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoDisk.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+
+/**
+ * Information obtained by using the InputFormat class to generate
+ * the required InputSplit information files that in turn can be used to
+ * setup a DAG.
+ *
+ * The splitsFile and splitsMetaInfoFile need to be provided as LocalResources
+ * to the vertex in question. The numTasks represents the parallelism for
+ * the vertex and the taskLocationHints define the possible nodes on which the
+ * tasks should be run based on the location of the splits that will be
+ * processed by each task.
+ * 
+ * The getSplitsProto method is not supported by this implementation.
+ */
+public class InputSplitInfoDisk implements InputSplitInfo {
+
+  /// Splits file
+  private Path splitsFile;
+  /// Meta info file for all the splits information
+  private Path splitsMetaInfoFile;
+  /// Location hints to determine where to run the tasks
+  private List<TaskLocationHint> taskLocationHints;
+  /// The num of tasks - same as number of splits generated.
+  private int numTasks;
+
+  public InputSplitInfoDisk(Path splitsFile, Path splitsMetaInfoFile, int numTasks,
+      List<TaskLocationHint> taskLocationHints) {
+    this.splitsFile = splitsFile;
+    this.splitsMetaInfoFile = splitsMetaInfoFile;
+    this.taskLocationHints = taskLocationHints;
+    this.numTasks = numTasks;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.tez.mapreduce.hadoop.MRSplitInfo#getTaskLocationHints()
+   */
+  @Override
+  public List<TaskLocationHint> getTaskLocationHints() {
+    return taskLocationHints;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.tez.mapreduce.hadoop.MRSplitInfo#getSplitsMetaInfoFile()
+   */
+  @Override
+  public Path getSplitsMetaInfoFile() {
+    return splitsMetaInfoFile;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.tez.mapreduce.hadoop.MRSplitInfo#getSplitsFile()
+   */
+  @Override
+  public Path getSplitsFile() {
+    return splitsFile;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.tez.mapreduce.hadoop.MRSplitInfo#getNumTasks()
+   */
+  @Override
+  public int getNumTasks() {
+    return numTasks;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.DISK;
+  }
+
+  @Override
+  public MRSplitsProto getSplitsProto() {
+    throw new UnsupportedOperationException("Not supported for Type: "
+        + getType());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
new file mode 100644
index 0000000..e6be735
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+
+/**
+ * Represents InputSplitInfo for splits generated to memory. </p>
+ * 
+ * Since splits are generated in memory, the getSplitsMetaInfoFile and
+ * getSplitsFile are not supported.
+ * 
+ */
+public class InputSplitInfoMem implements InputSplitInfo {
+
+  private final MRSplitsProto splitsProto;
+  private final List<TaskLocationHint> taskLocationHints;
+  private final int numTasks;
+
+  public InputSplitInfoMem(MRSplitsProto splitsProto,
+      List<TaskLocationHint> taskLocationHints, int numTasks) {
+    this.splitsProto = splitsProto;
+    this.taskLocationHints = taskLocationHints;
+    this.numTasks = numTasks;
+  }
+
+  @Override
+  public List<TaskLocationHint> getTaskLocationHints() {
+    return this.taskLocationHints;
+  }
+
+  @Override
+  public Path getSplitsMetaInfoFile() {
+    throw new UnsupportedOperationException("Not supported for Type: "
+        + getType());
+  }
+
+  @Override
+  public Path getSplitsFile() {
+    throw new UnsupportedOperationException("Not supported for Type: "
+        + getType());
+  }
+
+  @Override
+  public int getNumTasks() {
+    return this.numTasks;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.MEM;
+  }
+
+  @Override
+  public MRSplitsProto getSplitsProto() {
+    return this.splitsProto;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 7c79a76..5271069 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.mapreduce.hadoop;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -37,12 +38,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.ContainerLogAppender;
@@ -66,12 +72,21 @@ import org.apache.tez.mapreduce.combine.MRCombiner;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRUserPayloadProto;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 
 
 public class MRHelpers {
 
   private static final Log LOG = LogFactory.getLog(MRHelpers.class);
 
+  static final int SPLIT_SERIALIZED_LENGTH_ESTIMATE = 40;
   static final String JOB_SPLIT_RESOURCE_NAME = "job.split";
   static final String JOB_SPLIT_METAINFO_RESOURCE_NAME =
       "job.splitmetainfo";
@@ -126,6 +141,24 @@ public class MRHelpers {
     }
   }
 
+  private static org.apache.hadoop.mapreduce.InputSplit[] generateNewSplits(
+      JobContext jobContext) throws ClassNotFoundException, IOException,
+      InterruptedException {
+    Configuration conf = jobContext.getConfiguration();
+    InputFormat<?, ?> input = ReflectionUtils.newInstance(
+        jobContext.getInputFormatClass(), conf);
+
+    List<org.apache.hadoop.mapreduce.InputSplit> array = input
+        .getSplits(jobContext);
+    org.apache.hadoop.mapreduce.InputSplit[] splits = (org.apache.hadoop.mapreduce.InputSplit[]) array
+        .toArray(new org.apache.hadoop.mapreduce.InputSplit[array.size()]);
+
+    // sort the splits into order based on size, so that the biggest
+    // go first
+    Arrays.sort(splits, new InputSplitComparator());
+    return splits;
+  }
+
   /**
    * Generate new-api mapreduce InputFormat splits
    * @param jobContext JobContext required by InputFormat
@@ -139,23 +172,13 @@ public class MRHelpers {
    * @throws InterruptedException
    * @throws ClassNotFoundException
    */
-  private static InputSplitInfo writeNewSplits(JobContext jobContext,
+  private static InputSplitInfoDisk writeNewSplits(JobContext jobContext,
       Path inputSplitDir) throws IOException, InterruptedException,
       ClassNotFoundException {
+    
+    org.apache.hadoop.mapreduce.InputSplit[] splits = generateNewSplits(jobContext);
+    
     Configuration conf = jobContext.getConfiguration();
-    InputFormat<?, ?> input =
-        ReflectionUtils.newInstance(jobContext.getInputFormatClass(), conf);
-
-    List<org.apache.hadoop.mapreduce.InputSplit> array =
-        input.getSplits(jobContext);
-    org.apache.hadoop.mapreduce.InputSplit[] splits =
-        (org.apache.hadoop.mapreduce.InputSplit[])
-        array.toArray(
-            new org.apache.hadoop.mapreduce.InputSplit[array.size()]);
-
-    // sort the splits into order based on size, so that the biggest
-    // go first
-    Arrays.sort(splits, new InputSplitComparator());
 
     JobSplitWriter.createSplitFiles(inputSplitDir, conf,
         inputSplitDir.getFileSystem(conf), splits);
@@ -168,12 +191,22 @@ public class MRHelpers {
               Arrays.asList(splits[i].getLocations())), null));
     }
 
-    return new InputSplitInfo(
+    return new InputSplitInfoDisk(
         JobSubmissionFiles.getJobSplitFile(inputSplitDir),
         JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
         splits.length, locationHints);
   }
 
+  private static org.apache.hadoop.mapred.InputSplit[] generateOldSplits(
+      JobConf jobConf) throws IOException {
+    org.apache.hadoop.mapred.InputSplit[] splits = jobConf.getInputFormat()
+        .getSplits(jobConf, jobConf.getNumMapTasks());
+    // sort the splits into order based on size, so that the biggest
+    // go first
+    Arrays.sort(splits, new OldInputSplitComparator());
+    return splits;
+  }
+  
   /**
    * Generate old-api mapred InputFormat splits
    * @param jobConf JobConf required by InputFormat class
@@ -185,13 +218,11 @@ public class MRHelpers {
    *
    * @throws IOException
    */
-  private static InputSplitInfo writeOldSplits(JobConf jobConf,
+  private static InputSplitInfoDisk writeOldSplits(JobConf jobConf,
       Path inputSplitDir) throws IOException {
-    org.apache.hadoop.mapred.InputSplit[] splits =
-        jobConf.getInputFormat().getSplits(jobConf, jobConf.getNumMapTasks());
-    // sort the splits into order based on size, so that the biggest
-    // go first
-    Arrays.sort(splits, new OldInputSplitComparator());
+    
+    org.apache.hadoop.mapred.InputSplit[] splits = generateOldSplits(jobConf);
+    
     JobSplitWriter.createSplitFiles(inputSplitDir, jobConf,
         inputSplitDir.getFileSystem(jobConf), splits);
 
@@ -203,7 +234,7 @@ public class MRHelpers {
               Arrays.asList(splits[i].getLocations())), null));
     }
 
-    return new InputSplitInfo(
+    return new InputSplitInfoDisk(
         JobSubmissionFiles.getJobSplitFile(inputSplitDir),
         JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
         splits.length, locationHints);
@@ -235,11 +266,12 @@ public class MRHelpers {
    * @throws InterruptedException
    * @throws ClassNotFoundException
    */
-  public static InputSplitInfo generateInputSplits(Configuration conf,
+  public static InputSplitInfoDisk generateInputSplits(Configuration conf,
       Path inputSplitsDir) throws IOException, InterruptedException,
       ClassNotFoundException {
     Job job = Job.getInstance(conf);
     JobConf jobConf = new JobConf(conf);
+    conf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
     if (jobConf.getUseNewMapper()) {
       LOG.info("Generating new input splits"
           + ", splitsDir=" + inputSplitsDir.toString());
@@ -251,6 +283,112 @@ public class MRHelpers {
     }
   }
 
+  /**
+   * Generates input splits and stores them in a {@link MRSplitsProto} instance.
+   * 
+   * Returns an instance of {@link InputSplitInfoMem}
+   * 
+   * @param conf
+   *          an instance of Configuration which is used to determine whether
+   *          the mapred or mapreduce API is being used. This Configuration
+   *          instance should also contain adequate information to be able to
+   *          generate splits - like the InputFormat being used and related
+   *          configuration.
+   * @return an instance of {@link InputSplitInfoMem} which supports a subset of
+   *         the APIs defined on {@link InputSplitInfo}
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   */
+  public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf)
+      throws IOException, ClassNotFoundException, InterruptedException {
+
+    InputSplitInfoMem splitInfoMem = null;
+    JobConf jobConf = new JobConf(conf);
+    if (jobConf.getUseNewMapper()) {
+      LOG.info("Generating mapreduce api input splits");
+      Job job = Job.getInstance(conf);
+      org.apache.hadoop.mapreduce.InputSplit[] splits = generateNewSplits(job);
+      splitInfoMem = createSplitsProto(splits, new SerializationFactory(job.getConfiguration()));
+    } else {
+      LOG.info("Generating mapred api input splits");
+      org.apache.hadoop.mapred.InputSplit[] splits = generateOldSplits(jobConf);
+      splitInfoMem = createSplitsProto(splits);
+    }
+    LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
+        + splitInfoMem.getSplitsProto().getSerializedSize());
+    return splitInfoMem;
+  }
+
+  private static InputSplitInfoMem createSplitsProto(
+      org.apache.hadoop.mapreduce.InputSplit[] newSplits,
+      SerializationFactory serializationFactory) throws IOException,
+      InterruptedException {
+    MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
+
+    List<TaskLocationHint> locationHints = Lists
+        .newArrayListWithCapacity(newSplits.length);
+    for (org.apache.hadoop.mapreduce.InputSplit newSplit : newSplits) {
+      splitsBuilder.addSplits(createSplitProto(newSplit, serializationFactory));
+      locationHints.add(new TaskLocationHint(new HashSet<String>(Arrays
+          .asList(newSplit.getLocations())), null));
+    }
+    return new InputSplitInfoMem(splitsBuilder.build(), locationHints,
+        newSplits.length);
+  }
+
+  private static <T extends org.apache.hadoop.mapreduce.InputSplit> MRSplitProto createSplitProto(
+      T newSplit, SerializationFactory serializationFactory)
+      throws IOException, InterruptedException {
+    MRSplitProto.Builder builder = MRSplitProto
+        .newBuilder();
+    
+    builder.setSplitClassName(newSplit.getClass().getName());
+
+    @SuppressWarnings("unchecked")
+    Serializer<T> serializer = serializationFactory
+        .getSerializer((Class<T>) newSplit.getClass());
+    ByteString.Output out = ByteString
+        .newOutput(SPLIT_SERIALIZED_LENGTH_ESTIMATE);
+    serializer.open(out);
+    serializer.serialize(newSplit);
+    // TODO MR Compat: Check against max block locations per split.
+    ByteString splitBs = out.toByteString();
+    builder.setSplitBytes(splitBs);
+
+    return builder.build();
+  }
+
+  private static InputSplitInfoMem createSplitsProto(
+      org.apache.hadoop.mapred.InputSplit[] oldSplits) throws IOException {
+    MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
+
+    List<TaskLocationHint> locationHints = Lists
+        .newArrayListWithCapacity(oldSplits.length);
+    for (org.apache.hadoop.mapred.InputSplit oldSplit : oldSplits) {
+      splitsBuilder.addSplits(createSplitProto(oldSplit));
+      locationHints.add(new TaskLocationHint(new HashSet<String>(Arrays
+          .asList(oldSplit.getLocations())), null));
+    }
+    return new InputSplitInfoMem(splitsBuilder.build(), locationHints,
+        oldSplits.length);
+  }
+
+  private static MRSplitProto createSplitProto(
+      org.apache.hadoop.mapred.InputSplit oldSplit) throws IOException {
+    MRSplitProto.Builder builder = MRSplitProto.newBuilder();
+
+    builder.setSplitClassName(oldSplit.getClass().getName());
+    
+    ByteString.Output os = ByteString
+        .newOutput(SPLIT_SERIALIZED_LENGTH_ESTIMATE);
+    oldSplit.write(new DataOutputStream(os));
+    ByteString splitBs = os.toByteString();
+    builder.setSplitBytes(splitBs);
+
+    return builder.build();
+  }
+
   private static String getChildLogLevel(Configuration conf, boolean isMap) {
     if (isMap) {
       return conf.get(
@@ -460,6 +598,12 @@ public class MRHelpers {
       throws IOException {
     return TezUtils.createUserPayloadFromConf(conf);
   }
+  
+  @LimitedPrivate("Hive, Pig")
+  public static ByteString createByteStringFromConf(Configuration conf)
+      throws IOException {
+    return TezUtils.createByteStringFromConf(conf);
+  }
 
   @LimitedPrivate("Hive, Pig")
   @Unstable
@@ -468,6 +612,29 @@ public class MRHelpers {
     return TezUtils.createConfFromUserPayload(bb);
   }
 
+  @LimitedPrivate("Hive, Pig")
+  public static Configuration createConfFromByteString(ByteString bs)
+      throws IOException {
+    return TezUtils.createConfFromByteString(bs);
+  }
+
+  public static byte[] createMRInputPayload(Configuration conf,
+      MRSplitsProto mrSplitProto) throws IOException {
+    MRUserPayloadProto.Builder userPayloadBuilder = MRUserPayloadProto
+        .newBuilder();
+    userPayloadBuilder.setConfigurationBytes(createByteStringFromConf(conf));
+    if (mrSplitProto != null) {
+      userPayloadBuilder.setSplits(mrSplitProto);
+    }
+    // TODO Should this be a ByteBuffer or a byte array ? A ByteBuffer would be
+    // more efficient.
+    return userPayloadBuilder.build().toByteArray();
+  }
+
+  public static MRUserPayloadProto parseMRPayload(byte[] bytes) throws IOException {
+    return MRUserPayloadProto.parseFrom(bytes);
+  }
+
   /**
    * Update provided localResources collection with the required local
    * resources needed by MapReduce tasks with respect to Input splits.
@@ -666,7 +833,7 @@ public class MRHelpers {
     return mrAppMasterAdminOptions.trim()
         + " " + mrAppMasterUserOptions.trim();
   }
-  
+
   /**
    * Convenience method to add an MR Input to the specified vertex. The name of
    * the Input is "MRInput" </p>
@@ -675,11 +842,13 @@ public class MRHelpers {
    * 
    * @param vertex
    * @param userPayload
+   * @param initClazz class to init the input in the AM
    */
-  public static void addMRInput(Vertex vertex, byte[] userPayload) {
+  public static void addMRInput(Vertex vertex, byte[] userPayload,
+      Class<? extends TezRootInputInitializer> initClazz) {
     InputDescriptor id = new InputDescriptor(MRInputLegacy.class.getName())
         .setUserPayload(userPayload);
-    vertex.addInput("MRInput", id);
+    vertex.addInput("MRInput", id, initClazz);
   }
 
   /**
@@ -696,5 +865,52 @@ public class MRHelpers {
         .setUserPayload(userPayload);
     vertex.addOutput("MROutput", od);
   }
+  
+  @SuppressWarnings("unchecked")
+  public static InputSplit createOldFormatSplitFromUserPayload(
+      MRSplitProto splitProto, SerializationFactory serializationFactory)
+      throws IOException {
+    // This may not need to use serialization factory, since OldFormat
+    // always uses Writable to write splits.
+    Preconditions.checkNotNull(splitProto, "splitProto cannot be null");
+    String className = splitProto.getSplitClassName();
+    Class<InputSplit> clazz;
+
+    try {
+      clazz = (Class<InputSplit>) Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Failed to load InputSplit class: [" + className + "]", e);
+    }
+
+    Deserializer<InputSplit> deserializer = serializationFactory
+        .getDeserializer(clazz);
+    deserializer.open(splitProto.getSplitBytes().newInput());
+    InputSplit inputSplit = deserializer.deserialize(null);
+    deserializer.close();
+    return inputSplit;
+  }
 
+  @SuppressWarnings("unchecked")
+  public static org.apache.hadoop.mapreduce.InputSplit createNewFormatSplitFromUserPayload(
+      MRSplitProto splitProto, SerializationFactory serializationFactory)
+      throws IOException {
+    Preconditions.checkNotNull(splitProto, "splitProto must be specified");
+    String className = splitProto.getSplitClassName();
+    Class<org.apache.hadoop.mapreduce.InputSplit> clazz;
+
+    try {
+      clazz = (Class<org.apache.hadoop.mapreduce.InputSplit>) Class
+          .forName(className);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Failed to load InputSplit class: [" + className + "]", e);
+    }
+
+    Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = serializationFactory
+        .getDeserializer(clazz);
+    deserializer.open(splitProto.getSplitBytes().newInput());
+    org.apache.hadoop.mapreduce.InputSplit inputSplit = deserializer
+        .deserialize(null);
+    deserializer.close();
+    return inputSplit;
+  }
 }
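
Putting the new helpers together, an AM-side parse plus old-api split
reconstruction looks roughly like this (payloadBytes is a hypothetical
byte[] produced by createMRInputPayload; exception handling omitted):

  MRUserPayloadProto payload = MRHelpers.parseMRPayload(payloadBytes);
  Configuration conf =
      MRHelpers.createConfFromByteString(payload.getConfigurationBytes());
  SerializationFactory factory = new SerializationFactory(conf);
  for (MRSplitProto split : payload.getSplits().getSplitsList()) {
    InputSplit oldSplit =
        MRHelpers.createOldFormatSplitFromUserPayload(split, factory);
    // hand oldSplit to InputFormat.getRecordReader(...)
  }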

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index a42597e..6951261 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -19,11 +19,15 @@ package org.apache.tez.mapreduce.hadoop;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public interface MRJobConfig {
 
+  @Private
+  static final String MR_TEZ_PREFIX = "mapreduce.tez.";
+  
   // Put all of the attribute names in here so that Job and JobContext are
   // consistent.
   public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.job.inputformat.class";
@@ -650,6 +654,9 @@ public interface MRJobConfig {
       "mrr.vertex.";
 
   public static final String VERTEX_NAME = "mapreduce.task.vertex.name";
+
+  public static final String MR_TEZ_SPLITS_VIA_EVENTS = MR_TEZ_PREFIX + "splits.via.events";
+  public static final boolean MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT = true;
   
   // Stage specific properties
   // Format of each property is mapred.ireducer.stage.<stage-num>.<suffix>
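
The new key expands to "mapreduce.tez.splits.via.events" and defaults to
true, so event-delivered splits are the default behaviour. Code paths
that still write splits to disk opt out explicitly, as in
generateInputSplits above and the MultiStageMRConfToTezTranslator change
below:

  conf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);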

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
index b07b04b..9f1a2c5 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
@@ -220,6 +220,9 @@ public class MultiStageMRConfToTezTranslator {
     // Assuming no 0 map jobs, and the first stage is always a map.
     int numStages = numIntermediateStages + (hasFinalReduceStage ? 2 : 1);
 
+    // Read split info from HDFS
+    conf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
+    
     // Setup Tez partitioner class
     conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
         MRPartitioner.class.getName());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index ef0de5c..d8f3709 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -19,6 +19,9 @@ package org.apache.tez.mapreduce.input;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,16 +45,20 @@ import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
 import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRUserPayloadProto;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 import com.google.common.base.Preconditions;
@@ -68,9 +75,12 @@ public class MRInput implements LogicalInput {
 
   private static final Log LOG = LogFactory.getLog(MRInput.class);
   
-  
+  private final Lock rrLock = new ReentrantLock();
+  Condition rrInited = rrLock.newCondition();
   private TezInputContext inputContext;
   
+  private boolean eventReceived = false;
+  
   private JobConf jobConf;
   private Configuration incrementalConf;
   private boolean recordReaderCreated = false;
@@ -82,31 +92,31 @@ public class MRInput implements LogicalInput {
   @SuppressWarnings("rawtypes")
   private org.apache.hadoop.mapreduce.InputFormat newInputFormat;
   @SuppressWarnings("rawtypes")
-  private org.apache.hadoop.mapreduce.RecordReader newRecordReader;
-  protected org.apache.hadoop.mapreduce.InputSplit newInputSplit;
+  private volatile org.apache.hadoop.mapreduce.RecordReader newRecordReader;
+  protected volatile org.apache.hadoop.mapreduce.InputSplit newInputSplit;
   
   @SuppressWarnings("rawtypes")
   private InputFormat oldInputFormat;
   @SuppressWarnings("rawtypes")
-  protected RecordReader oldRecordReader;
+  protected volatile RecordReader oldRecordReader;
+  private volatile InputSplit oldInputSplit;
 
   protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
   
   private TezCounter inputRecordCounter;
   private TezCounter fileInputByteCounter; 
   private List<Statistics> fsStats;
-
+  
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws IOException {
     this.inputContext = inputContext;
-    Configuration conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+    MRUserPayloadProto mrUserPayload = MRHelpers.parseMRPayload(inputContext.getUserPayload());
+    Preconditions.checkArgument(!mrUserPayload.hasSplits(),
+        "Split information is not expected in the MRInput payload");
+    Configuration conf = MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
     this.jobConf = new JobConf(conf);
 
-    // Read split information.
-    TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
-    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext.getTaskIndex()];
-    this.splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
-        thisTaskMetaInfo.getStartOffset());
+    
     
     // TODO NEWTEZ Rename this to be specific to MRInput. This Input, in
     // theory, can be used by the MapProcessor, ReduceProcessor or a custom
@@ -115,69 +125,146 @@ public class MRInput implements LogicalInput {
     this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ);
     
     useNewApi = this.jobConf.getUseNewMapper();
-
-    if (useNewApi) {
-      TaskAttemptContext taskAttemptContext = createTaskAttemptContext();
-      Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> inputFormatClazz;
-      try {
-        inputFormatClazz = taskAttemptContext.getInputFormatClass();
-      } catch (ClassNotFoundException e) {
-        throw new IOException("Unable to instantiate InputFormat class", e);
+    boolean viaEvents = conf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
+        MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT);
+    LOG.info("Using new mapreduce API: " + useNewApi
+        + ", split information via events: " + viaEvents);
+
+    if (viaEvents) {
+      if (useNewApi) {
+        setupNewInputFormat();
+      } else {
+        setupOldInputFormat();
       }
 
-      newInputFormat = ReflectionUtils.newInstance(inputFormatClazz, this.jobConf);
+    } else {
+      // Read split information.
+      TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
+      TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext
+          .getTaskIndex()];
+      this.splitMetaInfo = new TaskSplitIndex(
+          thisTaskMetaInfo.getSplitLocation(),
+          thisTaskMetaInfo.getStartOffset());
+      if (useNewApi) {
+        setupNewInputFormat();
+        newInputSplit = getNewSplitDetailsFromDisk(splitMetaInfo);
+        setupNewRecordReader();
+      } else {
+        setupOldInputFormat();
+        oldInputSplit = getOldSplitDetailsFromDisk(splitMetaInfo);
+        setupOldRecordReader();
+      }
 
-      newInputSplit = getNewSplitDetails(splitMetaInfo);
+    }
 
-      List<Statistics> matchedStats = null;
-      if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
-        matchedStats = Utils.getFsStatistics(
-            ((org.apache.hadoop.mapreduce.lib.input.FileSplit)
-                newInputSplit).getPath(), this.jobConf);
-      }
-      fsStats = matchedStats;
-      
-      try {
-        newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
-        newRecordReader.initialize(newInputSplit, taskAttemptContext);
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted while creating record reader", e);
-      }
-    } else { // OLD API
-      oldInputFormat = this.jobConf.getInputFormat();
-      InputSplit oldInputSplit =
-          getOldSplitDetails(splitMetaInfo);
-      
-      
-      List<Statistics> matchedStats = null;
-      if (oldInputSplit instanceof FileSplit) {
-        matchedStats = Utils.getFsStatistics(((FileSplit) oldInputSplit).getPath(), this.jobConf);
-      }
-      fsStats = matchedStats;
-      
-      long bytesInPrev = getInputBytes();
-      oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
-          this.jobConf, new MRReporter(inputContext, oldInputSplit));
-      long bytesInCurr = getInputBytes();
-      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-      setIncrementalConfigParams(oldInputSplit);
-    }    
+    LOG.info("Initialized MRInput: " + inputContext.getSourceVertexName());
     return null;
   }
 
+  
+  
+  private void setupOldInputFormat() {
+    oldInputFormat = this.jobConf.getInputFormat();
+  }
+  
+  private void setupOldRecordReader() throws IOException {
+    Preconditions.checkNotNull(oldInputSplit, "Input split hasn't been set up yet");
+    List<Statistics> matchedStats = null;
+    if (oldInputSplit instanceof FileSplit) {
+      matchedStats = Utils.getFsStatistics(((FileSplit) oldInputSplit).getPath(), this.jobConf);
+    }
+    fsStats = matchedStats;
+    
+    long bytesInPrev = getInputBytes();
+    oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
+        this.jobConf, new MRReporter(inputContext, oldInputSplit));
+    long bytesInCurr = getInputBytes();
+    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+    setIncrementalConfigParams(oldInputSplit);
+  }
+  
+  private void setupNewInputFormat() throws IOException {
+    taskAttemptContext = createTaskAttemptContext();
+    Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> inputFormatClazz;
+    try {
+      inputFormatClazz = taskAttemptContext.getInputFormatClass();
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Unable to instantiate InputFormat class", e);
+    }
+
+    newInputFormat = ReflectionUtils.newInstance(inputFormatClazz, this.jobConf);
+  }
+  
+  private void setupNewRecordReader() throws IOException {
+    Preconditions.checkNotNull(newInputSplit, "Input split hasn't been set up yet");
+    List<Statistics> matchedStats = null;
+    if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
+      matchedStats = Utils.getFsStatistics(
+          ((org.apache.hadoop.mapreduce.lib.input.FileSplit)
+              newInputSplit).getPath(), this.jobConf);
+    }
+    fsStats = matchedStats;
+    
+    try {
+      newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
+      newRecordReader.initialize(newInputSplit, taskAttemptContext);
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while creating record reader", e);
+    }
+  }
+
   @Override
   public KeyValueReader getReader() throws IOException {
     Preconditions
         .checkState(recordReaderCreated == false,
             "Only a single instance of record reader can be created for this input.");
     recordReaderCreated = true;
+    if (newRecordReader == null && oldRecordReader == null) {
+      rrLock.lock();
+      try {
+        LOG.info("Awaiting RecordReader initialization");
+        try {
+          // Re-check under the lock to avoid a lost signal; the loop also
+          // guards against spurious wakeups.
+          while (newRecordReader == null && oldRecordReader == null) {
+            rrInited.await();
+          }
+        } catch (InterruptedException e) {
+          throw new IOException("Interrupted awaiting RecordReader setup", e);
+        }
+      } finally {
+        rrLock.unlock();
+      }
+    }
+    LOG.info("Creating reader for MRInput: " + inputContext.getSourceVertexName());
     return new MRInputKVReader();
   }
 
-
   @Override
-  public void handleEvents(List<Event> inputEvents) {
-    // Not expecting any events at the moment.
+  public void handleEvents(List<Event> inputEvents) throws Exception {
+    if (eventReceived || inputEvents.size() != 1) {
+      throw new IllegalStateException(
+          "MRInput expects exactly one event. Received event count: "
+              + inputEvents.size() + ", event previously received: "
+              + eventReceived);
+    }
+    Event event = inputEvents.iterator().next();
+    MRSplitProto splitProto = MRSplitProto
+        .parseFrom(((RootInputDataInformationEvent) event).getUserPayload());
+    if (useNewApi) {
+      newInputSplit = getNewSplitDetailsFromEvent(splitProto);
+      LOG.info("Split Details -> SplitClass: "
+          + newInputSplit.getClass().getName() + ", NewSplit: " + newInputSplit);
+      setupNewRecordReader();
+    } else {
+      oldInputSplit = getOldSplitDetailsFromEvent(splitProto);
+      LOG.info("Split Details -> SplitClass: "
+          + oldInputSplit.getClass().getName() + ", OldSplit: " + oldInputSplit);
+      setupOldRecordReader();
+    }
+    rrLock.lock();
+    try {
+      LOG.info("Signaling that the RecordReader has been initialized");
+      rrInited.signal();
+    } finally {
+      rrLock.unlock();
+    }
   }
 
 
@@ -227,8 +314,16 @@ public class MRInput implements LogicalInput {
     return new TaskAttemptContextImpl(this.jobConf, inputContext, true);
   }
 
+  private InputSplit getOldSplitDetailsFromEvent(MRSplitProto splitProto)
+      throws IOException {
+    SerializationFactory serializationFactory = new SerializationFactory(
+        jobConf);
+    return MRHelpers.createOldFormatSplitFromUserPayload(splitProto,
+        serializationFactory);
+  }
+  
   @SuppressWarnings("unchecked")
-  private InputSplit getOldSplitDetails(TaskSplitIndex splitMetaInfo)
+  private InputSplit getOldSplitDetailsFromDisk(TaskSplitIndex splitMetaInfo)
       throws IOException {
     Path file = new Path(splitMetaInfo.getSplitLocation());
     FileSystem fs = FileSystem.getLocal(jobConf);
@@ -263,8 +358,16 @@ public class MRInput implements LogicalInput {
     return split;
   }
 
+  private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromEvent(
+      MRSplitProto splitProto) throws IOException {
+    SerializationFactory serializationFactory = new SerializationFactory(
+        jobConf);
+    return MRHelpers.createNewFormatSplitFromUserPayload(
+        splitProto, serializationFactory);
+  }
+  
   @SuppressWarnings("unchecked")
-  private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetails(
+  private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromDisk(
       TaskSplitIndex splitMetaInfo) throws IOException {
     Path file = new Path(splitMetaInfo.getSplitLocation());
     long offset = splitMetaInfo.getStartOffset();

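The getReader() / handleEvents() pairing in the MRInput diff above is a
standard lock-and-condition handoff: the consumer blocks until the event
path has delivered the split and built the RecordReader. Below is a minimal,
self-contained sketch of that pattern. It is an illustration only, not the
Tez class itself; the class and member names are invented for the example.

    import java.io.IOException;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class RecordReaderHandoffSketch {
      private final Lock rrLock = new ReentrantLock();
      private final Condition rrInited = rrLock.newCondition();
      // Stands in for the real RecordReader; volatile so unlocked
      // null-checks see the latest value.
      private volatile Object recordReader;

      // Event path: called once the split payload has been deserialized.
      public void onSplitReady(Object reader) {
        rrLock.lock();
        try {
          recordReader = reader;
          rrInited.signal(); // wake a consumer blocked in awaitReader()
        } finally {
          rrLock.unlock();
        }
      }

      // Task path: may block until the event path has run.
      public Object awaitReader() throws IOException {
        rrLock.lock();
        try {
          // Re-checking under the lock avoids lost signals; the loop also
          // guards against spurious wakeups.
          while (recordReader == null) {
            rrInited.await();
          }
          return recordReader;
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while waiting for RecordReader", e);
        } finally {
          rrLock.unlock();
        }
      }
    }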
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto b/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto
new file mode 100644
index 0000000..ca9f5b9
--- /dev/null
+++ b/tez-mapreduce/src/main/proto/MRRuntimeProtos.proto
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.mapreduce.protos";
+option java_outer_classname = "MRRuntimeProtos";
+option java_generate_equals_and_hash = true;
+
+// Represents multiple MR splits
+message MRSplitsProto {
+  optional int32 version = 1;
+  repeated MRSplitProto splits = 2;
+  // It may be useful to serialize all splits into a single large field and
+  // read them efficiently via CodedInputStream (through a Writable/Hadoop
+  // serialization wrapper).
+}
+
+// Represents a single MRSplit
+message MRSplitProto {
+  optional string split_class_name = 1;
+  optional bytes split_bytes = 2;
+}
+
+message MRUserPayloadProto {
+  optional bytes configuration_bytes = 1;
+  optional MRSplitsProto splits = 2;
+}

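For context, here is a rough sketch of how a split could be round-tripped
through MRSplitProto using Hadoop's pluggable serialization, which is what
the split_class_name / split_bytes fields are designed to carry. The helper
class and method names are invented for illustration; the actual MRHelpers
implementation may differ in detail.

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.serializer.Deserializer;
    import org.apache.hadoop.io.serializer.SerializationFactory;
    import org.apache.hadoop.io.serializer.Serializer;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;

    import com.google.protobuf.ByteString;

    public class MRSplitProtoSketch {

      // Serialize a new-API InputSplit into an MRSplitProto.
      @SuppressWarnings("unchecked")
      public static MRSplitProto toProto(InputSplit split, Configuration conf)
          throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        SerializationFactory factory = new SerializationFactory(conf);
        Serializer<InputSplit> serializer =
            factory.getSerializer((Class<InputSplit>) split.getClass());
        serializer.open(out);
        serializer.serialize(split);
        serializer.close();
        return MRSplitProto.newBuilder()
            .setSplitClassName(split.getClass().getName())
            .setSplitBytes(ByteString.copyFrom(out.toByteArray()))
            .build();
      }

      // Reverse direction: rebuild the split from the proto, as the
      // getNewSplitDetailsFromEvent() path above has to do on the task side.
      @SuppressWarnings("unchecked")
      public static InputSplit fromProto(MRSplitProto proto, Configuration conf)
          throws IOException {
        Class<InputSplit> clazz;
        try {
          clazz = (Class<InputSplit>) conf.getClassByName(proto.getSplitClassName());
        } catch (ClassNotFoundException e) {
          throw new IOException("Split class not found", e);
        }
        SerializationFactory factory = new SerializationFactory(conf);
        Deserializer<InputSplit> deserializer = factory.getDeserializer(clazz);
        deserializer.open(proto.getSplitBytes().newInput());
        InputSplit split = deserializer.deserialize(null);
        deserializer.close();
        return split;
      }
    }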
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 5b8eedf..2593318 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -35,6 +35,7 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.mapreduce.TestUmbilical;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
@@ -111,6 +112,7 @@ public class TestMapProcessor {
         vertexName);
     
     JobConf job = new JobConf(stageConf);
+    job.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
 
     job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
         "localized-resources").toUri().toString());
@@ -120,7 +122,10 @@ public class TestMapProcessor {
     
     MapUtils.generateInputSplit(localFs, workDir, job, mapInput);
     
-    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()), 0);
+    InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
+        new InputDescriptor(MRInputLegacy.class.getName())
+            .setUserPayload(MRHelpers.createMRInputPayload(job, null)),
+        0);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
 
     LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, job, 0,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 3910b02..7672983 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -44,6 +44,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.mapreduce.TestUmbilical;
 import org.apache.tez.mapreduce.TezTestUtils;
 import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
@@ -125,11 +126,15 @@ public class TestReduceProcessor {
     
     mapConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
         "localized-resources").toUri().toString());
+    mapConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
     
     Path mapInput = new Path(workDir, "map0");
     MapUtils.generateInputSplit(localFs, workDir, mapConf, mapInput);
     
-    InputSpec mapInputSpec = new InputSpec("NullSrcVertex", new InputDescriptor(MRInputLegacy.class.getName()), 0);
+    InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
+        new InputDescriptor(MRInputLegacy.class.getName())
+            .setUserPayload(MRHelpers.createMRInputPayload(mapConf, null)),
+        0);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", new OutputDescriptor(LocalOnFileSorterOutput.class.getName()), 1);
     // Run a map
     LogicalIOProcessorRuntimeTask mapTask = MapUtils.createLogicalTask(localFs, workDir, mapConf, 0,

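Taken together, the two test changes above exercise the disk-based fallback:
the MR_TEZ_SPLITS_VIA_EVENTS flag turns event delivery off, and the payload
is built without serialized splits (the null second argument). A condensed
sketch of that wiring follows; the wrapper class and the import paths are
assumptions for illustration, not code from this commit.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.tez.dag.api.InputDescriptor;
    import org.apache.tez.mapreduce.hadoop.MRHelpers;
    import org.apache.tez.mapreduce.hadoop.MRJobConfig;
    import org.apache.tez.mapreduce.input.MRInputLegacy;
    import org.apache.tez.runtime.api.impl.InputSpec;

    public class DiskSplitWiringSketch {
      public static InputSpec buildMapInputSpec(JobConf job) throws Exception {
        // Force the pre-event behavior: splits are read from the split
        // meta-info files on disk rather than arriving via a data event.
        job.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
        // Second argument is the serialized splits; null here because the
        // splits stay on disk in this mode.
        byte[] payload = MRHelpers.createMRInputPayload(job, null);
        return new InputSpec("NullSrcVertex",
            new InputDescriptor(MRInputLegacy.class.getName())
                .setUserPayload(payload),
            0);
      }
    }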
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d41471fc/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index ab06e52..63e62ae 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -406,7 +406,8 @@ public class YARNRunner implements ClientProtocol {
         setUserPayload(vertexUserPayload),
         numTasks, taskResource);
     if (isMap) {
-      MRHelpers.addMRInput(vertex, vertexUserPayload);
+      byte[] mapInputPayload = MRHelpers.createMRInputPayload(stageConf, null);
+      MRHelpers.addMRInput(vertex, mapInputPayload, null);
     }
     // Map only jobs.
     if (stageNum == totalStages -1) {