You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/10/10 00:18:09 UTC

git commit: TEZ-537. Add support for session jars and pre-warming of containers to be re-used across a session. (hitesh)

Updated Branches:
  refs/heads/master d41471fc6 -> e2e9247e4


TEZ-537. Add support for session jars and pre-warming of containers to be re-used across a session. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e2e9247e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e2e9247e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e2e9247e

Branch: refs/heads/master
Commit: e2e9247e494259fab7e00252f64fafbb91edaa2c
Parents: d41471f
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Oct 9 15:17:52 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Oct 9 15:17:52 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/client/TezClientUtils.java   |  81 +++++++--
 .../java/org/apache/tez/client/TezSession.java  |  16 +-
 .../tez/client/TezSessionConfiguration.java     |  33 +++-
 .../main/java/org/apache/tez/dag/api/DAG.java   | 180 ++++++++++---------
 .../apache/tez/dag/api/DagTypeConverters.java   |  58 ++++++
 .../apache/tez/dag/api/TezConfiguration.java    |  33 ++++
 tez-api/src/main/proto/DAGApiRecords.proto      |   4 +
 .../apache/hadoop/mapred/YarnTezDagChild.java   |  21 +--
 .../java/org/apache/tez/dag/app/AppContext.java |   4 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 173 +++++++++++++++---
 .../dag/app/TaskAttemptListenerImpTezDag.java   |   6 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   9 +-
 .../mapreduce/examples/OrderedWordCount.java    |  22 +++
 .../library/processor/SleepProcessor.java       | 111 ++++++++++++
 14 files changed, 594 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 93d51d1..87042e9 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Level;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
@@ -74,6 +75,7 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
+import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
@@ -204,23 +206,18 @@ public class TezClientUtils {
 
   /**
    * Create an ApplicationSubmissionContext to launch a Tez AM
-   * @param conf
-   * @param appId
-   * @param dag
-   * @param appStagingDir
-   * @param ts
-   * @param amQueueName
-   * @param amName
-   * @param amArgs
-   * @param amEnv
-   * @param amLocalResources
-   * @param appConf
+   * @param conf TezConfiguration
+   * @param appId Application Id
+   * @param dag DAG to be submitted
+   * @param amName Name for the application
+   * @param amConfig AM Configuration
+   * @param tezJarResources Resources to be used by the AM
    * @return
    * @throws IOException
    * @throws YarnException
    */
   static ApplicationSubmissionContext createApplicationSubmissionContext(
-      Configuration conf, ApplicationId appId, DAG dag, String amName,
+      TezConfiguration conf, ApplicationId appId, DAG dag, String amName,
       AMConfiguration amConfig,
       Map<String, LocalResource> tezJarResources)
           throws IOException, YarnException{
@@ -327,7 +324,8 @@ public class TezClientUtils {
     localResources.putAll(tezJarResources);
 
     // emit conf as PB file
-    Configuration finalTezConf = createFinalTezConfForApp(amConfig.getAMConf());
+    Configuration finalTezConf = createFinalTezConfForApp(conf,
+      amConfig.getAMConf());
     Path binaryConfPath =  new Path(amConfig.getStagingDir(),
         TezConfiguration.TEZ_PB_BINARY_CONF_NAME + "." + appId.toString());
     FSDataOutputStream amConfPBOutBinaryStream = null;
@@ -359,8 +357,38 @@ public class TezClientUtils {
     localResources.put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
         binaryConfLRsrc);
 
+    // Create Session Jars definition to be sent to AM as a local resource
+    Path sessionJarsPath = new Path(amConfig.getStagingDir(),
+      TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME + "."
+      + appId.toString());
+    FSDataOutputStream sessionJarsPBOutStream = null;
+    try {
+      Map<String, LocalResource> sessionJars =
+        new HashMap<String, LocalResource>(tezJarResources.size() + 1);
+      sessionJars.putAll(tezJarResources);
+      sessionJars.put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
+        binaryConfLRsrc);
+      DAGProtos.PlanLocalResourcesProto proto =
+        DagTypeConverters.convertFromLocalResources(sessionJars);
+      sessionJarsPBOutStream = FileSystem.create(fs, sessionJarsPath,
+        new FsPermission(TEZ_AM_FILE_PERMISSION));
+      proto.writeTo(sessionJarsPBOutStream);
+    } finally {
+      if (sessionJarsPBOutStream != null) {
+        sessionJarsPBOutStream.close();
+      }
+    }
+
+    LocalResource sessionJarsPBLRsrc =
+      TezClientUtils.createLocalResource(fs,
+        sessionJarsPath, LocalResourceType.FILE,
+        LocalResourceVisibility.APPLICATION);
+    localResources.put(
+      TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME,
+      sessionJarsPBLRsrc);
+
     if(dag != null) {
-      // Add tez jars to vertices too
+
       for (Vertex v : dag.getVertices()) {
         v.getTaskLocalResources().putAll(tezJarResources);
         v.getTaskLocalResources().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
@@ -440,19 +468,36 @@ public class TezClientUtils {
         + "," + TezConfiguration.TEZ_CONTAINER_LOGGER_NAME);
   }
 
-  static Configuration createFinalTezConfForApp(TezConfiguration amConf) {
+  static Configuration createFinalTezConfForApp(TezConfiguration tezConf,
+      TezConfiguration amConf) {
     Configuration conf = new Configuration(false);
     conf.setQuietMode(true);
 
+    assert tezConf != null;
     assert amConf != null;
-    Iterator<Entry<String, String>> iter = amConf.iterator();
+
+    Entry<String, String> entry;
+    Iterator<Entry<String, String>> iter = tezConf.iterator();
+    while (iter.hasNext()) {
+      entry = iter.next();
+      // Copy all tez config parameters.
+      if (entry.getKey().startsWith(TezConfiguration.TEZ_PREFIX)) {
+        conf.set(entry.getKey(), entry.getValue());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Adding tez dag am parameter from conf: " + entry.getKey()
+            + ", with value: " + entry.getValue());
+        }
+      }
+    }
+
+    iter = amConf.iterator();
     while (iter.hasNext()) {
-      Entry<String, String> entry = iter.next();
+      entry = iter.next();
       // Copy all tez config parameters.
       if (entry.getKey().startsWith(TezConfiguration.TEZ_PREFIX)) {
         conf.set(entry.getKey(), entry.getValue());
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Adding tez dag am parameter: " + entry.getKey()
+          LOG.debug("Adding tez dag am parameter from amConf: " + entry.getKey()
               + ", with value: " + entry.getValue());
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-api/src/main/java/org/apache/tez/client/TezSession.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSession.java b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
index 45408e1..95ee1b5 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSession.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSession.java
@@ -89,6 +89,11 @@ public class TezSession {
     tezJarResources = TezClientUtils.setupTezJarsLocalResources(
         sessionConfig.getTezConfiguration());
 
+    if (sessionConfig.getSessionResources() != null
+      && !sessionConfig.getSessionResources().isEmpty()) {
+      tezJarResources.putAll(sessionConfig.getSessionResources());
+    }
+
     try {
       if (applicationId == null) {
         applicationId = yarnClient.createApplication().
@@ -134,14 +139,7 @@ public class TezSession {
     LOG.info("Submitting dag to TezSession"
         + ", sessionName=" + sessionName
         + ", applicationId=" + applicationId);
-    // Add tez jars to vertices too
-    for (Vertex v : dag.getVertices()) {
-      v.getTaskLocalResources().putAll(tezJarResources);
-      if (null != tezConfPBLRsrc) {
-        v.getTaskLocalResources().put(TezConfiguration.TEZ_PB_BINARY_CONF_NAME,
-            tezConfPBLRsrc);
-      }
-    }
+
     DAGPlan dagPlan = dag.createDag(sessionConfig.getTezConfiguration());
     SubmitDAGRequestProto requestProto =
         SubmitDAGRequestProto.newBuilder().setDAGPlan(dagPlan).build();
@@ -153,7 +151,6 @@ public class TezSession {
         TezConfiguration.TEZ_SESSION_CLIENT_TIMEOUT_SECS_DEFAULT);
     long endTime = startTime + (timeout * 1000);
     while (true) {
-      // FIXME implement a max time to wait for submit
       proxy = TezClientUtils.getSessionAMProxy(yarnClient,
           sessionConfig.getYarnConfiguration(), applicationId);
       if (proxy != null) {
@@ -269,4 +266,5 @@ public class TezSession {
     }
     return TezSessionStatus.INITIALIZING;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java b/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
index 61ca60b..2ac5b6c 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezSessionConfiguration.java
@@ -18,28 +18,49 @@
 
 package org.apache.tez.client;
 
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.dag.api.TezConfiguration;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+
 public class TezSessionConfiguration {
 
   private final AMConfiguration amConfiguration;
   private final YarnConfiguration yarnConfig;
   private final TezConfiguration tezConfig;
+  private final Map<String, LocalResource> sessionResources;
 
   public TezSessionConfiguration(AMConfiguration amConfiguration,
       TezConfiguration tezConfig) {
-    this.amConfiguration = amConfiguration;
-    this.tezConfig = tezConfig;
-    this.yarnConfig = new YarnConfiguration(tezConfig);
+    this(amConfiguration, tezConfig, new YarnConfiguration(tezConfig));
+  }
+
+  TezSessionConfiguration(AMConfiguration amConfiguration,
+                          TezConfiguration tezConfig,
+                          YarnConfiguration yarnConf) {
+    this(amConfiguration, tezConfig, yarnConf,
+      new TreeMap<String, LocalResource>());
   }
 
+  /**
+   * TezSessionConfiguration constructor
+   * @param amConfiguration AM Configuration @see AMConfiguration
+   * @param tezConfig Tez Configuration
+   * @param yarnConf Yarn Configuration
+   * @param sessionResources LocalResources accessible to all tasks that are
+   *                         launched within this session.
+   */
   TezSessionConfiguration(AMConfiguration amConfiguration,
       TezConfiguration tezConfig,
-      YarnConfiguration yarnConf) {
+      YarnConfiguration yarnConf,
+      Map<String, LocalResource> sessionResources) {
     this.amConfiguration = amConfiguration;
     this.tezConfig = tezConfig;
     this.yarnConfig = yarnConf;
+    this.sessionResources = sessionResources;
   }
 
   public AMConfiguration getAMConfiguration() {
@@ -54,4 +75,8 @@ public class TezSessionConfiguration {
     return tezConfig;
   }
 
+  public Map<String, LocalResource> getSessionResources() {
+    return Collections.unmodifiableMap(sessionResources);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 98ea91b..7ae7f16 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -64,12 +64,12 @@ public class DAG { // FIXME rename to Topology
   public synchronized DAG addVertex(Vertex vertex) {
     if (vertices.containsKey(vertex.getVertexName())) {
       throw new IllegalStateException(
-          "Vertex " + vertex.getVertexName() + " already defined!");
+        "Vertex " + vertex.getVertexName() + " already defined!");
     }
     vertices.put(vertex.getVertexName(), vertex);
     return this;
   }
-  
+
   public synchronized Vertex getVertex(String vertexName) {
     return vertices.get(vertexName);
   }
@@ -83,15 +83,15 @@ public class DAG { // FIXME rename to Topology
     // Sanity checks
     if (!vertices.containsValue(edge.getInputVertex())) {
       throw new IllegalArgumentException(
-          "Input vertex " + edge.getInputVertex() + " doesn't exist!");
+        "Input vertex " + edge.getInputVertex() + " doesn't exist!");
     }
     if (!vertices.containsValue(edge.getOutputVertex())) {
       throw new IllegalArgumentException(
-          "Output vertex " + edge.getOutputVertex() + " doesn't exist!");
+        "Output vertex " + edge.getOutputVertex() + " doesn't exist!");
     }
     if (edges.contains(edge)) {
       throw new IllegalArgumentException(
-          "Edge " + edge + " already defined!");
+        "Edge " + edge + " already defined!");
     }
 
     // Inform the vertices
@@ -101,7 +101,7 @@ public class DAG { // FIXME rename to Topology
     edges.add(edge);
     return this;
   }
-  
+
   public String getName() {
     return this.name;
   }
@@ -116,11 +116,11 @@ public class DAG { // FIXME rename to Topology
 
     int outDegree;
 
-    private AnnotatedVertex(Vertex v){
-       this.v = v;
-       index = -1;
-       lowlink = -1;
-       outDegree = 0;
+    private AnnotatedVertex(Vertex v) {
+      this.v = v;
+      index = -1;
+      lowlink = -1;
+      outDegree = 0;
     }
   }
 
@@ -147,16 +147,16 @@ public class DAG { // FIXME rename to Topology
     verify(true);
   }
 
-  public void verify(boolean restricted) throws IllegalStateException  {
+  public void verify(boolean restricted) throws IllegalStateException {
     if (vertices.isEmpty()) {
       throw new IllegalStateException("Invalid dag containing 0 vertices");
     }
 
     Map<Vertex, List<Edge>> edgeMap = new HashMap<Vertex, List<Edge>>();
-    for(Edge e : edges){
+    for (Edge e : edges) {
       Vertex inputVertex = e.getInputVertex();
       List<Edge> edgeList = edgeMap.get(inputVertex);
-      if(edgeList == null){
+      if (edgeList == null) {
         edgeList = new ArrayList<Edge>();
         edgeMap.put(inputVertex, edgeList);
       }
@@ -166,10 +166,10 @@ public class DAG { // FIXME rename to Topology
     // check for valid vertices, duplicate vertex names,
     // and prepare for cycle detection
     Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>();
-    for(Vertex v : vertices.values()){
-      if(vertexMap.containsKey(v.getVertexName())){
-         throw new IllegalStateException("DAG contains multiple vertices"
-             + " with name: " + v.getVertexName());
+    for (Vertex v : vertices.values()) {
+      if (vertexMap.containsKey(v.getVertexName())) {
+        throw new IllegalStateException("DAG contains multiple vertices"
+          + " with name: " + v.getVertexName());
       }
       vertexMap.put(v.getVertexName(), new AnnotatedVertex(v));
     }
@@ -180,12 +180,12 @@ public class DAG { // FIXME rename to Topology
       for (RootInputLeafOutput<InputDescriptor> in : v.getInputs()) {
         if (vertexMap.containsKey(in.getName())) {
           throw new IllegalStateException(
-              "DAG contains a vertex and an Input to a vertex with the same name: "
-                  + in.getName());
+            "DAG contains a vertex and an Input to a vertex with the same name: "
+              + in.getName());
         }
         if (namedIOs.contains(in.getName())) {
           throw new IllegalStateException(
-              "DAG contains an Input or an Output with a repeated name: " + in.getName());
+            "DAG contains an Input or an Output with a repeated name: " + in.getName());
         } else {
           namedIOs.add(in.getName());
         }
@@ -193,38 +193,38 @@ public class DAG { // FIXME rename to Topology
       for (RootInputLeafOutput<OutputDescriptor> out : v.getOutputs()) {
         if (vertexMap.containsKey(out.getName())) {
           throw new IllegalStateException(
-              "DAG contains a vertex and an Output from a vertex with the same name: "
-                  + out.getName());
+            "DAG contains a vertex and an Output from a vertex with the same name: "
+              + out.getName());
         }
         if (namedIOs.contains(out.getName())) {
           throw new IllegalStateException(
-              "DAG contains an Input or an Output with a repeated name: " + out.getName());
+            "DAG contains an Input or an Output with a repeated name: " + out.getName());
         } else {
           namedIOs.add(out.getName());
         }
       }
     }
-    
+
     detectCycles(edgeMap, vertexMap);
 
-    if(restricted){
-      for(Edge e : edges){
+    if (restricted) {
+      for (Edge e : edges) {
         vertexMap.get(e.getInputVertex().getVertexName()).outDegree++;
         if (e.getEdgeProperty().getDataSourceType() !=
-            DataSourceType.PERSISTED) {
+          DataSourceType.PERSISTED) {
           throw new IllegalStateException(
-              "Unsupported source type on edge. " + e);
+            "Unsupported source type on edge. " + e);
         }
         if (e.getEdgeProperty().getSchedulingType() !=
-            SchedulingType.SEQUENTIAL) {
+          SchedulingType.SEQUENTIAL) {
           throw new IllegalStateException(
-              "Unsupported scheduling type on edge. " + e);
+            "Unsupported scheduling type on edge. " + e);
         }
       }
-      for(AnnotatedVertex av: vertexMap.values()){
+      for (AnnotatedVertex av : vertexMap.values()) {
         if (av.outDegree > 1) {
           throw new IllegalStateException("Vertex has outDegree>1: "
-              + av.v.getVertexName());
+            + av.v.getVertexName());
         }
       }
     }
@@ -233,11 +233,11 @@ public class DAG { // FIXME rename to Topology
   // Adaptation of Tarjan's algorithm for connected components.
   // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
   private void detectCycles(Map<Vertex, List<Edge>> edgeMap, Map<String, AnnotatedVertex> vertexMap)
-      throws IllegalStateException{
+    throws IllegalStateException {
     Integer nextIndex = 0; // boxed integer so it is passed by reference.
     Stack<AnnotatedVertex> stack = new Stack<DAG.AnnotatedVertex>();
-    for(AnnotatedVertex av: vertexMap.values()){
-      if(av.index == -1){
+    for (AnnotatedVertex av : vertexMap.values()) {
+      if (av.index == -1) {
         assert stack.empty();
         strongConnect(av, vertexMap, edgeMap, stack, nextIndex);
       }
@@ -247,10 +247,10 @@ public class DAG { // FIXME rename to Topology
   // part of Tarjan's algorithm for connected components.
   // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
   private void strongConnect(
-          AnnotatedVertex av,
-          Map<String, AnnotatedVertex> vertexMap,
-          Map<Vertex, List<Edge>> edgeMap,
-          Stack<AnnotatedVertex> stack, Integer nextIndex) throws IllegalStateException{
+    AnnotatedVertex av,
+    Map<String, AnnotatedVertex> vertexMap,
+    Map<Vertex, List<Edge>> edgeMap,
+    Stack<AnnotatedVertex> stack, Integer nextIndex) throws IllegalStateException {
     av.index = nextIndex;
     av.lowlink = nextIndex;
     nextIndex++;
@@ -258,14 +258,13 @@ public class DAG { // FIXME rename to Topology
     av.onstack = true;
 
     List<Edge> edges = edgeMap.get(av.v);
-    if(edges != null){
-      for(Edge e : edgeMap.get(av.v)){
+    if (edges != null) {
+      for (Edge e : edgeMap.get(av.v)) {
         AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getVertexName());
-        if(outVertex.index == -1){
+        if (outVertex.index == -1) {
           strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex);
           av.lowlink = Math.min(av.lowlink, outVertex.lowlink);
-        }
-        else if(outVertex.onstack){
+        } else if (outVertex.onstack) {
           // strongly connected component detected, but we will wait till later so that the full cycle can be displayed.
           // update lowlink in case outputVertex should be considered the root of this component.
           av.lowlink = Math.min(av.lowlink, outVertex.index);
@@ -273,21 +272,21 @@ public class DAG { // FIXME rename to Topology
       }
     }
 
-    if(av.lowlink == av.index ){
-       AnnotatedVertex pop = stack.pop();
-       pop.onstack = false;
-       if(pop != av){
-         // there was something on the stack other than this "av".
-         // this indicates there is a scc/cycle. It comprises all nodes from top of stack to "av"
-         StringBuilder message = new StringBuilder();
-         message.append(av.v.getVertexName() + " <- ");
-         for( ; pop != av; pop = stack.pop()){
-           message.append(pop.v.getVertexName() + " <- ");
-           pop.onstack = false;
-         }
-         message.append(av.v.getVertexName());
-         throw new IllegalStateException("DAG contains a cycle: " + message);
-       }
+    if (av.lowlink == av.index) {
+      AnnotatedVertex pop = stack.pop();
+      pop.onstack = false;
+      if (pop != av) {
+        // there was something on the stack other than this "av".
+        // this indicates there is a scc/cycle. It comprises all nodes from top of stack to "av"
+        StringBuilder message = new StringBuilder();
+        message.append(av.v.getVertexName() + " <- ");
+        for (; pop != av; pop = stack.pop()) {
+          message.append(pop.v.getVertexName() + " <- ");
+          pop.onstack = false;
+        }
+        message.append(av.v.getVertexName());
+        throw new IllegalStateException("DAG contains a cycle: " + message);
+      }
     }
   }
 
@@ -301,12 +300,12 @@ public class DAG { // FIXME rename to Topology
 
     dagBuilder.setName(this.name);
 
-    for(Vertex vertex : vertices.values()){
+    for (Vertex vertex : vertices.values()) {
       VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
       vertexBuilder.setName(vertex.getVertexName());
       vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until  TEZ-46.
       vertexBuilder.setProcessorDescriptor(DagTypeConverters
-          .convertToDAGPlan(vertex.getProcessorDescriptor()));
+        .convertToDAGPlan(vertex.getProcessorDescriptor()));
       if (vertex.getInputs().size() > 0) {
         for (RootInputLeafOutput<InputDescriptor> input : vertex.getInputs()) {
           vertexBuilder.addInputs(DagTypeConverters.convertToDAGPlan(input));
@@ -328,31 +327,34 @@ public class DAG { // FIXME rename to Topology
 
       taskConfigBuilder.setTaskModule(vertex.getVertexName());
       PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
-      Map<String,LocalResource> lrs = vertex.getTaskLocalResources();
-      for(Entry<String, LocalResource> entry : lrs.entrySet()){
-        String key = entry.getKey();
-        LocalResource lr = entry.getValue();
-        localResourcesBuilder.setName(key);
-        localResourcesBuilder.setUri(
+      if (vertex.getTaskLocalResources() != null) {
+        localResourcesBuilder.clear();
+        for (Entry<String, LocalResource> entry :
+          vertex.getTaskLocalResources().entrySet()) {
+          String key = entry.getKey();
+          LocalResource lr = entry.getValue();
+          localResourcesBuilder.setName(key);
+          localResourcesBuilder.setUri(
             DagTypeConverters.convertToDAGPlan(lr.getResource()));
-        localResourcesBuilder.setSize(lr.getSize());
-        localResourcesBuilder.setTimeStamp(lr.getTimestamp());
-        localResourcesBuilder.setType(
+          localResourcesBuilder.setSize(lr.getSize());
+          localResourcesBuilder.setTimeStamp(lr.getTimestamp());
+          localResourcesBuilder.setType(
             DagTypeConverters.convertToDAGPlan(lr.getType()));
-        localResourcesBuilder.setVisibility(
+          localResourcesBuilder.setVisibility(
             DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
-        if(lr.getType() == LocalResourceType.PATTERN){
-          if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
-            throw new TezUncheckedException("LocalResource type set to pattern"
+          if (lr.getType() == LocalResourceType.PATTERN) {
+            if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
+              throw new TezUncheckedException("LocalResource type set to pattern"
                 + " but pattern is null or empty");
+            }
+            localResourcesBuilder.setPattern(lr.getPattern());
           }
-          localResourcesBuilder.setPattern(lr.getPattern());
+          taskConfigBuilder.addLocalResource(localResourcesBuilder);
         }
-        taskConfigBuilder.addLocalResource(localResourcesBuilder);
       }
 
-      if(vertex.getTaskEnvironment() != null){
-        for(String key : vertex.getTaskEnvironment().keySet()){
+      if (vertex.getTaskEnvironment() != null) {
+        for (String key : vertex.getTaskEnvironment().keySet()) {
           PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder();
           envSettingBuilder.setKey(key);
           envSettingBuilder.setValue(vertex.getTaskEnvironment().get(key));
@@ -360,15 +362,15 @@ public class DAG { // FIXME rename to Topology
         }
       }
 
-      if(vertex.getTaskLocationsHint() != null ){
-        if(vertex.getTaskLocationsHint().getTaskLocationHints() != null){
-          for(TaskLocationHint hint : vertex.getTaskLocationsHint().getTaskLocationHints()){
+      if (vertex.getTaskLocationsHint() != null) {
+        if (vertex.getTaskLocationsHint().getTaskLocationHints() != null) {
+          for (TaskLocationHint hint : vertex.getTaskLocationsHint().getTaskLocationHints()) {
             PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();
 
-            if(hint.getDataLocalHosts() != null){
+            if (hint.getDataLocalHosts() != null) {
               taskLocationHintBuilder.addAllHost(hint.getDataLocalHosts());
             }
-            if(hint.getRacks() != null){
+            if (hint.getRacks() != null) {
               taskLocationHintBuilder.addAllRack(hint.getRacks());
             }
 
@@ -377,11 +379,11 @@ public class DAG { // FIXME rename to Topology
         }
       }
 
-      for(String inEdgeId : vertex.getInputEdgeIds()){
+      for (String inEdgeId : vertex.getInputEdgeIds()) {
         vertexBuilder.addInEdgeId(inEdgeId);
       }
 
-      for(String outEdgeId : vertex.getOutputEdgeIds()){
+      for (String outEdgeId : vertex.getOutputEdgeIds()) {
         vertexBuilder.addOutEdgeId(outEdgeId);
       }
 
@@ -389,7 +391,7 @@ public class DAG { // FIXME rename to Topology
       dagBuilder.addVertex(vertexBuilder);
     }
 
-    for(Edge edge : edges){
+    for (Edge edge : edges) {
       EdgePlan.Builder edgeBuilder = EdgePlan.newBuilder();
       edgeBuilder.setId(edge.getId());
       edgeBuilder.setInputVertexName(edge.getInputVertex().getVertexName());
@@ -402,10 +404,10 @@ public class DAG { // FIXME rename to Topology
       dagBuilder.addEdge(edgeBuilder);
     }
 
-    if(dagConf != null) {
+    if (dagConf != null) {
       Iterator<Entry<String, String>> iter = dagConf.iterator();
       ConfigurationProto.Builder confProtoBuilder =
-          ConfigurationProto.newBuilder();
+        ConfigurationProto.newBuilder();
       while (iter.hasNext()) {
         Entry<String, String> entry = iter.next();
         PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 3247935..803c943 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -44,6 +44,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
@@ -322,4 +323,61 @@ public class DagTypeConverters {
         + " proto");
   }
 
+
+  public static PlanLocalResourcesProto convertFromLocalResources(
+    Map<String, LocalResource> localResources) {
+    PlanLocalResourcesProto.Builder builder =
+      PlanLocalResourcesProto.newBuilder();
+    for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) {
+      PlanLocalResource plr = convertLocalResourceToPlanLocalResource(
+        entry.getKey(), entry.getValue());
+      builder.addLocalResources(plr);
+    }
+    return builder.build();
+  }
+
+  public static Map<String, LocalResource> convertFromPlanLocalResources(
+    PlanLocalResourcesProto proto) {
+    Map<String, LocalResource> localResources =
+      new HashMap<String, LocalResource>(proto.getLocalResourcesCount());
+    for (PlanLocalResource plr : proto.getLocalResourcesList()) {
+      String name = plr.getName();
+      LocalResource lr = convertPlanLocalResourceToLocalResource(plr);
+      localResources.put(name, lr);
+    }
+    return localResources;
+  }
+
+  public static PlanLocalResource convertLocalResourceToPlanLocalResource(
+    String name, LocalResource lr) {
+    PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
+    localResourcesBuilder.setName(name);
+    localResourcesBuilder.setUri(
+      DagTypeConverters.convertToDAGPlan(lr.getResource()));
+    localResourcesBuilder.setSize(lr.getSize());
+    localResourcesBuilder.setTimeStamp(lr.getTimestamp());
+    localResourcesBuilder.setType(
+      DagTypeConverters.convertToDAGPlan(lr.getType()));
+    localResourcesBuilder.setVisibility(
+      DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
+    if (lr.getType() == LocalResourceType.PATTERN) {
+      if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
+        throw new TezUncheckedException("LocalResource type set to pattern"
+          + " but pattern is null or empty");
+      }
+      localResourcesBuilder.setPattern(lr.getPattern());
+    }
+    return localResourcesBuilder.build();
+  }
+
+  public static LocalResource convertPlanLocalResourceToLocalResource(
+      PlanLocalResource plr) {
+    return LocalResource.newInstance(
+      ConverterUtils.getYarnUrlFromPath(new Path(plr.getUri())),
+      DagTypeConverters.convertFromDAGPlan(plr.getType()),
+      DagTypeConverters.convertFromDAGPlan(plr.getVisibility()),
+      plr.getSize(), plr.getTimeStamp(),
+      plr.hasPattern() ? plr.getPattern() : null);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index e807636..4efa6e2 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -227,6 +227,9 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_SESSION_PREFIX =
       TEZ_PREFIX + "session.";
 
+  public static final String TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME =
+    TEZ_SESSION_PREFIX + "local-resources.pb.file-name";
+
   /**
    * Time (in seconds) to wait for AM to come up when trying to submit a DAG
    * from the client.
@@ -245,4 +248,34 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT =
       300;
 
+  /**
+   * Session pre-warm related configuration options
+   */
+
+  public static final String TEZ_SESSION_PRE_WARM_PREFIX =
+    TEZ_SESSION_PREFIX + "pre-warm.";
+  public static final String TEZ_SESSION_PRE_WARM_ENABLED =
+    TEZ_SESSION_PRE_WARM_PREFIX + "enabled";
+  public static final boolean TEZ_SESSION_PRE_WARM_ENABLED_DEFAULT = false;
+
+  public static final String TEZ_SESSION_PRE_WARM_NUM_CONTAINERS =
+    TEZ_SESSION_PRE_WARM_PREFIX + "num-containers";
+  public static final String TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_MEMORY_MB =
+    TEZ_SESSION_PRE_WARM_PREFIX + "container.resource.memory.mb";
+  public static final int
+    TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_MEMORY_MB_DEFAULT = 1024;
+
+  public static final String TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_VCORES =
+    TEZ_SESSION_PRE_WARM_PREFIX + "container.resource.vcores";
+  public static final int
+    TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_VCORES_DEFAULT = 1;
+
+  public static final String TEZ_SESSION_PRE_WARM_CONTAINER_JAVA_OPTS =
+    TEZ_SESSION_PRE_WARM_PREFIX + "container.java.opts";
+  public static final String TEZ_SESSION_PRE_WARM_CONTAINER_ENVIRONMENT =
+    TEZ_SESSION_PRE_WARM_PREFIX + "container.environment";
+
+  public static final String TEZ_SESSION_PRE_WARM_PROCESSOR_NAME =
+    TEZ_SESSION_PRE_WARM_PREFIX + "processor";
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 79e62ca..b948e60 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -189,3 +189,7 @@ message DAGStatusProto {
   optional ProgressProto DAGProgress = 3;
   repeated StringProgressPairProto vertexProgress = 4;  
 }
+
+message PlanLocalResourcesProto {
+  repeated PlanLocalResource localResources = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index e87bbb0..5b4793b 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -164,9 +164,11 @@ public class YarnTezDagChild {
     int eventCounter = 0;
     int eventsRange = 0;
     TezTaskAttemptID taskAttemptID = null;
+    List<TezEvent> events = new ArrayList<TezEvent>();
     try {
       taskLock.readLock().lock();
       if (currentTask != null) {
+        eventsToSend.drainTo(events);
         taskAttemptID = currentTaskAttemptID;
         eventCounter = currentTask.getEventCounter();
         eventsRange = maxEventsToGet;
@@ -175,24 +177,21 @@ public class YarnTezDagChild {
               currentTask.getCounters(), currentTask.getProgress()),
                 new EventMetaData(EventProducerConsumerType.SYSTEM,
                     currentTask.getVertexName(), "", taskAttemptID));
-        } else if (outOfBandEvents == null) {
+          events.add(updateEvent);
+        } else if (outOfBandEvents == null && events.isEmpty()) {
           LOG.info("Setting TaskAttemptID to null as the task has already"
             + " completed. Caused by race-condition between the normal"
             + " heartbeat and out-of-band heartbeats");
           taskAttemptID = null;
+        } else {
+          if (outOfBandEvents != null && !outOfBandEvents.isEmpty()) {
+            events.addAll(outOfBandEvents);
+          }
         }
       }
     } finally {
       taskLock.readLock().unlock();
     }
-    List<TezEvent> events = new ArrayList<TezEvent>();
-    if (updateEvent != null) {
-      events.add(updateEvent);
-    }
-    eventsToSend.drainTo(events);
-    if (outOfBandEvents != null && !outOfBandEvents.isEmpty()) {
-      events.addAll(outOfBandEvents);
-    }
 
     long reqId = requestCounter.incrementAndGet();
     TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
@@ -244,9 +243,7 @@ public class YarnTezDagChild {
   public static void main(String[] args) throws Throwable {
     Thread.setDefaultUncaughtExceptionHandler(
         new YarnUncaughtExceptionHandler());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Child starting");
-    }
+    LOG.info("YarnTezDagChild starting");
 
     final Configuration defaultConf = new Configuration();
     TezUtils.addUserSpecifiedTezConfiguration(defaultConf);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 26c0992..1f3619e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.dag.app.dag.DAG;
@@ -74,4 +75,7 @@ public interface AppContext {
   AMNodeMap getAllNodes();
 
   TaskSchedulerEventHandler getTaskScheduler();
+
+  Map<String, LocalResource> getSessionResources();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 8d6fd1f..ced6ca2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -26,6 +26,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -63,25 +64,32 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.DAGState;
@@ -116,6 +124,7 @@ import org.apache.tez.dag.history.avro.HistoryEventType;
 import org.apache.tez.dag.history.events.AMStartedEvent;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.runtime.library.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
 
 /**
  * The Map-Reduce Application Master.
@@ -172,6 +181,8 @@ public class DAGAppMaster extends AbstractService {
   private VertexEventDispatcher vertexEventDispatcher;
   private TaskSchedulerEventHandler taskSchedulerEventHandler;
   private HistoryEventHandler historyEventHandler;
+  private final Map<String, LocalResource> sessionResources =
+    new HashMap<String, LocalResource>();
 
   private DAGAppMasterShutdownHandler shutdownHandler =
       new DAGAppMasterShutdownHandler();
@@ -230,9 +241,7 @@ public class DAGAppMaster extends AbstractService {
   }
 
   @Override
-  public void serviceInit(final Configuration conf) throws Exception {
-
-    this.state = DAGAppMasterState.INITED;
+  public synchronized void serviceInit(final Configuration conf) throws Exception {
 
     this.amConf = conf;
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
@@ -304,8 +313,27 @@ public class DAGAppMaster extends AbstractService {
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS,
             TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS_DEFAULT);
 
+    if (isSession) {
+      FileInputStream sessionResourcesStream = null;
+      try {
+        sessionResourcesStream = new FileInputStream(
+          TezConfiguration.TEZ_SESSION_LOCAL_RESOURCES_PB_FILE_NAME);
+        PlanLocalResourcesProto localResourcesProto =
+          PlanLocalResourcesProto.parseFrom(sessionResourcesStream);
+        sessionResources.putAll(DagTypeConverters.convertFromPlanLocalResources(
+          localResourcesProto));
+      } finally {
+        if (sessionResourcesStream != null) {
+          sessionResourcesStream.close();
+        }
+      }
+    }
+
     initServices(conf);
     super.serviceInit(conf);
+
+    this.state = DAGAppMasterState.INITED;
+
   }
 
   protected Dispatcher createDispatcher() {
@@ -863,6 +891,11 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
+    public Map<String, LocalResource> getSessionResources() {
+      return sessionResources;
+    }
+
+    @Override
     public Map<ApplicationAccessType, String> getApplicationACLs() {
       if (getServiceState() != STATE.STARTED) {
         throw new TezUncheckedException(
@@ -1066,14 +1099,12 @@ public class DAGAppMaster extends AbstractService {
 
   @SuppressWarnings("unchecked")
   @Override
-  public void serviceStart() throws Exception {
+  public synchronized void serviceStart() throws Exception {
 
     //start all the components
     startServices();
     super.serviceStart();
 
-    this.state = DAGAppMasterState.IDLE;
-
     // metrics system init is really init & start.
     // It's more test friendly to put it here.
     DefaultMetricsSystem.initialize("DAGAppMaster");
@@ -1089,19 +1120,124 @@ public class DAGAppMaster extends AbstractService {
     if (!isSession) {
       startDAG();
     } else {
-      LOG.info("In Session mode. Waiting for DAG over RPC");
-      this.dagSubmissionTimer = new Timer(true);
-      this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
-        @Override
-        public void run() {
-          checkAndHandleSessionTimeout();
-        }
-      }, sessionTimeoutInterval, sessionTimeoutInterval/10);
+      boolean preWarmContainersEnabled = amConf.getBoolean(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_ENABLED,
+        TezConfiguration.TEZ_SESSION_PRE_WARM_ENABLED_DEFAULT);
+
+      boolean ranPreWarmContainersDAG = false;
+      if (preWarmContainersEnabled) {
+        ranPreWarmContainersDAG = runPreWarmContainersDAG();
+      }
+
+      if (!ranPreWarmContainersDAG) {
+        LOG.info("In Session mode. Waiting for DAG over RPC");
+        this.state = DAGAppMasterState.IDLE;
+
+        this.dagSubmissionTimer = new Timer(true);
+        this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
+          @Override
+          public void run() {
+            checkAndHandleSessionTimeout();
+          }
+        }, sessionTimeoutInterval, sessionTimeoutInterval / 10);
+      }
     }
   }
 
+  private boolean runPreWarmContainersDAG() {
+    int numContainers = amConf.getInt(
+      TezConfiguration.TEZ_SESSION_PRE_WARM_NUM_CONTAINERS, 0);
+    if (numContainers == 0) {
+      LOG.info("Not pre-warming containers as "
+        + TezConfiguration.TEZ_SESSION_PRE_WARM_NUM_CONTAINERS
+        + " not specified or set to 0");
+      return false;
+    }
+    if ((null == amConf.get(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_MEMORY_MB))
+      || (null == amConf.get(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_VCORES))) {
+      LOG.info("Not pre-warming containers as container resource"
+        + " requirements not specified"
+        + ", "
+        + TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_MEMORY_MB
+        + "=" + amConf.get(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_MEMORY_MB)
+        + ", "
+        + TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_VCORES
+        + "=" + amConf.get(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_VCORES));
+      return false;
+    }
+
+    Resource containerResource = Resource.newInstance(
+      amConf.getInt(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_MEMORY_MB,
+        TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_MEMORY_MB_DEFAULT),
+      amConf.getInt(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_VCORES,
+        TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_RESOURCE_VCORES_DEFAULT));
+
+    ProcessorDescriptor processorDescriptor = null;
+
+    if (amConf.get(TezConfiguration.TEZ_SESSION_PRE_WARM_PROCESSOR_NAME) != null) {
+      processorDescriptor = new ProcessorDescriptor(amConf.get(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_PROCESSOR_NAME));
+    } else {
+      processorDescriptor = new ProcessorDescriptor(
+        SleepProcessor.class.getName());
+    }
+
+    // Create a DAG using SleepProcessor to launch the required containers.
+    org.apache.tez.dag.api.DAG preWarmContainersDAG =
+      new org.apache.tez.dag.api.DAG("PreWarmContainersDAG");
+    Vertex sleepVertex = new Vertex("PreWarmSleepVertex",
+      processorDescriptor, numContainers, containerResource);
+
+    Map<String, String> environment = new HashMap<String, String>();
+    if (null != amConf.get(
+      TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_ENVIRONMENT)) {
+    Apps.setEnvFromInputString(environment,
+      amConf.get(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_ENVIRONMENT));
+    }
+
+    Apps.addToEnvironment(environment,
+      Environment.CLASSPATH.name(),
+      Environment.PWD.$());
+
+    Apps.addToEnvironment(environment,
+      Environment.CLASSPATH.name(),
+      Environment.PWD.$() + File.separator + "*");
+
+    // Add YARN/COMMON/HDFS jars to path
+    for (String c : amConf.getStrings(
+        YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+      Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
+        c.trim());
+    }
+
+    sleepVertex.setTaskEnvironment(environment);
+
+    if (null != amConf.get(
+      TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_JAVA_OPTS)) {
+      sleepVertex.setJavaOpts(amConf.get(
+        TezConfiguration.TEZ_SESSION_PRE_WARM_CONTAINER_JAVA_OPTS));
+    }
+
+    preWarmContainersDAG.addVertex(sleepVertex);
+
+    LOG.info("Starting DAG to pre-warm containers for AM using "
+      + SleepProcessor.class.getName()
+      + ", numContainers=" + numContainers
+      + ", containerResource=" + containerResource);
+    startDAG(preWarmContainersDAG.createDag(amConf));
+    return true;
+  }
+
   @Override
-  public void serviceStop() throws Exception {
+  public synchronized void serviceStop() throws Exception {
     if (isSession) {
       sessionStopped.set(true);
     }
@@ -1276,12 +1412,9 @@ public class DAGAppMaster extends AbstractService {
       DAGPlan dagPlan = null;
 
       // Read the protobuf DAG
-      DAGPlan.Builder dagPlanBuilder = DAGPlan.newBuilder();
       dagPBBinaryStream = new FileInputStream(
           TezConfiguration.TEZ_PB_PLAN_BINARY_NAME);
-      dagPlanBuilder.mergeFrom(dagPBBinaryStream);
-
-      dagPlan = dagPlanBuilder.build();
+      dagPlan = DAGPlan.parseFrom(dagPBBinaryStream);
 
       startDAG(dagPlan);
 
@@ -1293,6 +1426,7 @@ public class DAGAppMaster extends AbstractService {
   }
 
   private void startDAG(DAGPlan dagPlan) {
+    this.state = DAGAppMasterState.RUNNING;
     if (LOG.isDebugEnabled()) {
       LOG.debug("Running a DAG with " + dagPlan.getVertexCount()
           + " vertices ");
@@ -1312,7 +1446,6 @@ public class DAGAppMaster extends AbstractService {
 
   private void startDAG(DAG dag) {
     currentDAG = dag;
-    this.state = DAGAppMasterState.RUNNING;
 
     // End of creating the job.
     ((RunningAppContext) context).setDAG(currentDAG);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 582d274..23fd394 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -525,8 +525,10 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
         }
 
         List<TezEvent> inEvents = request.getEvents();
-        LOG.info("Ping from " + taskAttemptID.toString() +
-            " events: " + (inEvents != null? inEvents.size() : -1));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ping from " + taskAttemptID.toString() +
+              " events: " + (inEvents != null? inEvents.size() : -1));
+        }
         if(inEvents!=null && !inEvents.isEmpty()) {
           TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
           context.getEventHandler().handle(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ec37eb8..97034d4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -464,6 +464,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.localResources = DagTypeConverters
         .createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig()
             .getLocalResourceList());
+    this.localResources.putAll(appContext.getSessionResources());
     this.environment = DagTypeConverters
         .createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig()
             .getEnvironmentSettingList());
@@ -1052,7 +1053,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     // for now, only for leaf vertices
     // TODO TEZ-41 make commmitter type configurable per vertex
     
-    if (targetVertices.isEmpty()) {
+    if (!this.additionalOutputSpecs.isEmpty()) {
       committer = new MRVertexOutputCommitter();
     }
     try {
@@ -1608,8 +1609,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event;
       List<TezEvent> tezEvents = rEvent.getEvents();
       for(TezEvent tezEvent : tezEvents) {
-        LOG.info("Vertex: " + vertex.getName() + " routing event: "
-            + tezEvent.getEventType());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Vertex: " + vertex.getName() + " routing event: "
+              + tezEvent.getEventType());
+        }
         EventMetaData sourceMeta = tezEvent.getSourceInfo();
         switch(tezEvent.getEventType()) {
         case DATA_MOVEMENT_EVENT:

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 4093b85..196e3d4 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -55,6 +55,7 @@ import org.apache.tez.client.TezClient;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezSession;
 import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -368,6 +369,8 @@ public class OrderedWordCount {
 
         DAGClient dagClient;
         if (useTezSession) {
+          LOG.info("Waiting for TezSession to get into ready state");
+          waitForTezSessionReady(tezSession);
           LOG.info("Submitting DAG to Tez Session, dagIndex=" + dagIndex);
           dagClient = tezSession.submitDAG(dag);
           LOG.info("Submitted DAG to Tez Session, dagIndex=" + dagIndex);
@@ -424,4 +427,23 @@ public class OrderedWordCount {
     }
   }
 
+  private static void waitForTezSessionReady(TezSession tezSession)
+    throws IOException, TezException {
+    while (true) {
+      TezSessionStatus status = tezSession.getSessionStatus();
+      if (status.equals(TezSessionStatus.SHUTDOWN)) {
+        throw new RuntimeException("TezSession has already shutdown");
+      }
+      if (status.equals(TezSessionStatus.READY)) {
+        return;
+      }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted while trying to check session status");
+        return;
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e2e9247e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
new file mode 100644
index 0000000..2e3aba0
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.processor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A simple sleep processor implementation that sleeps for the configured
+ * time in milliseconds.
+ *
+ * @see SleepProcessorConfig for configuring the SleepProcessor
+ */
+public class SleepProcessor implements LogicalIOProcessor {
+
+  private static final Log LOG = LogFactory.getLog(SleepProcessor.class);
+
+  private int timeToSleepMS;
+
+  @Override
+  public void initialize(TezProcessorContext processorContext)
+    throws Exception {
+    if (processorContext.getUserPayload() == null) {
+      LOG.info("No processor user payload specified"
+        + ", using default timeToSleep of 1 ms");
+      timeToSleepMS = 1;
+    } else {
+      SleepProcessorConfig cfg =
+        new SleepProcessorConfig();
+      cfg.fromUserPayload(processorContext.getUserPayload());
+      timeToSleepMS = cfg.getTimeToSleepMS();
+    }
+    LOG.info("Initialized SleepProcessor, timeToSleepMS=" + timeToSleepMS);
+  }
+
+  @Override
+  public void run(Map<String, LogicalInput> inputs,
+                  Map<String, LogicalOutput> outputs) throws Exception {
+    LOG.info("Running the Sleep Processor, sleeping for "
+      + timeToSleepMS + " ms");
+    try {
+      Thread.sleep(timeToSleepMS);
+    } catch (InterruptedException ie) {
+      // ignore
+    }
+  }
+
+  @Override
+  public void handleEvents(List<Event> processorEvents) {
+    // Nothing to do
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Nothing to cleanup
+  }
+
+  /**
+   * Configuration for the Sleep Processor.
+   * Only configuration option is time to sleep in milliseconds.
+   */
+  public static class SleepProcessorConfig {
+    private int timeToSleepMS;
+
+    public SleepProcessorConfig() {
+    }
+
+    /**
+     * @param timeToSleepMS Time to sleep in milliseconds
+     */
+    public SleepProcessorConfig (int timeToSleepMS) {
+      this.timeToSleepMS = timeToSleepMS;
+    }
+
+    public byte[] toUserPayload() {
+      return Integer.toString(timeToSleepMS).getBytes();
+    }
+
+    public void fromUserPayload(byte[] userPayload) {
+      timeToSleepMS = Integer.valueOf(new String(userPayload)).intValue();
+    }
+
+    public int getTimeToSleepMS() {
+      return timeToSleepMS;
+    }
+  }
+}