You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:30 UTC

[23/50] [abbrv] TEZ-443. Merge tez-dag-api and tez-engine-api into a single module - tez-api (part of TEZ-398). (sseth)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
new file mode 100644
index 0000000..9cb602c
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Stack;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
+
+
+public class DAG { // FIXME rename to Topology
+  final List<Vertex> vertices;
+  final List<Edge> edges;
+  final String name;
+
+  public DAG(String name) {
+    this.vertices = new ArrayList<Vertex>();
+    this.edges = new ArrayList<Edge>();
+    this.name = name;
+  }
+
+  public synchronized DAG addVertex(Vertex vertex) {
+    if (vertices.contains(vertex)) {
+      throw new IllegalArgumentException(
+          "Vertex " + vertex + " already defined!");
+    }
+    vertices.add(vertex);
+    return this;
+  }
+
+  @Private
+  public synchronized List<Vertex> getVertices() {
+    return Collections.unmodifiableList(this.vertices);
+  }
+
+  public synchronized DAG addEdge(Edge edge) {
+    // Sanity checks
+    if (!vertices.contains(edge.getInputVertex())) {
+      throw new IllegalArgumentException(
+          "Input vertex " + edge.getInputVertex() + " doesn't exist!");
+    }
+    if (!vertices.contains(edge.getOutputVertex())) {
+      throw new IllegalArgumentException(
+          "Output vertex " + edge.getOutputVertex() + " doesn't exist!");
+    }
+    if (edges.contains(edge)) {
+      throw new IllegalArgumentException(
+          "Edge " + edge + " already defined!");
+    }
+
+    // Inform the vertices
+    edge.getInputVertex().addOutputVertex(edge.getOutputVertex(), edge.getId());
+    edge.getOutputVertex().addInputVertex(edge.getInputVertex(), edge.getId());
+
+    edges.add(edge);
+    return this;
+  }
+
+  public String getName() {
+    return this.name;
+  }
+
+  // AnnotatedVertex is used by verify()
+  private static class AnnotatedVertex {
+    Vertex v;
+
+    int index; //for Tarjan's algorithm
+    int lowlink; //for Tarjan's algorithm
+    boolean onstack; //for Tarjan's algorithm
+
+    int outDegree;
+
+    private AnnotatedVertex(Vertex v){
+       this.v = v;
+       index = -1;
+       lowlink = -1;
+       outDegree = 0;
+    }
+  }
+
+  // verify()
+  //
+  // Default rules
+  //   Illegal:
+  //     - duplicate vertex id
+  //     - cycles
+  //
+  //   Ok:
+  //     - orphaned vertex.  Occurs in map-only
+  //     - islands.  Occurs if job has unrelated workflows.
+  //
+  //   Not yet categorized:
+  //     - orphaned vertex in DAG of >1 vertex.  Could be unrelated map-only job.
+  //     - v1->v2 via two edges.  perhaps some self-join job would use this?
+  //
+  // "restricted" mode:
+  //   In short term, the supported DAGs are limited. Call with restricted=true for these verifications.
+  //   Illegal:
+  //     - any vertex with more than one input or output edge. (n-ary input, n-ary merge)
+  public void verify() throws IllegalStateException {
+    verify(true);
+  }
+
+  public void verify(boolean restricted) throws IllegalStateException  {
+    if (vertices.isEmpty()) {
+      throw new IllegalStateException("Invalid dag containing 0 vertices");
+    }
+
+    Map<Vertex, List<Edge>> edgeMap = new HashMap<Vertex, List<Edge>>();
+    for(Edge e : edges){
+      Vertex inputVertex = e.getInputVertex();
+      List<Edge> edgeList = edgeMap.get(inputVertex);
+      if(edgeList == null){
+        edgeList = new ArrayList<Edge>();
+        edgeMap.put(inputVertex, edgeList);
+      }
+      edgeList.add(e);
+    }
+
+    // check for valid vertices, duplicate vertex names,
+    // and prepare for cycle detection
+    Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>();
+    for(Vertex v : vertices){
+      if(vertexMap.containsKey(v.getVertexName())){
+         throw new IllegalStateException("DAG contains multiple vertices"
+             + " with name: " + v.getVertexName());
+      }
+      vertexMap.put(v.getVertexName(), new AnnotatedVertex(v));
+    }
+
+    detectCycles(edgeMap, vertexMap);
+
+    if(restricted){
+      for(Edge e : edges){
+        vertexMap.get(e.getInputVertex().getVertexName()).outDegree++;
+        if (e.getEdgeProperty().getDataMovementType() !=
+            DataMovementType.SCATTER_GATHER) {
+          throw new IllegalStateException(
+              "Unsupported connection pattern on edge. " + e);
+        }
+        if (e.getEdgeProperty().getDataSourceType() !=
+            DataSourceType.PERSISTED) {
+          throw new IllegalStateException(
+              "Unsupported source type on edge. " + e);
+        }
+        if (e.getEdgeProperty().getSchedulingType() !=
+            SchedulingType.SEQUENTIAL) {
+          throw new IllegalStateException(
+              "Unsupported scheduling type on edge. " + e);
+        }
+      }
+      for(AnnotatedVertex av: vertexMap.values()){
+        if (av.outDegree > 1) {
+          throw new IllegalStateException("Vertex has outDegree>1: "
+              + av.v.getVertexName());
+        }
+      }
+    }
+  }
+
+  // Adaptation of Tarjan's algorithm for connected components.
+  // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
+  private void detectCycles(Map<Vertex, List<Edge>> edgeMap, Map<String, AnnotatedVertex> vertexMap)
+      throws IllegalStateException{
+    Integer nextIndex = 0; // boxed integer so it is passed by reference.
+    Stack<AnnotatedVertex> stack = new Stack<DAG.AnnotatedVertex>();
+    for(AnnotatedVertex av: vertexMap.values()){
+      if(av.index == -1){
+        assert stack.empty();
+        strongConnect(av, vertexMap, edgeMap, stack, nextIndex);
+      }
+    }
+  }
+
+  // part of Tarjan's algorithm for connected components.
+  // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
+  private void strongConnect(
+          AnnotatedVertex av,
+          Map<String, AnnotatedVertex> vertexMap,
+          Map<Vertex, List<Edge>> edgeMap,
+          Stack<AnnotatedVertex> stack, Integer nextIndex) throws IllegalStateException{
+    av.index = nextIndex;
+    av.lowlink = nextIndex;
+    nextIndex++;
+    stack.push(av);
+    av.onstack = true;
+
+    List<Edge> edges = edgeMap.get(av.v);
+    if(edges != null){
+      for(Edge e : edgeMap.get(av.v)){
+        AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getVertexName());
+        if(outVertex.index == -1){
+          strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex);
+          av.lowlink = Math.min(av.lowlink, outVertex.lowlink);
+        }
+        else if(outVertex.onstack){
+          // strongly connected component detected, but we will wait till later so that the full cycle can be displayed.
+          // update lowlink in case outputVertex should be considered the root of this component.
+          av.lowlink = Math.min(av.lowlink, outVertex.index);
+        }
+      }
+    }
+
+    if(av.lowlink == av.index ){
+       AnnotatedVertex pop = stack.pop();
+       pop.onstack = false;
+       if(pop != av){
+         // there was something on the stack other than this "av".
+         // this indicates there is a scc/cycle. It comprises all nodes from top of stack to "av"
+         StringBuilder message = new StringBuilder();
+         message.append(av.v.getVertexName() + " <- ");
+         for( ; pop != av; pop = stack.pop()){
+           message.append(pop.v.getVertexName() + " <- ");
+           pop.onstack = false;
+         }
+         message.append(av.v.getVertexName());
+         throw new IllegalStateException("DAG contains a cycle: " + message);
+       }
+    }
+  }
+
+
+  // create protobuf message describing DAG
+  @Private
+  public DAGPlan createDag(Configuration dagConf) {
+    verify(true);
+
+    DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
+
+    dagBuilder.setName(this.name);
+
+    for(Vertex vertex : vertices){
+      VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
+      vertexBuilder.setName(vertex.getVertexName());
+      vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until  TEZ-46.
+      vertexBuilder.setProcessorDescriptor(DagTypeConverters
+          .convertToDAGPlan(vertex.getProcessorDescriptor()));
+
+      //task config
+      PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder();
+      Resource resource = vertex.getTaskResource();
+      taskConfigBuilder.setNumTasks(vertex.getParallelism());
+      taskConfigBuilder.setMemoryMb(resource.getMemory());
+      taskConfigBuilder.setVirtualCores(resource.getVirtualCores());
+      taskConfigBuilder.setJavaOpts(vertex.getJavaOpts());
+
+      taskConfigBuilder.setTaskModule(vertex.getVertexName());
+      PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
+      Map<String,LocalResource> lrs = vertex.getTaskLocalResources();
+      for(Entry<String, LocalResource> entry : lrs.entrySet()){
+        String key = entry.getKey();
+        LocalResource lr = entry.getValue();
+        localResourcesBuilder.setName(key);
+        localResourcesBuilder.setUri(
+            DagTypeConverters.convertToDAGPlan(lr.getResource()));
+        localResourcesBuilder.setSize(lr.getSize());
+        localResourcesBuilder.setTimeStamp(lr.getTimestamp());
+        localResourcesBuilder.setType(
+            DagTypeConverters.convertToDAGPlan(lr.getType()));
+        localResourcesBuilder.setVisibility(
+            DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
+        if(lr.getType() == LocalResourceType.PATTERN){
+          if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
+            throw new TezUncheckedException("LocalResource type set to pattern"
+                + " but pattern is null or empty");
+          }
+          localResourcesBuilder.setPattern(lr.getPattern());
+        }
+        taskConfigBuilder.addLocalResource(localResourcesBuilder);
+      }
+
+      if(vertex.getTaskEnvironment() != null){
+        for(String key : vertex.getTaskEnvironment().keySet()){
+          PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder();
+          envSettingBuilder.setKey(key);
+          envSettingBuilder.setValue(vertex.getTaskEnvironment().get(key));
+          taskConfigBuilder.addEnvironmentSetting(envSettingBuilder);
+        }
+      }
+
+      if(vertex.getTaskLocationsHint() != null ){
+        if(vertex.getTaskLocationsHint().getTaskLocationHints() != null){
+          for(TaskLocationHint hint : vertex.getTaskLocationsHint().getTaskLocationHints()){
+            PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();
+
+            if(hint.getDataLocalHosts() != null){
+              taskLocationHintBuilder.addAllHost(hint.getDataLocalHosts());
+            }
+            if(hint.getRacks() != null){
+              taskLocationHintBuilder.addAllRack(hint.getRacks());
+            }
+
+            vertexBuilder.addTaskLocationHint(taskLocationHintBuilder);
+          }
+        }
+      }
+
+      for(String inEdgeId : vertex.getInputEdgeIds()){
+        vertexBuilder.addInEdgeId(inEdgeId);
+      }
+
+      for(String outEdgeId : vertex.getOutputEdgeIds()){
+        vertexBuilder.addOutEdgeId(outEdgeId);
+      }
+
+      vertexBuilder.setTaskConfig(taskConfigBuilder);
+      dagBuilder.addVertex(vertexBuilder);
+    }
+
+    for(Edge edge : edges){
+      EdgePlan.Builder edgeBuilder = EdgePlan.newBuilder();
+      edgeBuilder.setId(edge.getId());
+      edgeBuilder.setInputVertexName(edge.getInputVertex().getVertexName());
+      edgeBuilder.setOutputVertexName(edge.getOutputVertex().getVertexName());
+      edgeBuilder.setDataMovementType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataMovementType()));
+      edgeBuilder.setDataSourceType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataSourceType()));
+      edgeBuilder.setSchedulingType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getSchedulingType()));
+      edgeBuilder.setEdgeSource(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeSource()));
+      edgeBuilder.setEdgeDestination(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeDestination()));
+      dagBuilder.addEdge(edgeBuilder);
+    }
+
+    if(dagConf != null) {
+      Iterator<Entry<String, String>> iter = dagConf.iterator();
+      ConfigurationProto.Builder confProtoBuilder =
+          ConfigurationProto.newBuilder();
+      while (iter.hasNext()) {
+        Entry<String, String> entry = iter.next();
+        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+        kvp.setKey(entry.getKey());
+        kvp.setValue(entry.getValue());
+        confProtoBuilder.addConfKeyValues(kvp);
+      }
+      dagBuilder.setDagKeyValues(confProtoBuilder);
+    }
+
+    return dagBuilder.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
new file mode 100644
index 0000000..1fd78f1
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceType;
+import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+
+import com.google.protobuf.ByteString;
+
+
+public class DagTypeConverters {
+  
+  public static PlanLocalResourceVisibility convertToDAGPlan(LocalResourceVisibility visibility){
+    switch(visibility){
+      case PUBLIC : return PlanLocalResourceVisibility.PUBLIC;  
+      case PRIVATE : return PlanLocalResourceVisibility.PRIVATE;
+      case APPLICATION : return PlanLocalResourceVisibility.APPLICATION;
+      default : throw new RuntimeException("unknown 'visibility': " + visibility);
+    }
+  }
+  
+  public static LocalResourceVisibility convertFromDAGPlan(PlanLocalResourceVisibility visibility){
+    switch(visibility){
+      case PUBLIC : return LocalResourceVisibility.PUBLIC;  
+      case PRIVATE : return LocalResourceVisibility.PRIVATE;
+      case APPLICATION : return LocalResourceVisibility.APPLICATION;
+      default : throw new RuntimeException("unknown 'visibility': " + visibility);
+    }
+  }
+  
+  public static PlanEdgeDataSourceType convertToDAGPlan(DataSourceType sourceType){
+    switch(sourceType){
+      case PERSISTED : return PlanEdgeDataSourceType.PERSISTED;  
+      case PERSISTED_RELIABLE : return PlanEdgeDataSourceType.PERSISTED_RELIABLE;
+      case EPHEMERAL :  return PlanEdgeDataSourceType.EPHEMERAL;
+      default : throw new RuntimeException("unknown 'dataSourceType': " + sourceType);
+    }
+  }
+  
+  public static DataSourceType convertFromDAGPlan(PlanEdgeDataSourceType sourceType){
+    switch(sourceType){
+      case PERSISTED : return DataSourceType.PERSISTED;  
+      case PERSISTED_RELIABLE : return DataSourceType.PERSISTED_RELIABLE;
+      case EPHEMERAL :  return DataSourceType.EPHEMERAL;
+      default : throw new RuntimeException("unknown 'dataSourceType': " + sourceType);
+    }
+  }
+  
+  public static PlanEdgeDataMovementType convertToDAGPlan(DataMovementType type){
+    switch(type){
+      case ONE_TO_ONE : return PlanEdgeDataMovementType.ONE_TO_ONE;  
+      case BROADCAST : return PlanEdgeDataMovementType.BROADCAST;
+      case SCATTER_GATHER : return PlanEdgeDataMovementType.SCATTER_GATHER;
+      default : throw new RuntimeException("unknown 'dataMovementType': " + type);
+    }
+  }
+  
+  public static DataMovementType convertFromDAGPlan(PlanEdgeDataMovementType type){
+    switch(type){
+      case ONE_TO_ONE : return DataMovementType.ONE_TO_ONE;  
+      case BROADCAST : return DataMovementType.BROADCAST;
+      case SCATTER_GATHER : return DataMovementType.SCATTER_GATHER;
+      default : throw new IllegalArgumentException("unknown 'dataMovementType': " + type);
+    }
+  }
+  
+  public static PlanEdgeSchedulingType convertToDAGPlan(SchedulingType type){
+    switch(type){
+      case SEQUENTIAL : return PlanEdgeSchedulingType.SEQUENTIAL;  
+      case CONCURRENT : return PlanEdgeSchedulingType.CONCURRENT;
+      default : throw new RuntimeException("unknown 'SchedulingType': " + type);
+    }
+  }
+  
+  public static SchedulingType convertFromDAGPlan(PlanEdgeSchedulingType type){
+    switch(type){
+      case SEQUENTIAL : return SchedulingType.SEQUENTIAL;  
+      case CONCURRENT : return SchedulingType.CONCURRENT;
+      default : throw new IllegalArgumentException("unknown 'SchedulingType': " + type);
+    }
+  }
+  
+  public static PlanLocalResourceType convertToDAGPlan(LocalResourceType type) {
+    switch(type){
+    case ARCHIVE : return PlanLocalResourceType.ARCHIVE;
+    case FILE : return PlanLocalResourceType.FILE;
+    case PATTERN : return PlanLocalResourceType.PATTERN;
+    default : throw new IllegalArgumentException("unknown 'type': " + type);
+    }
+  }
+  
+  public static LocalResourceType convertFromDAGPlan(PlanLocalResourceType type) {
+    switch(type){
+    case ARCHIVE : return LocalResourceType.ARCHIVE;
+    case FILE : return LocalResourceType.FILE;
+    case PATTERN : return LocalResourceType.PATTERN;
+    default : throw new IllegalArgumentException("unknown 'type': " + type);
+    }
+  }
+
+  public static VertexLocationHint convertFromDAGPlan(
+      List<PlanTaskLocationHint> locationHints) {
+
+    List<TaskLocationHint> outputList = new ArrayList<TaskLocationHint>();  
+    
+    for(PlanTaskLocationHint inputHint : locationHints){
+      TaskLocationHint outputHint = new TaskLocationHint(
+          new HashSet<String>(inputHint.getHostList()),
+          new HashSet<String>(inputHint.getRackList()));
+      outputList.add(outputHint);
+    }
+    return new VertexLocationHint(outputList.size(), outputList);
+  }
+
+  // notes re HDFS URL handling:
+  //   Resource URLs in the protobuf message are strings of the form hdfs://host:port/path 
+  //   org.apache.hadoop.fs.Path.Path  is actually a URI type that allows any scheme
+  //   org.apache.hadoop.yarn.api.records.URL is a URL type used by YARN.
+  //   java.net.URL cannot be used out of the box as it rejects unknown schemes such as HDFS.
+  
+  public static String convertToDAGPlan(URL resource) {
+    // see above notes on HDFS URL handling
+    String out = resource.getScheme() + "://" + resource.getHost() + ":" + resource.getPort() 
+        + resource.getFile();
+    return out;
+  }
+
+  public static Map<String, LocalResource> createLocalResourceMapFromDAGPlan(
+      List<PlanLocalResource> localResourcesList) {
+    Map<String, LocalResource> map = new HashMap<String, LocalResource>();
+    for(PlanLocalResource res : localResourcesList){
+      LocalResource r = new LocalResourcePBImpl();
+      
+      //NOTE: have to check every optional field in protobuf generated classes for existence before accessing
+      //else we will receive a default value back, eg ""
+      if(res.hasPattern()){
+        r.setPattern(res.getPattern());
+      }
+      r.setResource(ConverterUtils.getYarnUrlFromPath(new Path(res.getUri())));  // see above notes on HDFS URL handling
+      r.setSize(res.getSize());
+      r.setTimestamp(res.getTimeStamp());
+      r.setType(DagTypeConverters.convertFromDAGPlan(res.getType()));
+      r.setVisibility(DagTypeConverters.convertFromDAGPlan(res.getVisibility()));
+      map.put(res.getName(), r);
+    }
+    return map;
+  }
+
+  public static Map<String, String> createEnvironmentMapFromDAGPlan(
+      List<PlanKeyValuePair> environmentSettingList) {  
+      
+    Map<String, String> map = new HashMap<String, String>();
+    for(PlanKeyValuePair setting : environmentSettingList){
+      map.put(setting.getKey(), setting.getValue());
+    }
+    
+    return map;
+  }
+  
+  public static Map<String, EdgePlan> createEdgePlanMapFromDAGPlan(List<EdgePlan> edgeList){
+    Map<String, EdgePlan> edgePlanMap =
+        new HashMap<String, EdgePlan>();
+    for(EdgePlan edgePlanItem : edgeList){
+      edgePlanMap.put(edgePlanItem.getId(), edgePlanItem);
+    }
+    return edgePlanMap;
+  }
+  
+  public static EdgeProperty createEdgePropertyMapFromDAGPlan(EdgePlan edge) {
+    return new EdgeProperty(
+        convertFromDAGPlan(edge.getDataMovementType()),
+        convertFromDAGPlan(edge.getDataSourceType()),
+        convertFromDAGPlan(edge.getSchedulingType()),
+        convertOutputDescriptorFromDAGPlan(edge.getEdgeSource()),
+        convertInputDescriptorFromDAGPlan(edge.getEdgeDestination())
+    );
+  }
+
+  public static Resource createResourceRequestFromTaskConfig(
+      PlanTaskConfiguration taskConfig) {
+    return Resource.newInstance(taskConfig.getMemoryMb(), taskConfig.getVirtualCores());
+  }
+
+  public static Map<String, String> convertConfFromProto(
+      ConfigurationProto confProto) {
+    List<PlanKeyValuePair> settingList = confProto.getConfKeyValuesList();
+    Map<String, String> map = new HashMap<String, String>();
+    for(PlanKeyValuePair setting: settingList){
+      map.put(setting.getKey(), setting.getValue());
+    }
+    return map;
+  }
+
+  public static TezEntityDescriptorProto convertToDAGPlan(
+      TezEntityDescriptor descriptor) {
+    TezEntityDescriptorProto.Builder builder = TezEntityDescriptorProto
+        .newBuilder();
+    builder.setClassName(descriptor.getClassName());
+    if (descriptor.getUserPayload() != null) {
+      builder
+          .setUserPayload(ByteString.copyFrom(descriptor.getUserPayload()));
+    }
+    return builder.build();
+  }
+
+  public static InputDescriptor convertInputDescriptorFromDAGPlan(
+      TezEntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    byte[] bb = null;
+    if (proto.hasUserPayload()) {
+      bb = proto.getUserPayload().toByteArray();
+    }
+    return new InputDescriptor(className).setUserPayload(bb);
+  }
+
+  public static OutputDescriptor convertOutputDescriptorFromDAGPlan(
+      TezEntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    byte[] bb = null;
+    if (proto.hasUserPayload()) {
+      bb =  proto.getUserPayload().toByteArray();
+    }
+    return new OutputDescriptor(className).setUserPayload(bb);
+  }
+
+  public static ProcessorDescriptor convertProcessorDescriptorFromDAGPlan(
+      TezEntityDescriptorProto proto) {
+    String className = proto.getClassName();
+    byte[] bb = null;
+    if (proto.hasUserPayload()) {
+      bb = proto.getUserPayload().toByteArray();
+    }
+    return new ProcessorDescriptor(className).setUserPayload(bb);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
new file mode 100644
index 0000000..a893bc3
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+public class Edge{
+  
+  private final Vertex inputVertex;
+  private final Vertex outputVertex;
+  private final EdgeProperty edgeProperty;
+    
+  public Edge(Vertex inputVertex, 
+               Vertex outputVertex, 
+               EdgeProperty edgeProperty) {
+    this.inputVertex = inputVertex;
+    this.outputVertex = outputVertex;
+    this.edgeProperty = edgeProperty;
+  }
+  
+  // RENAME to source and destination
+  public Vertex getInputVertex() {
+    return inputVertex;
+  }
+  
+  public Vertex getOutputVertex() {
+    return outputVertex;
+  }
+  
+  public EdgeProperty getEdgeProperty() {
+    return edgeProperty;
+  }
+  
+  /*
+   * Used to identify the edge in the configuration
+   */
+  public String getId() {
+    return String.valueOf(this.hashCode());
+  }
+ 
+  @Override
+  public String toString() {
+    return inputVertex + " -> " + outputVertex + " (" + edgeProperty + ")";
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
new file mode 100644
index 0000000..326d3d0
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+public class EdgeProperty {
+  
+  /**
+   * Defines the manner of data movement between source and destination tasks.
+   * Determines which destination tasks have access to data produced on this
+   * edge by a source task. A destination task may choose to read any portion of
+   * the data available to it.
+   */
+  public enum DataMovementType {
+    /**
+     * Output on this edge produced by the i-th source task is available to the 
+     * i-th destination task.
+     */
+    ONE_TO_ONE,
+    /**
+     * Output on this edge produced by any source task is available to all
+     * destination tasks.
+     */
+    BROADCAST,
+    /**
+     * The i-th output on this edge produced by all source tasks is available to
+     * the same destination task. Source tasks scatter their outputs and they
+     * are gathered by designated destination tasks.
+     */
+    SCATTER_GATHER
+  }
+  
+  /**
+   * Determines the lifetime of the data produced on this edge by a source task.
+   */
+  public enum DataSourceType {
+    /**
+     * Data produced by the source is persisted and available even when the
+     * task is not running. The data may become unavailable and may cause the 
+     * source task to be re-executed.
+     */
+    PERSISTED,
+    /**
+     * Source data is stored reliably and will always be available
+     */
+    PERSISTED_RELIABLE,
+    /**
+     * Data produced by the source task is available only while the source task
+     * is running. This requires the destination task to run concurrently with 
+     * the source task.
+     */
+    EPHEMERAL
+  }
+  
+  /**
+   * Determines when the destination task is eligible to run, once the source  
+   * task is eligible to run.
+   */
+  public enum SchedulingType {
+    /**
+     * Destination task is eligible to run after one or more of its source tasks 
+     * have started or completed.
+     */
+    SEQUENTIAL,
+    /**
+     * Destination task must run concurrently with the source task
+     */
+    CONCURRENT
+  }
+  
+  DataMovementType dataMovementType;
+  DataSourceType dataSourceType;
+  SchedulingType schedulingType;
+  InputDescriptor inputDescriptor;
+  OutputDescriptor outputDescriptor;
+  
+  /**
+   * @param dataMovementType
+   * @param dataSourceType
+   * @param edgeSource
+   *          The {@link OutputDescriptor} that generates data on the edge.
+   * @param edgeDestination
+   *          The {@link InputDescriptor} which will consume data from the edge.
+   */
+  public EdgeProperty(DataMovementType dataMovementType, 
+                       DataSourceType dataSourceType,
+                       SchedulingType schedulingType,
+                       OutputDescriptor edgeSource,
+                       InputDescriptor edgeDestination) {
+    this.dataMovementType = dataMovementType;
+    this.dataSourceType = dataSourceType;
+    this.schedulingType = schedulingType;
+    this.inputDescriptor = edgeDestination;
+    this.outputDescriptor = edgeSource;
+  }
+  
+  public DataMovementType getDataMovementType() {
+    return dataMovementType;
+  }
+  
+  public DataSourceType getDataSourceType() {
+    return dataSourceType;
+  }
+  
+  public SchedulingType getSchedulingType() {
+    return schedulingType;
+  }
+  
+  /**
+   * Returns the {@link InputDescriptor} which will consume data from the edge.
+   * 
+   * @return
+   */
+  public InputDescriptor getEdgeDestination() {
+    return inputDescriptor;
+  }
+  
+  /**
+   * Returns the {@link OutputDescriptor} which produces data on the edge.
+   * 
+   * @return
+   */
+  public OutputDescriptor getEdgeSource() {
+    return outputDescriptor;
+  }
+  
+  @Override
+  public String toString() {
+    return "{ " + dataMovementType + " : " + inputDescriptor.getClassName()
+        + " >> " + dataSourceType + " >> " + outputDescriptor.getClassName() + " }";
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
new file mode 100644
index 0000000..dea9001
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+public class InputDescriptor extends TezEntityDescriptor {
+
+  public InputDescriptor(String inputClassName) {
+    super(inputClassName);
+  }
+
+  @Override
+  public InputDescriptor setUserPayload(byte[] userPayload) {
+    this.userPayload = userPayload;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
new file mode 100644
index 0000000..16fb9b1
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+public class OutputDescriptor extends TezEntityDescriptor {
+
+  public OutputDescriptor(String outputClassName) {
+    super(outputClassName);
+  }
+
+  @Override
+  public OutputDescriptor setUserPayload(byte[] userPayload) {
+    this.userPayload = userPayload;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
new file mode 100644
index 0000000..092147d
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+public class ProcessorDescriptor extends TezEntityDescriptor {
+
+  public ProcessorDescriptor(String processorClassName) {
+    super(processorClassName);
+  }
+
+  public ProcessorDescriptor setUserPayload(byte[] userPayload) {
+    this.userPayload = userPayload;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
new file mode 100644
index 0000000..7447974
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class TezConfiguration extends Configuration {
+
+  public final static String TEZ_SITE_XML = "tez-site.xml";
+
+  static {
+    addDefaultResource(TEZ_SITE_XML);
+  }
+
+  public TezConfiguration() {
+    super();
+  }
+
+  public TezConfiguration(Configuration conf) {
+    super(conf);
+  }
+
+  public static final String TEZ_PREFIX = "tez.";
+  public static final String TEZ_AM_PREFIX = TEZ_PREFIX + "am.";
+  public static final String TEZ_TASK_PREFIX = TEZ_PREFIX + "task.";
+
+  public static final String TEZ_AM_STAGING_DIR = TEZ_PREFIX + "staging-dir";
+  public static final String TEZ_AM_STAGING_DIR_DEFAULT = "/tmp/tez/staging";
+
+  // TODO Should not be required once all tokens are handled via AppSubmissionContext
+  public static final String JOB_SUBMIT_DIR = TEZ_PREFIX + "jobSubmitDir";
+  public static final String APPLICATION_TOKENS_FILE = "appTokens";
+  public static final String TEZ_APPLICATION_MASTER_CLASS =
+      "org.apache.tez.dag.app.DAGAppMaster";
+
+  /** Root Logging level passed to the Tez app master.*/
+  public static final String TEZ_AM_LOG_LEVEL = TEZ_AM_PREFIX+"log.level";
+  public static final String TEZ_AM_LOG_LEVEL_DEFAULT = "INFO";
+
+  public static final String TEZ_AM_JAVA_OPTS = TEZ_AM_PREFIX
+      + "java.opts";
+  public static final String DEFAULT_TEZ_AM_JAVA_OPTS = " -Xmx1024m ";
+
+  public static final String TEZ_AM_CANCEL_DELEGATION_TOKEN = TEZ_AM_PREFIX +
+      "am.complete.cancel.delegation.tokens";
+  public static final boolean TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT = true;
+
+  public static final String TEZ_AM_TASK_LISTENER_THREAD_COUNT =
+      TEZ_AM_PREFIX + "task.listener.thread-count";
+  public static final int TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30;
+
+  public static final String TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT =
+      TEZ_AM_PREFIX + "container.listener.thread-count";
+  public static final int TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT_DEFAULT = 30;
+
+  // TODO Some of the DAG properties are job specific and not AM specific. Rename accordingly.
+  // TODO Are any of these node blacklisting properties required. (other than for MR compat)
+  public static final String TEZ_AM_MAX_TASK_FAILURES_PER_NODE = TEZ_AM_PREFIX
+      + "maxtaskfailures.per.node";
+  public static final int TEZ_AM_MAX_TASK_FAILURES_PER_NODE_DEFAULT = 3;
+
+  public static final String TEZ_AM_MAX_TASK_ATTEMPTS =
+      TEZ_AM_PREFIX + "max.task.attempts";
+  public static final int TEZ_AM_MAX_TASK_ATTEMPTS_DEFAULT = 4;
+
+  public static final String TEZ_AM_NODE_BLACKLISTING_ENABLED = TEZ_AM_PREFIX
+      + "node-blacklisting.enabled";
+  public static final boolean TEZ_AM_NODE_BLACKLISTING_ENABLED_DEFAULT = true;
+  public static final String TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD = TEZ_AM_PREFIX
+      + "node-blacklisting.ignore-threshold-node-percent";
+  public static final int TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33;
+
+  /** Number of threads to handle job client RPC requests.*/
+  public static final String TEZ_AM_CLIENT_THREAD_COUNT =
+      TEZ_AM_PREFIX + "client.am.thread-count";
+  public static final int TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT = 1;
+  /**
+   * Range of ports that the AM can use when binding. Leave blank
+   * if you want all possible ports.
+   */
+  public static final String TEZ_AM_CLIENT_AM_PORT_RANGE =
+      TEZ_AM_PREFIX + "client.am.port-range";
+
+
+  public static final String TEZ_AM_RESOURCE_MEMORY_MB = TEZ_AM_PREFIX
+      + "resource.memory.mb";
+  public static final int TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT = 1536;
+
+  public static final String TEZ_AM_RESOURCE_CPU_VCORES = TEZ_AM_PREFIX
+      + "resource.cpu.vcores";
+  public static final int TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT = 1;
+
+  public static final String
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION = TEZ_AM_PREFIX
+          + "shuffle-vertex-manager.min-src-fraction";
+  public static final float
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT = 0.25f;
+
+  public static final String
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION = TEZ_AM_PREFIX
+          + "shuffle-vertex-manager.max-src-fraction";
+  public static final float
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT = 0.75f;
+
+  public static final String
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL = TEZ_AM_PREFIX +
+          "shuffle-vertex-manager.enable.auto-parallel";
+  public static final boolean
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT = false;
+
+  public static final String
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE = TEZ_AM_PREFIX +
+          "shuffle-vertex-manager.desired-task-input-size";
+  public static final long
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT =
+          1024*1024*100L;
+
+  public static final String
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM = TEZ_AM_PREFIX +
+          "shuffle-vertex-manager.min-task-parallelism";
+  public static final int
+          TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT = 1;
+
+  public static final String
+          TEZ_AM_SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION = TEZ_AM_PREFIX
+          + "slowstart-dag-scheduler.min-resource-fraction";
+  public static final float
+          TEZ_AM_SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION_DEFAULT = 0.5f;
+
+  public static final String TEZ_AM_AGGRESSIVE_SCHEDULING = TEZ_AM_PREFIX +
+      "aggressive.scheduling";
+  public static boolean TEZ_AM_AGGRESSIVE_SCHEDULING_DEFAULT = false;
+
+  /**
+   * The complete path to the serialized dag plan file
+   * <code>TEZ_AM_PLAN_PB_BINARY</code>. Used to make the plan available to
+   * individual tasks if needed. This will typically be a path in the job submit
+   * directory.
+   */
+  public static final String TEZ_AM_PLAN_REMOTE_PATH = TEZ_AM_PREFIX
+      + "dag-am-plan.remote.path";
+
+  public static final String TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX = TEZ_AM_PREFIX
+      + "am-rm.heartbeat.interval-ms.max";
+  public static final int TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT = 1000;
+
+  public static final String TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX = TEZ_TASK_PREFIX
+      + "get-task.sleep.interval-ms.max";
+  public static final int TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT = 500;
+
+  public static final String TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS = TEZ_TASK_PREFIX
+      + "am.heartbeat.interval-ms.max";
+  public static final int TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 100;
+
+  public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
+      + "max-events-per-heartbeat.max";
+  public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 100;
+
+  /**
+   * Configuration to specify whether container should be reused.
+   */
+  public static final String TEZ_AM_CONTAINER_REUSE_ENABLED = TEZ_AM_PREFIX
+      + "container.reuse.enabled";
+  public static final boolean TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT = true;
+
+  /**
+   * Whether to reuse containers for rack local tasks. Active only if reuse is
+   * enabled.
+   */
+  public static final String TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED = TEZ_AM_PREFIX
+      + "container.reuse.rack-fallback.enabled";
+  public static final boolean TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED_DEFAULT = true;
+
+  /**
+   * Whether to reuse containers for non-local tasks. Active only if reuse is
+   * enabled.
+   */
+  public static final String TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED = TEZ_AM_PREFIX
+      + "container.reuse.non-local-fallback.enabled";
+  public static final boolean TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED_DEFAULT = false;
+
+  public static final String TEZ_AM_CONTAINER_REUSE_DELAY_ALLOCATION_MILLIS = TEZ_AM_PREFIX
+      + "container.reuse.delay-allocation-millis";
+  public static final long TEZ_AM_CONTAINER_REUSE_DELAY_ALLOCATION_MILLIS_DEFAULT = 3000l;
+
+  public static final String TEZ_PB_BINARY_CONF_NAME = "tez-conf.pb";
+  public static final String TEZ_PB_PLAN_BINARY_NAME = "tez-dag.pb";
+  public static final String TEZ_PB_PLAN_TEXT_NAME = "tez-dag.pb.txt";
+
+  /*
+   * Logger properties
+   */
+  public static final String TEZ_CONTAINER_LOG4J_PROPERTIES_FILE = "tez-container-log4j.properties";
+  public static final String TEZ_CONTAINER_LOGGER_NAME = "CLA";
+  public static final String TEZ_ROOT_LOGGER_NAME = "tez.root.logger";
+  public static final String TEZ_CONTAINER_LOG_FILE_NAME = "syslog";
+  public static final String TEZ_CONTAINER_ERR_FILE_NAME = "stderr";
+  public static final String TEZ_CONTAINER_OUT_FILE_NAME = "stdout";
+
+
+  public static final String TEZ_LIB_URIS =
+      TEZ_PREFIX + "lib.uris";
+
+  public static final String TEZ_APPLICATION_TYPE = "TEZ-MR*";
+
+  public static final String LOCAL_FRAMEWORK_NAME = "local-tez";
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
new file mode 100644
index 0000000..5463d65
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+/**
+ * Specifies all constant values in Tez
+ */
+public class TezConstants {
+
+  // Env variable names
+  public static final String TEZ_AM_IS_SESSION_ENV = "TEZ_AM_IS_SESSION";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
new file mode 100644
index 0000000..9d4b2c4
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezEntityDescriptor.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+public abstract class TezEntityDescriptor {
+
+  protected byte[] userPayload;
+  private String className;
+
+  public TezEntityDescriptor(String className) {
+    this.className = className;
+  }
+
+  public byte[] getUserPayload() {
+    return this.userPayload;
+  }
+
+  public TezEntityDescriptor setUserPayload(byte[] userPayload) {
+    this.userPayload = userPayload;
+    return this;
+  }
+
+  public String getClassName() {
+    return this.className;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/TezException.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezException.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezException.java
new file mode 100644
index 0000000..e3b14e7
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezException.java
@@ -0,0 +1,31 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api;
+
+/**
+ *  Base TezException
+ */
+public class TezException extends Exception {
+  private static final long serialVersionUID = 6337442733802964447L;
+  public TezException(Throwable cause) { super(cause); }
+  public TezException(String message) { super(message); }
+  public TezException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/TezUncheckedException.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezUncheckedException.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezUncheckedException.java
new file mode 100644
index 0000000..f55f6dd
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezUncheckedException.java
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api;
+
+/**
+ * Base Tez Unchecked Exception
+ */
+public class TezUncheckedException extends RuntimeException {
+
+  private static final long serialVersionUID = -4956339297375386184L;
+  
+  public TezUncheckedException(Throwable cause) { super(cause); }
+  public TezUncheckedException(String message) { super(message); }
+  public TezUncheckedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
new file mode 100644
index 0000000..900822b
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+
+public class Vertex { // FIXME rename to Task
+
+  private final String vertexName;
+  private final ProcessorDescriptor processorDescriptor;
+
+  private final int parallelism;
+  private VertexLocationHint taskLocationsHint;
+  private final Resource taskResource;
+  private Map<String, LocalResource> taskLocalResources;
+  private Map<String, String> taskEnvironment;
+
+  private final List<Vertex> inputVertices = new ArrayList<Vertex>();
+  private final List<Vertex> outputVertices = new ArrayList<Vertex>();
+  private final List<String> inputEdgeIds = new ArrayList<String>();
+  private final List<String> outputEdgeIds = new ArrayList<String>();
+  private String javaOpts = "";
+
+
+  public Vertex(String vertexName,
+      ProcessorDescriptor processorDescriptor,
+      int parallelism,
+      Resource taskResource) {
+    this.vertexName = vertexName;
+    this.processorDescriptor = processorDescriptor;
+    this.parallelism = parallelism;
+    this.taskResource = taskResource;
+    if (parallelism == 0) {
+      throw new IllegalArgumentException("Parallelism cannot be 0");
+    }
+    if (taskResource == null) {
+      throw new IllegalArgumentException("Resource cannot be null");
+    }
+  }
+
+  public String getVertexName() { // FIXME rename to getName()
+    return vertexName;
+  }
+
+  public ProcessorDescriptor getProcessorDescriptor() {
+    return this.processorDescriptor;
+  }
+
+  public int getParallelism() {
+    return parallelism;
+  }
+
+  public Resource getTaskResource() {
+    return taskResource;
+  }
+
+  public Vertex setTaskLocationsHint(List<TaskLocationHint> locations) {
+    if (locations == null) {
+      return this;
+    }
+    assert locations.size() == parallelism;
+    taskLocationsHint = new VertexLocationHint(parallelism, locations);
+    return this;
+  }
+
+  // used internally to create parallelism location resource file
+  VertexLocationHint getTaskLocationsHint() {
+    return taskLocationsHint;
+  }
+
+  public Vertex setTaskLocalResources(Map<String, LocalResource> localResources) {
+    this.taskLocalResources = localResources;
+    return this;
+  }
+
+  public Map<String, LocalResource> getTaskLocalResources() {
+    return taskLocalResources;
+  }
+
+  public Vertex setTaskEnvironment(Map<String, String> environment) {
+    this.taskEnvironment = environment;
+    return this;
+  }
+
+  public Map<String, String> getTaskEnvironment() {
+    return taskEnvironment;
+  }
+
+  public Vertex setJavaOpts(String javaOpts){
+     this. javaOpts = javaOpts;
+     return this;
+  }
+
+  public String getJavaOpts(){
+	  return javaOpts;
+  }
+
+  @Override
+  public String toString() {
+    return "[" + vertexName + " : " + processorDescriptor.getClassName() + "]";
+  }
+
+  void addInputVertex(Vertex inputVertex, String edgeId) {
+    inputVertices.add(inputVertex);
+    inputEdgeIds.add(edgeId);
+  }
+
+  void addOutputVertex(Vertex outputVertex, String edgeId) {
+    outputVertices.add(outputVertex);
+    outputEdgeIds.add(edgeId);
+  }
+
+  List<Vertex> getInputVertices() {
+    return inputVertices;
+  }
+
+  List<Vertex> getOutputVertices() {
+    return outputVertices;
+  }
+
+  List<String> getInputEdgeIds() {
+    return inputEdgeIds;
+  }
+
+  List<String> getOutputEdgeIds() {
+    return outputEdgeIds;
+  }
+
+  // FIXME how do we support profiling? Can't profile all tasks.
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
new file mode 100644
index 0000000..4f19314
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public class VertexLocationHint  {
+
+  private final int numTasks;
+  private final List<TaskLocationHint> taskLocationHints;
+
+  public VertexLocationHint(int numTasks,
+      List<TaskLocationHint> taskLocationHints) {
+    this.numTasks = numTasks;
+    if (taskLocationHints != null) {
+      this.taskLocationHints = Collections.unmodifiableList(taskLocationHints);
+    } else {
+      this.taskLocationHints = null;
+    }
+  }
+
+  public int getNumTasks() {
+    return numTasks;
+  }
+
+  public List<TaskLocationHint> getTaskLocationHints() {
+    return taskLocationHints;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 7883;
+    int result = 1;
+    result = prime * result + numTasks;
+    if (taskLocationHints != null) {
+      result = prime * result + taskLocationHints.hashCode();
+    }
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    VertexLocationHint other = (VertexLocationHint) obj;
+    if (numTasks != other.numTasks) {
+      return false;
+    }
+    if (taskLocationHints != null) {
+      if (!taskLocationHints.equals(other.taskLocationHints)) {
+        return false;
+      }
+    } else if (other.taskLocationHints != null) {
+      return false;
+    }
+    return true;
+  }
+
+  public static class TaskLocationHint {
+
+    // Host names if any to be used
+    private final Set<String> hosts;
+    // Rack names if any to be used
+    private final Set<String> racks;
+
+    public TaskLocationHint(Set<String> hosts, Set<String> racks) {
+      if (hosts != null) {
+        this.hosts = Collections.unmodifiableSet(hosts);
+      } else {
+        this.hosts = null;
+      }
+      if (racks != null) {
+        this.racks = Collections.unmodifiableSet(racks);
+      } else {
+        this.racks = null;
+      }
+    }
+
+    public Set<String> getDataLocalHosts() {
+      return hosts;
+    }
+
+    public Set<String> getRacks() {
+      return racks;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 9397;
+      int result = 1;
+      result = ( hosts != null) ?
+          prime * result + hosts.hashCode() :
+          result + prime;
+      result = ( racks != null) ?
+          prime * result + racks.hashCode() :
+          result + prime;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      TaskLocationHint other = (TaskLocationHint) obj;
+      if (hosts != null) {
+        if (!hosts.equals(other.hosts)) {
+          return false;
+        }
+      } else if (other.hosts != null) {
+        return false;
+      }
+      if (racks != null) {
+        if (!racks.equals(other.racks)) {
+          return false;
+        }
+      } else if (other.racks != null) {
+        return false;
+      }
+      return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
new file mode 100644
index 0000000..9062e8e
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java
@@ -0,0 +1,67 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.tez.dag.api.TezException;
+
+/*
+ * Interface class for monitoring the <code>DAG</code> running in a Tez DAG
+ * Application Master.
+ */
+public interface DAGClient extends Closeable {
+
+  /**
+   * Get the YARN ApplicationId for the app running the DAG
+   * @return <code>ApplicationId</code>
+   */
+  public ApplicationId getApplicationId();
+
+  @Private
+  /**
+   * Get the YARN ApplicationReport for the app running the DAG. For performance
+   * reasons this may be stale copy and should be used to access static info. It
+   * may be null.
+   * @return <code>ApplicationReport</code> or null
+   */
+  public ApplicationReport getApplicationReport();
+
+  /**
+   * Get the status of the specified DAG
+   */
+  public DAGStatus getDAGStatus() throws IOException, TezException;
+
+  /**
+   * Get the status of a Vertex of a DAG
+   */
+  public VertexStatus getVertexStatus(String vertexName)
+      throws IOException, TezException;
+
+  /**
+   * Kill a running DAG
+   *
+   */
+  public void tryKillDAG() throws TezException, IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
new file mode 100644
index 0000000..d61173d
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
@@ -0,0 +1,130 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProtoOrBuilder;
+import org.apache.tez.dag.api.records.DAGProtos.StringProgressPairProto;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+public class DAGStatus {
+
+  public enum State {
+    SUBMITTED,
+    INITING,
+    RUNNING,
+    SUCCEEDED,
+    KILLED,
+    FAILED,
+    ERROR,
+  };
+
+  DAGStatusProtoOrBuilder proxy = null;
+  Progress progress = null;
+  Map<String, Progress> vertexProgress = null;
+
+  public DAGStatus(DAGStatusProtoOrBuilder proxy) {
+    this.proxy = proxy;
+  }
+
+  public State getState() {
+    switch(proxy.getState()) {
+    case DAG_SUBMITTED:
+      return DAGStatus.State.SUBMITTED;
+    // For simplicity, initing/terminating states are presented as running
+    case DAG_INITING:
+    case DAG_TERMINATING:
+    case DAG_RUNNING:
+      return DAGStatus.State.RUNNING;
+    case DAG_SUCCEEDED:
+      return DAGStatus.State.SUCCEEDED;
+    case DAG_FAILED:
+      return DAGStatus.State.FAILED;
+    case DAG_KILLED:
+      return DAGStatus.State.KILLED;
+    case DAG_ERROR:
+      return DAGStatus.State.ERROR;
+    default:
+      throw new TezUncheckedException("Unsupported value for DAGStatus.State : " +
+                              proxy.getState());
+    }
+  }
+
+  public boolean isCompleted() {
+    State state = getState();
+    return (state == State.SUCCEEDED ||
+             state == State.FAILED ||
+             state == State.KILLED ||
+             state == State.ERROR);
+  }
+
+  public List<String> getDiagnostics() {
+    return proxy.getDiagnosticsList();
+  }
+
+  /**
+   * Gets overall progress value of the DAG.
+   *
+   * @return Progress of the DAG. Maybe null when the DAG is not running. Maybe
+   *         null when the DAG is running and the application master cannot be
+   *         reached - e.g. when the execution platform has restarted the
+   *         application master.
+   * @see Progress
+   */
+  public Progress getDAGProgress() {
+    if(progress == null && proxy.hasDAGProgress()) {
+      progress = new Progress(proxy.getDAGProgress());
+    }
+    return progress;
+  }
+
+  /**
+   * Get the progress of a vertex in the DAG
+   *
+   * @return Progress of the vertex. May be null when the DAG is not running.
+   *         Maybe null when the DAG is running and the application master
+   *         cannot be reached - e.g. when the execution platform has restarted
+   *         the application master.
+   * @see Progress
+   */
+  public Map<String, Progress> getVertexProgress() {
+    if(vertexProgress == null) {
+      if(proxy.getVertexProgressList() != null) {
+        List<StringProgressPairProto> kvList = proxy.getVertexProgressList();
+        vertexProgress = new HashMap<String, Progress>(kvList.size());
+        for(StringProgressPairProto kv : kvList){
+          vertexProgress.put(kv.getKey(), new Progress(kv.getProgress()));
+        }
+      }
+    }
+    return vertexProgress;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("status=" + getState()
+        + ", progress=" + getDAGProgress());
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/Progress.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
new file mode 100644
index 0000000..9577320
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
@@ -0,0 +1,67 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import org.apache.tez.dag.api.records.DAGProtos.ProgressProtoOrBuilder;
+
+public class Progress {
+  
+  ProgressProtoOrBuilder proxy = null;
+  
+  Progress(ProgressProtoOrBuilder proxy) {
+    this.proxy = proxy;
+  }
+  
+  public int getTotalTaskCount() {
+    return proxy.getTotalTaskCount();
+  }
+
+  public int getSucceededTaskCount() {
+    return proxy.getSucceededTaskCount();
+  }
+
+  public int getRunningTaskCount() {
+    return proxy.getRunningTaskCount();
+  }
+
+  public int getFailedTaskCount() {
+    return proxy.getFailedTaskCount();
+  }
+
+  public int getKilledTaskCount() {
+    return proxy.getKilledTaskCount();
+  }
+  
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("TotalTasks: ");
+    sb.append(getTotalTaskCount());
+    sb.append(" Succeeded: ");
+    sb.append(getSucceededTaskCount());
+    sb.append(" Running: ");
+    sb.append(getRunningTaskCount());
+    sb.append(" Failed: "); 
+    sb.append(getFailedTaskCount());
+    sb.append(" Killed: "); 
+    sb.append(getKilledTaskCount());
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
new file mode 100644
index 0000000..ce5dbe0
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java
@@ -0,0 +1,78 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import java.util.List;
+
+import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProtoOrBuilder;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+public class VertexStatus {
+  
+  public enum State {
+    INITED,
+    RUNNING,
+    SUCCEEDED,
+    KILLED,
+    FAILED,
+    ERROR,
+    TERMINATING,
+  };
+  
+  VertexStatusProtoOrBuilder proxy = null;
+  Progress progress = null;
+  
+  public VertexStatus(VertexStatusProtoOrBuilder proxy) {
+    this.proxy = proxy;
+  }
+
+  public State getState() {
+    switch(proxy.getState()) {
+    case VERTEX_INITED:
+      return VertexStatus.State.INITED;
+    case VERTEX_RUNNING:
+      return VertexStatus.State.RUNNING;
+    case VERTEX_SUCCEEDED:
+      return VertexStatus.State.SUCCEEDED;
+    case VERTEX_FAILED:
+      return VertexStatus.State.FAILED;
+    case VERTEX_KILLED:
+      return VertexStatus.State.KILLED;
+    case VERTEX_ERROR:
+      return VertexStatus.State.ERROR;
+    case VERTEX_TERMINATING:
+      return VertexStatus.State.TERMINATING;
+    default:
+      throw new TezUncheckedException("Unsupported value for VertexStatus.State : " + 
+                              proxy.getState());
+    }    
+  }
+
+  public List<String> getDiagnostics() {
+    return proxy.getDiagnosticsList();
+  }
+
+  public Progress getProgress() {
+    if(progress == null && proxy.hasProgress()) {
+      progress = new Progress(proxy.getProgress());
+    }
+    return progress;    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPB.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPB.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPB.java
new file mode 100644
index 0000000..a1ee18f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPB.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client.rpc;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.DAGClientAMProtocol;
+
+@ProtocolInfo(
+    protocolName = "org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolPB",
+    protocolVersion = 1)
+public interface DAGClientAMProtocolBlockingPB 
+                              extends DAGClientAMProtocol.BlockingInterface {
+
+}