You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ml...@apache.org on 2013/05/13 20:21:44 UTC

[1/2] TEZ-25: Change DAG representation from Configuration object to structured protobuf message

Updated Branches:
  refs/heads/TEZ-1 97e1fa967 -> cb5758b42


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java b/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java
deleted file mode 100644
index eb8c781..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestVertexLocationHint {
-
-  private DataInput in;
-  private DataOutput out;
-  private ByteArrayOutputStream bOut;
-
-  @Before
-  public void setup() {
-    bOut = new ByteArrayOutputStream();
-    out = new DataOutputStream(bOut);
-  }
-
-  @After
-  public void teardown() {
-    in = null;
-    out = null;
-    bOut = null;
-  }
-
-  @Test
-  public void testNullTaskLocationHintSerDes() throws IOException {
-    TaskLocationHint expected = new TaskLocationHint(null, null);
-    expected.write(out);
-    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
-    TaskLocationHint actual = new TaskLocationHint();
-    actual.readFields(in);
-    Assert.assertNull(actual.getDataLocalHosts());
-    Assert.assertNull(actual.getRacks());
-  }
-
-  @Test
-  public void testTaskLocationHintSerDes() throws IOException {
-    String[] hosts = { "h1", "h2", "", null };
-    String[] racks = { "r1", "r2" };
-    TaskLocationHint expected = new TaskLocationHint(hosts, racks);
-    expected.write(out);
-    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
-    TaskLocationHint actual = new TaskLocationHint();
-    actual.readFields(in);
-    Assert.assertNotNull(actual.getDataLocalHosts());
-    Assert.assertNotNull(actual.getRacks());
-    Assert.assertArrayEquals(hosts, actual.getDataLocalHosts());
-    Assert.assertArrayEquals(racks, actual.getRacks());
-  }
-
-  @Test
-  public void testTaskLocationHintSerDes2() throws IOException {
-    String[] hosts = null;
-    String[] racks = { "r1", "r2" };
-    TaskLocationHint expected = new TaskLocationHint(hosts, racks);
-    expected.write(out);
-    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
-    TaskLocationHint actual = new TaskLocationHint();
-    actual.readFields(in);
-    Assert.assertNull(actual.getDataLocalHosts());
-    Assert.assertNotNull(actual.getRacks());
-    Assert.assertArrayEquals(racks, actual.getRacks());
-  }
-
-  @Test
-  public void testEmptyVertexLocationHintSerDes() throws IOException {
-    VertexLocationHint expected = new VertexLocationHint(0);
-    expected.write(out);
-    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
-    VertexLocationHint actual = new VertexLocationHint();
-    actual.readFields(in);
-    Assert.assertEquals(0, actual.getNumTasks());
-    Assert.assertNotNull(actual.getTaskLocationHints());
-    Assert.assertEquals(0, actual.getTaskLocationHints().length);
-  }
-
-  @Test
-  public void testVertexLocationHintSerDes() throws IOException {
-    String[] hosts = { "h1", "h2", "", null };
-    String[] racks = { "r1", "r2" };
-    VertexLocationHint expected = new VertexLocationHint(4);
-    expected.getTaskLocationHints()[0] = new TaskLocationHint(hosts, racks);
-    expected.getTaskLocationHints()[1] = null;
-    expected.getTaskLocationHints()[2] = new TaskLocationHint(null, racks);
-    expected.getTaskLocationHints()[3] = new TaskLocationHint(hosts, null);
-    expected.write(out);
-    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
-    VertexLocationHint actual = new VertexLocationHint();
-    actual.readFields(in);
-
-    Assert.assertEquals(4, actual.getNumTasks());
-    Assert.assertNotNull(actual.getTaskLocationHints());
-    Assert.assertEquals(4, actual.getTaskLocationHints().length);
-
-    Assert.assertNotNull(actual.getTaskLocationHints()[0]);
-    Assert.assertNull(actual.getTaskLocationHints()[1]);
-    Assert.assertNotNull(actual.getTaskLocationHints()[2]);
-    Assert.assertNotNull(actual.getTaskLocationHints()[3]);
-
-    Assert.assertArrayEquals(
-        expected.getTaskLocationHints()[0].getDataLocalHosts(),
-        actual.getTaskLocationHints()[0].getDataLocalHosts());
-    Assert.assertArrayEquals(
-        expected.getTaskLocationHints()[0].getRacks(),
-        actual.getTaskLocationHints()[0].getRacks());
-    Assert.assertNull(
-        actual.getTaskLocationHints()[2].getDataLocalHosts());
-    Assert.assertArrayEquals(
-        expected.getTaskLocationHints()[2].getRacks(),
-        actual.getTaskLocationHints()[2].getRacks());
-    Assert.assertArrayEquals(
-        expected.getTaskLocationHints()[3].getDataLocalHosts(),
-        actual.getTaskLocationHints()[3].getDataLocalHosts());
-    Assert.assertNull(
-        actual.getTaskLocationHints()[3].getRacks());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 715a364..9042846 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -99,7 +99,7 @@ import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.DAGPlan.JobPlan;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -737,6 +737,14 @@ public class YARNRunner implements ClientProtocol {
     List<String> vargs = new ArrayList<String>(8);
     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
 
+//[Debug AppMaster] Currently the simplest way to attach a debugger to the AppMaster:
+// uncomment the following lines, then launch a regular job, e.g.
+// >hadoop jar {path}\hadoop-mapreduce-examples-3.0.0-SNAPSHOT.jar pi 2 2                  
+//     LOG.error(" !!!!!!!!!");
+//     LOG.error(" !!!!!!!!! Launching AppMaster in debug/suspend mode.  Attach to port 8002");
+//     LOG.error(" !!!!!!!!!");
+//     vargs.add("-Xdebug -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8002,server=y,suspend=y");
+    
     // FIXME set up logging related properties
     // TODO -Dtez.root.logger??
     // MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
@@ -795,24 +803,38 @@ public class YARNRunner implements ClientProtocol {
 
     setDAGParamsFromMRConf(dag);
 
-    // FIXME add serialized dag conf
-    DAGConfiguration dagConf = dag.serializeDag();
-    
-    Path dagConfFilePath = new Path(jobSubmitDir,
-        TezConfiguration.DAG_AM_PLAN_CONFIG_XML);
-    FSDataOutputStream dagConfOut =
-        FileSystem.create(fs, dagConfFilePath,
-            new FsPermission(DAG_FILE_PERMISSION));
+    // emit the DAG plan as protobuf files (binary and human-readable text)
+    JobPlan dagPB = dag.createDag();
+    FSDataOutputStream dagPBOutBinaryStream = null;
+    FSDataOutputStream dagPBOutTextStream = null;
+    Path binaryPath =  new Path(jobSubmitDir, TezConfiguration.DAG_AM_PLAN_PB_BINARY);
+    Path textPath =  new Path(jobSubmitDir, TezConfiguration.DAG_AM_PLAN_PB_TEXT);
     try {
-      dagConf.writeXml(dagConfOut);
+      //binary output
+      dagPBOutBinaryStream = FileSystem.create(fs, binaryPath,
+          new FsPermission(DAG_FILE_PERMISSION));
+      dagPB.writeTo(dagPBOutBinaryStream);
+
+      // text / human-readable output
+      dagPBOutTextStream = FileSystem.create(fs, textPath,
+          new FsPermission(DAG_FILE_PERMISSION));
+      dagPBOutTextStream.writeUTF(dagPB.toString());
     } finally {
-      dagConfOut.close();
+      if(dagPBOutBinaryStream != null){
+        dagPBOutBinaryStream.close();
+      }
+      if(dagPBOutTextStream != null){
+        dagPBOutTextStream.close();
+      }
     }
-    localResources.put(TezConfiguration.DAG_AM_PLAN_CONFIG_XML,
+
+    localResources.put(TezConfiguration.DAG_AM_PLAN_PB_BINARY,
         createApplicationResource(defaultFileContext,
-            dagConfFilePath, LocalResourceType.FILE));
+            binaryPath, LocalResourceType.FILE));
 
-    // FIXME add tez conf if needed
+    localResources.put(TezConfiguration.DAG_AM_PLAN_PB_TEXT,
+        createApplicationResource(defaultFileContext,
+            textPath, LocalResourceType.FILE));
 
     // FIXME are we using MR acls for tez?
     Map<ApplicationAccessType, String> acls


[2/2] git commit: TEZ-25: Change DAG representation from Configuration object to structured protobuf message

Posted by ml...@apache.org.
TEZ-25: Change DAG representation from Configuration object to structured protobuf message


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/cb5758b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/cb5758b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/cb5758b4

Branch: refs/heads/TEZ-1
Commit: cb5758b424a80c5a17e0d9983a7b6ff2982dcd31
Parents: 97e1fa9
Author: mikelid <mi...@microsoft.com>
Authored: Mon May 13 11:21:27 2013 -0700
Committer: mikelid <mi...@microsoft.com>
Committed: Mon May 13 11:21:27 2013 -0700

----------------------------------------------------------------------
 pom.xml                                            |    5 +
 tez-dag-api/pom.xml                                |   26 +
 .../src/main/java/org/apache/tez/dag/api/DAG.java  |  128 ++++-
 .../org/apache/tez/dag/api/DAGConfiguration.java   |  492 ---------------
 .../org/apache/tez/dag/api/DAGLocationHint.java    |   95 ---
 .../org/apache/tez/dag/api/DagTypeConverters.java  |  221 +++++++
 .../org/apache/tez/dag/api/TezConfiguration.java   |    3 +-
 .../org/apache/tez/dag/api/VertexLocationHint.java |   83 +---
 tez-dag-api/src/main/proto/DAGPlan.proto           |  119 ++++
 .../java/org/apache/tez/dag/api/TestDAGPlan.java   |   82 +++
 .../java/org/apache/tez/dag/app/DAGAppMaster.java  |   82 ++--
 .../org/apache/tez/dag/app/MRRExampleHelper.java   |   33 +-
 .../main/java/org/apache/tez/dag/app/dag/DAG.java  |    7 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java    |    3 +
 .../org/apache/tez/dag/app/dag/impl/DAGImpl.java   |  101 ++--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |   33 +-
 .../apache/tez/dag/utils/TezEngineChildJVM.java    |    6 +
 .../apache/tez/dag/api/TestDAGLocationHint.java    |  114 ----
 .../apache/tez/dag/api/TestVertexLocationHint.java |  147 -----
 .../java/org/apache/tez/mapreduce/YARNRunner.java  |   50 +-
 20 files changed, 736 insertions(+), 1094 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 586eb18..5c545c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -248,6 +248,11 @@
         <artifactId>avro</artifactId>
         <version>1.5.3</version>
       </dependency>
+      <dependency>
+       <groupId>com.google.protobuf</groupId>
+       <artifactId>protobuf-java</artifactId>
+       <version>2.4.0a</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag-api/pom.xml b/tez-dag-api/pom.xml
index c084f01..2b6aebb 100644
--- a/tez-dag-api/pom.xml
+++ b/tez-dag-api/pom.xml
@@ -46,6 +46,32 @@
         <configuration>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <version>3.0.0-SNAPSHOT</version>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <imports>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>DAGPlan.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-sources/java</output>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 185f9a4..c18c720 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -18,8 +18,24 @@
 package org.apache.tez.dag.api;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.*;
+import org.apache.tez.dag.api.DAGPlan.PlanKeyValuePair.Builder;
+import org.apache.tez.dag.api.DAGPlan.PlanTaskLocationHint;
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.api.EdgeProperty.SourceType;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.DAGPlan.*;
+
 
 public class DAG { // FIXME rename to Topology
   List<Vertex> vertices;
@@ -76,28 +92,102 @@ public class DAG { // FIXME rename to Topology
     //for each vertex if not specified?
   }
     
-  // FIXME DAGConfiguration is not public API
-  public DAGConfiguration serializeDag() {
-    DAGConfiguration dagConf = new DAGConfiguration();
-    
-    dagConf.setName(name);
-    dagConf.setVertices(vertices);
-    dagConf.setEdgeProperties(edges);
-    
-    for(Vertex vertex : vertices) {
-      if(vertex.getInputVertices() != null) {
-        dagConf.setInputVertices(vertex.getVertexName(), vertex.getInputVertices());
-        dagConf.setInputEdgeIds(vertex.getVertexName(), vertex.getInputEdgeIds());
+  // create protobuf message describing DAG
+  public JobPlan createDag(){
+    JobPlan.Builder jobBuilder = JobPlan.newBuilder();
+
+    jobBuilder.setName(this.name);
+
+    for(Vertex vertex : vertices){
+      VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
+      vertexBuilder.setName(vertex.getVertexName());
+      vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until  TEZ-46.
+      vertexBuilder.setProcessorName(vertex.getProcessorName());
+
+      //task config
+      PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder();
+      Resource resource = vertex.getTaskResource();
+      taskConfigBuilder.setNumTasks(vertex.getParallelism());
+      taskConfigBuilder.setMemoryMb(resource.getMemory());
+      taskConfigBuilder.setVirtualCores(resource.getVirtualCores());
+      taskConfigBuilder.setJavaOpts(vertex.getJavaOpts());
+
+      taskConfigBuilder.setTaskModule(vertex.getVertexName());
+      PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
+      Map<String,LocalResource> lrs = vertex.getTaskLocalResources();
+      for(String key : lrs.keySet()){
+        LocalResource lr = lrs.get(key);
+        localResourcesBuilder.setName(key);
+        localResourcesBuilder.setUri(DagTypeConverters.convertToDAGPlan(lr.getResource()));
+        localResourcesBuilder.setSize(lr.getSize());
+        localResourcesBuilder.setTimeStamp(lr.getTimestamp());
+        localResourcesBuilder.setType(DagTypeConverters.convertToDAGPlan(lr.getType()));
+        localResourcesBuilder.setVisibility(DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
+        if(lr.getType() == LocalResourceType.PATTERN){
+          assert lr.getPattern() != null : "resourceType=PATTERN but pattern is null";
+          localResourcesBuilder.setPattern(lr.getPattern());
+        }
+        taskConfigBuilder.addLocalResource(localResourcesBuilder);
       }
-      if(vertex.getOutputVertices() != null) {
-        dagConf.setOutputVertices(vertex.getVertexName(), vertex.getOutputVertices());
-        dagConf.setOutputEdgeIds(vertex.getVertexName(), vertex.getOutputEdgeIds());
+
+      if(vertex.getTaskEnvironment() != null){
+        for(String key : vertex.getTaskEnvironment().keySet()){
+          PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder();
+          envSettingBuilder.setKey(key);
+          envSettingBuilder.setValue(vertex.getTaskEnvironment().get(key));
+          taskConfigBuilder.addEnvironmentSetting(envSettingBuilder);
+        }
       }
+
+      if(vertex.getTaskLocationsHint() != null ){
+        if(vertex.getTaskLocationsHint().getTaskLocationHints() != null){
+          for(TaskLocationHint hint : vertex.getTaskLocationsHint().getTaskLocationHints()){
+            PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();
+
+            if(hint.getDataLocalHosts() != null){
+              taskLocationHintBuilder.addAllHost(Arrays.asList(hint.getDataLocalHosts()));
+            }
+            if(hint.getRacks() != null){
+              taskLocationHintBuilder.addAllRack(Arrays.asList(hint.getRacks()));
+            }
+
+            vertexBuilder.addTaskLocationHint(taskLocationHintBuilder);
+          }
+        }
+      }
+
+      for(String inEdgeId : vertex.getInputEdgeIds()){
+        vertexBuilder.addInEdgeId(inEdgeId);
+      }
+      
+      for(String outEdgeId : vertex.getOutputEdgeIds()){
+        vertexBuilder.addOutEdgeId(outEdgeId);
+      }
+      
+      vertexBuilder.setTaskConfig(taskConfigBuilder);
+      jobBuilder.addVertex(vertexBuilder);
+    }
+
+    for(Edge edge : edges){
+      EdgePlan.Builder edgeBuilder = EdgePlan.newBuilder();
+      edgeBuilder.setId(edge.getId());
+      edgeBuilder.setInputVertexName(edge.getInputVertex().getVertexName());
+      edgeBuilder.setOutputVertexName(edge.getOutputVertex().getVertexName());
+      edgeBuilder.setConnectionPattern(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().connectionPattern));
+      edgeBuilder.setSourceType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getSourceType()));
+      edgeBuilder.setInputClass(edge.getEdgeProperty().inputClass);
+      edgeBuilder.setOutputClass(edge.getEdgeProperty().outputClass);
+
+      jobBuilder.addEdge(edgeBuilder);
     }
     
-    dagConf.setConfig(config);
-    
-    return dagConf;
+    for(Entry<String, String> entry : this.config.entrySet()){
+      PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+      kvp.setKey(entry.getKey());
+      kvp.setValue(entry.getValue());
+      jobBuilder.addJobSetting(kvp);
+    }
+
+    return jobBuilder.build();
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
deleted file mode 100644
index 93ed118..0000000
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
+++ /dev/null
@@ -1,492 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.TreeMap;
-import java.util.Map.Entry;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
-import org.apache.hadoop.classification.InterfaceStability.Stable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-
-// FIXME dag conf should not be public API???
-public class DAGConfiguration extends Configuration {
-
-  private static final Log LOG = LogFactory.getLog(DAGConfiguration.class);
-
-  public DAGConfiguration(Configuration conf) {
-    super(conf);
-    if (! (conf instanceof DAGConfiguration)) {
-      this.reloadConfiguration();
-    }
-  }
-
-  public DAGConfiguration() {
-    super();
-  }
-
-  public final static String DAG = "tez.dag.";
-  
-  public final static String DAG_AM = DAG + "am.";
-
-  public final static String VERTEX = DAG + "vertex.";
-
-  public final static String TASK = DAG + "task.";
-
-  public final static String TASK_ATTEMPT = DAG + "attempt.";
-
-  public final static String TEZ_DAG_VERTICES = DAG + "vertices";
-
-  public final static String TEZ_DAG_EDGES = DAG + "edges";
-
-  public final static String EDGE = DAG + "edge.";
-
-  private final static String SEPARATOR = "|";
-  
-  public final static String TEZ_DAG_CONFIG_KEYS = DAG + "keys";
-  public final static String TEZ_DAG_CONFIG_VALUES = DAG + "values";
-
-  @Private
-  public void setConfig(Map<String, String> config) {
-    if(!config.isEmpty()) {
-      String[] key = new String[config.size()];
-      String[] value = new String[config.size()];
-      int i=0;
-      for(Entry<String, String> entry : config.entrySet()) {
-        key[i] = entry.getKey();
-        value[i] = entry.getValue();
-        i++;
-      }
-      setStrings(TEZ_DAG_CONFIG_KEYS, key);
-      setStrings(TEZ_DAG_CONFIG_VALUES, value);
-    }
-  }
-  
-  @Private
-  public Map<String, String> getConfig() {
-    HashMap<String, String> config = new HashMap<String, String>();
-    String[] key = getStrings(TEZ_DAG_CONFIG_KEYS);
-    if(key != null) {
-      String[] value = getStrings(TEZ_DAG_CONFIG_VALUES);
-      assert value.length == key.length;
-      for(int i=0; i<key.length; ++i) {
-        config.put(key[i], value[i]);
-      }
-    }
-    return config;
-  }
-  
-  @Private
-  public void setEdgeProperties(List<Edge> edges) {
-    String[] edgeIds = new String[edges.size()];
-    for(int i=0; i<edges.size(); ++i) {
-      edgeIds[i] = edges.get(i).getId();
-    }
-    setStrings(TEZ_DAG_EDGES, edgeIds);
-    for (Edge edge : edges) {
-      setEdgeProperty(edge.getEdgeProperty(), edge.getId());
-    }
-  }
-
-  public Map<String, EdgeProperty> getEdgeProperties() {
-    String[] edgeIds = getStrings(TEZ_DAG_EDGES);
-    if (edgeIds == null) {
-      return new TreeMap<String, EdgeProperty>();
-    }
-    Map<String, EdgeProperty> edgeProperties =
-                          new HashMap<String, EdgeProperty>(edgeIds.length);
-    for(int i=0; i<edgeIds.length; ++i) {
-      edgeProperties.put(edgeIds[i], getEdgeProperty(edgeIds[i]));
-    }
-    return edgeProperties;
-  }
-
-  private void setEdgeProperty(EdgeProperty edgeProperty, String edgeId) {
-    String[] edgeStrs = new String[4];
-    edgeStrs[0] = edgeProperty.getConnectionPattern().name();
-    edgeStrs[1] = edgeProperty.getSourceType().name();
-    edgeStrs[2] = edgeProperty.inputClass;
-    edgeStrs[3] = edgeProperty.outputClass;
-
-    setStrings(EDGE + edgeId, edgeStrs);
-  }
-
-  private EdgeProperty getEdgeProperty(String edgeId) {
-    String[] edgeStr = getStrings(EDGE + edgeId);
-    assert edgeStr.length == 4;
-    return new EdgeProperty(EdgeProperty.ConnectionPattern.valueOf(edgeStr[0]),
-                             EdgeProperty.SourceType.valueOf(edgeStr[1]),
-                             edgeStr[2],
-                             edgeStr[3]);
-  }
-
-  public final static String TEZ_DAG_VERTEX_TASKS = VERTEX + "num-tasks";
-  public final static int DEFAULT_TEZ_DAG_VERTEX_TASKS  = 0;
-
-  public int getNumVertexTasks(String vertexName) {
-    return getInt(
-        TEZ_DAG_VERTEX_TASKS + "." + vertexName,
-        DEFAULT_TEZ_DAG_VERTEX_TASKS);
-  }
-
-  public void setNumVertexTasks(String vertexName, int numTasks) {
-    setInt(
-        TEZ_DAG_VERTEX_TASKS + "." + vertexName,
-        numTasks);
-  }
-
-  public final String TEZ_DAG_VERTEX_TASK_MEMORY = TASK + "memory-mb";
-
-  public final int DEFAULT_TEZ_DAG_VERTEX_TASK_MEMORY = 1024;
-
-  public int getVertexTaskMemory(String vertexName) {
-    return getInt(
-        TEZ_DAG_VERTEX_TASK_MEMORY + "." + vertexName,
-        DEFAULT_TEZ_DAG_VERTEX_TASK_MEMORY);
-  }
-
-  public void setVertexTaskMemory(String vertexName, int memory) {
-    setInt(
-        TEZ_DAG_VERTEX_TASK_MEMORY + "." + vertexName,
-        memory);
-  }
-
-  public final String TEZ_DAG_VERTEX_TASK_CPU = TASK + "cpu-vcores";
-
-  public final int DEFAULT_TEZ_DAG_VERTEX_TASK_CORES = 1;
-
-  public int getVertexTaskCores(String vertexName) {
-    return getInt(
-        TEZ_DAG_VERTEX_TASK_CPU + "." + vertexName,
-        DEFAULT_TEZ_DAG_VERTEX_TASK_CORES);
-  }
-  public void setVertexTaskCores(String vertexName, int cores) {
-    setInt(
-        TEZ_DAG_VERTEX_TASK_CPU + "." + vertexName,
-        cores);
-  }
-
-  private final String[] EMPTY = new String[0];
-
-  public String[] getVertices() {
-    String[] vertices = getStrings(TEZ_DAG_VERTICES, EMPTY);
-    return vertices == null? EMPTY : vertices;
-  }
-
-  void setVertexResource(Vertex vertex) {
-    Resource resource = vertex.getTaskResource();
-    if (resource == null) {
-      return;
-    }
-    setVertexTaskCores(vertex.getVertexName(), resource.getVirtualCores());
-    setVertexTaskMemory(vertex.getVertexName(), resource.getMemory());
-  }
-
-  public Resource getVertexResource(String vertexName) {
-    int memory = getVertexTaskMemory(vertexName);
-    int vCores = getVertexTaskCores(vertexName);
-    return BuilderUtils.newResource(memory, vCores);
-  }
-
-  // FIXME we are serializing YarnURL which is not same as serializing a URL
-  public final String TEZ_DAG_VERTEX_TASK_LOCAL_RESOURCE = TASK + "local-resource.";
-  void setVertexLocalResource(Vertex vertex) {
-    Map<String,LocalResource> lrs = vertex.getTaskLocalResources();
-    if (lrs == null) {
-      return;
-    }
-    String[] lrStrs = new String[lrs.size()];
-    int i=0;
-    for(Map.Entry<String,LocalResource> entry : lrs.entrySet()) {
-      LocalResource lr = entry.getValue();
-      try {
-        String lrStr = StringUtils.escapeString(entry.getKey(),
-            StringUtils.ESCAPE_CHAR,
-            SEPARATOR.charAt(0))
-            + SEPARATOR
-            + StringUtils.escapeString(
-                ConverterUtils.getPathFromYarnURL(lr.getResource()).toString(),
-                StringUtils.ESCAPE_CHAR,
-                SEPARATOR.charAt(0))
-            + SEPARATOR
-            + StringUtils.escapeString(String.valueOf(lr.getSize()),
-                StringUtils.ESCAPE_CHAR, SEPARATOR.charAt(0))
-            + SEPARATOR
-            + StringUtils.escapeString(String.valueOf(lr.getTimestamp()),
-                StringUtils.ESCAPE_CHAR, SEPARATOR.charAt(0))
-            + SEPARATOR
-            + StringUtils.escapeString(lr.getType().name(),
-                StringUtils.ESCAPE_CHAR, SEPARATOR.charAt(0))
-            + SEPARATOR
-            + StringUtils.escapeString(lr.getVisibility().name(),
-                StringUtils.ESCAPE_CHAR, SEPARATOR.charAt(0))
-            + SEPARATOR
-            + StringUtils.escapeString(
-                (lr.getPattern() == null ? "" : lr.getPattern()),
-                StringUtils.ESCAPE_CHAR, SEPARATOR.charAt(0));
-        lrStrs[i++] = StringUtils.escapeString(lrStr);
-      } catch (URISyntaxException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    setStrings(TEZ_DAG_VERTEX_TASK_LOCAL_RESOURCE + vertex.getVertexName(),
-        lrStrs);
-  }
-
-  public Map<String, LocalResource> getVertexLocalResources(String vertexName) {
-    String[] lrStrs = StringUtils.split(get(
-        TEZ_DAG_VERTEX_TASK_LOCAL_RESOURCE + vertexName, ""));
-    Map<String, LocalResource> localResources =
-        new TreeMap<String, LocalResource>();
-    if (lrStrs == null) {
-      return localResources;
-    }
-    LOG.info("XXXX Found " + lrStrs.length + " local resources");
-    for (String lrStr : lrStrs) {
-      String[] tokens =
-          StringUtils.split(
-              lrStr, StringUtils.ESCAPE_CHAR, SEPARATOR.charAt(0));
-      if (tokens.length != 6
-          && tokens.length != 7) {
-        LOG.warn("Invalid token count in serialized LocalResource"
-            + ", serializedString=" + lrStr
-            + ", tokenCount=" + tokens.length);
-      }
-      String resourceName = tokens[0];
-      LocalResource lRsrc = Records.newRecord(LocalResource.class);
-      lRsrc.setResource(ConverterUtils.getYarnUrlFromPath(
-          new Path(tokens[1])));
-      lRsrc.setSize(Long.valueOf(tokens[2]));
-      lRsrc.setTimestamp(Long.valueOf(tokens[3]));
-      lRsrc.setType(LocalResourceType.valueOf(tokens[4]));
-      lRsrc.setVisibility(LocalResourceVisibility.valueOf(
-          tokens[5]));
-      if (tokens.length == 7) {
-        lRsrc.setPattern(tokens[6]);
-      }
-      try {
-        LOG.info("XXXX Adding local resource"
-            + ", vertexName=" + vertexName
-            + ", resourceName=" + resourceName
-            + ", resourceUrl"
-            + ConverterUtils.getPathFromYarnURL(
-                lRsrc.getResource()).toString());
-      } catch (URISyntaxException e) {
-        // Ignore
-        // FIXME
-      }
-      localResources.put(resourceName, lRsrc);
-    }
-    return localResources;
-  }
-
-  public final String TEZ_DAG_VERTEX_TASK_ENV = TASK + "env.";
-  void setVertexEnv(Vertex vertex) {
-    Map<String, String> env = vertex.getTaskEnvironment();
-    if (env == null) {
-      return;
-    }
-    String[] envStrs = new String[env.size()];
-    int i=0;
-    for(Map.Entry<String,String> entry : env.entrySet()) {
-      String envStr = entry.getKey() + SEPARATOR + entry.getValue();
-      envStrs[i++] = StringUtils.escapeString(envStr);
-    }
-    set(TEZ_DAG_VERTEX_TASK_ENV + vertex.getVertexName(),
-        StringUtils.join(",", envStrs));
-  }
-
-  public Map<String,String> getVertexEnv(String vertexName) {
-    String[] envStrs = StringUtils.split(
-        get(TEZ_DAG_VERTEX_TASK_ENV + vertexName, ""));
-    Map<String,String> env = new HashMap<String,String>();
-    if(envStrs == null) {
-      return env;
-    }
-
-    LOG.info("XXXX Found " + envStrs.length + " environment");
-    for(String envStr : envStrs ) {
-      LOG.info("XXXX Parsing env from " + envStr);
-      StringTokenizer tokenizer = new StringTokenizer (envStr, SEPARATOR);
-      String envName = tokenizer.nextToken();
-      String envValue = tokenizer.nextToken();
-      env.put(StringUtils.unEscapeString(envName),
-          StringUtils.unEscapeString(envValue));
-    }
-
-    return env;
-  }
-  
-  public final String TEZ_DAG_NAME = DAG + "name";
-  @Private
-  public void setName(String name) {
-    set(TEZ_DAG_NAME, name);
-  }
-  
-  @Public
-  @Stable
-  public String getName() {
-    String name = get(TEZ_DAG_NAME);
-    return name;
-  }
-  
-  @Private
-  public void setVertices(List<Vertex> vertices) {
-    setVertices(TEZ_DAG_VERTICES, vertices);
-    for(Vertex vertex : vertices) {
-      // set num tasks
-      setNumVertexTasks(vertex.getVertexName(), vertex.getParallelism());
-      // set resource
-      setVertexResource(vertex);
-      // set localResource
-      setVertexLocalResource(vertex);
-      // set environment
-      setVertexEnv(vertex);
-      // set processor name
-      setVertexTaskModuleClassName(vertex);
-      //set javaOpts
-      setVertexJavaOpts(vertex.getVertexName(), vertex.getJavaOpts());
-    }
-  }
-
-  public final String TEZ_DAG_VERTEX_INPUT_VERTICES = VERTEX + "input-vertices";
-  @Public
-  @Stable
-  public String[] getInputVertices(String vertexName) {
-    String[] vertices =
-        getStrings(TEZ_DAG_VERTEX_INPUT_VERTICES + "." + vertexName, EMPTY);
-    return vertices == null ? EMPTY : vertices;
-  }
-  @Private
-  public void setInputVertices(String vertexName, List<Vertex> inputVertices) {
-    setVertices(TEZ_DAG_VERTEX_INPUT_VERTICES + "." + vertexName,
-        inputVertices);
-  }
-
-  private void setVertices(String key, List<Vertex> vertices) {
-    String[] verticesNames = new String[vertices.size()];
-    for (int i = 0; i < vertices.size(); ++i) {
-      verticesNames[i] = vertices.get(i).getVertexName();
-    }
-    setStrings(key, verticesNames);
-  }
-
-  public final String TEZ_DAG_VERTEX_OUTPUT_VERTICES = VERTEX
-      + "output-vertices";
-  @Public
-  @Stable
-  public String[] getOutputVertices(String vertexName) {
-    String[] vertices =
-        getStrings(TEZ_DAG_VERTEX_OUTPUT_VERTICES + "." + vertexName, EMPTY);
-    return vertices == null? EMPTY : vertices;
-  }
-  @Private
-  public void setOutputVertices(String vertexName,
-      List<Vertex> outputVertices) {
-    setVertices(TEZ_DAG_VERTEX_OUTPUT_VERTICES + "." + vertexName,
-        outputVertices);
-  }
-
-  public final String TEZ_DAG_VERTEX_INPUT_EDGES = VERTEX + "input-edges";
-  @Public
-  @Stable
-  public List<String> getInputEdgeIds(String vertexName) {
-    return getEdgeIds(TEZ_DAG_VERTEX_INPUT_EDGES + "." + vertexName);
-  }
-  public void setInputEdgeIds(String vertexName, List<String> edgeIds) {
-    setEdgeIds(TEZ_DAG_VERTEX_INPUT_EDGES + "." + vertexName, edgeIds);
-  }
-
-  public final String TEZ_DAG_VERTEX_OUTPUT_EDGES = VERTEX + "output-edges";
-  @Public
-  @Stable
-  public List<String> getOutputEdgeIds(String vertexName) {
-    return getEdgeIds(TEZ_DAG_VERTEX_OUTPUT_EDGES + "." + vertexName);
-  }
-  @Private
-  public void setOutputEdgeIds(String vertexName, List<String> edgeIds) {
-    setEdgeIds(TEZ_DAG_VERTEX_OUTPUT_EDGES + "." + vertexName, edgeIds);
-  }
-
-  private List<String> getEdgeIds(String key) {
-    String[] edgeIds = getStrings(key, EMPTY);
-    if (edgeIds == null) {
-      return new ArrayList<String>();
-    }
-    return Arrays.asList(edgeIds);
-  }
-
-  private void setEdgeIds(String key, List<String> edgeIds) {
-    setStrings(key, edgeIds.toArray(new String[]{}));
-  }
-
-  private void setVertexTaskModuleClassName(Vertex vertex) {
-    setVertexTaskModuleClassName(vertex.getVertexName(),
-        vertex.getProcessorName());
-  }
-
-  public final String TEZ_DAG_VERTEX_TASK_MODULE= VERTEX + "task-module";
-  @Private
-  public String getVertexTaskModuleClassName(String vertexName) {
-    return get(TEZ_DAG_VERTEX_TASK_MODULE + "." + vertexName);
-  }
-  @Private
-  public void setVertexTaskModuleClassName(String vertexName,
-      String taskModule) {
-    set(TEZ_DAG_VERTEX_TASK_MODULE + "." + vertexName, taskModule);
-  }
-  
-  public final String TEZ_DAG_VERTEX_JAVAOPTS= VERTEX + "java-opts";
-  @Private 
-  public String getVertexJavaOpts(String vertexName) {
-	  String opts = get(TEZ_DAG_VERTEX_JAVAOPTS + "." + vertexName);
-	  return opts == null? "" : opts;
-  }
-  
-  @Private
-  public void setVertexJavaOpts(String vertexName, String javaOpts){
-	  set(TEZ_DAG_VERTEX_JAVAOPTS + "." + vertexName, javaOpts);
-  }
-
-  /// File used for storing location hints that are passed to the DAG
-  public static final String DAG_LOCATION_HINT_RESOURCE_FILE =
-      DAG + "location-hint-resource-file";
-  public static final String DEFAULT_DAG_LOCATION_HINT_RESOURCE_FILE =
-      "tezdaglocationhint.info";
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGLocationHint.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGLocationHint.java
deleted file mode 100644
index 9f48f2c..0000000
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGLocationHint.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-public class DAGLocationHint implements Writable {
-
-  private Map<String, VertexLocationHint> vertexLocationHints;
-
-  public DAGLocationHint() {
-    vertexLocationHints = new TreeMap<String, VertexLocationHint>();
-  }
-
-  public Map<String, VertexLocationHint> getVertexLocationHints() {
-    return vertexLocationHints;
-  }
-
-  public VertexLocationHint getVertexLocationHint(String vertexName) {
-    return vertexLocationHints.get(vertexName);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(vertexLocationHints.size());
-    for (Entry<String, VertexLocationHint> entry :
-        vertexLocationHints.entrySet()) {
-      Text.writeString(out, entry.getKey());
-      out.writeBoolean(entry.getValue() != null);
-      if (entry.getValue() != null) {
-        entry.getValue().write(out);
-      }
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int entryCount = in.readInt();
-    vertexLocationHints = new TreeMap<String, VertexLocationHint>();
-    for (int i = 0; i < entryCount; ++i) {
-      String vertexName = Text.readString(in);
-      if (!in.readBoolean()) {
-        vertexLocationHints.put(vertexName, null);
-      } else {
-        VertexLocationHint hint = new VertexLocationHint();
-        hint.readFields(in);
-        vertexLocationHints.put(vertexName, hint);
-      }
-    }
-  }
-
-  public static DAGLocationHint initDAGDagLocationHint(
-      String locationHintFile) throws IOException {
-    DataInput in = new DataInputStream(new FileInputStream(locationHintFile));
-    DAGLocationHint dagLocationHint = new DAGLocationHint();
-    dagLocationHint.readFields(in);
-    return dagLocationHint;
-  }
-
-  public static void writeDAGDagLocationHint(
-      DAGLocationHint dagLocationHint,
-      String locationHintFile) throws IOException {
-    DataOutput out = new DataOutputStream(new FileOutputStream(
-        locationHintFile));
-    dagLocationHint.write(out);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
new file mode 100644
index 0000000..04404ba
--- /dev/null
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.DAGPlan.EdgePlan;
+import org.apache.tez.dag.api.DAGPlan.PlanKeyValuePair;
+import org.apache.tez.dag.api.DAGPlan.PlanLocalResource;
+import org.apache.tez.dag.api.DAGPlan.PlanTaskConfiguration;
+import org.apache.tez.dag.api.DAGPlan.PlanTaskLocationHint;
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.api.EdgeProperty.SourceType;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.DAGPlan.*;
+
+public class DagTypeConverters {
+  
+  public static PlanLocalResourceVisibility convertToDAGPlan(LocalResourceVisibility visibility){
+    switch(visibility){
+      case PUBLIC : return PlanLocalResourceVisibility.PUBLIC;  
+      case PRIVATE : return PlanLocalResourceVisibility.PRIVATE;
+      case APPLICATION : return PlanLocalResourceVisibility.APPLICATION;
+      default : throw new RuntimeException("unknown 'visibility'");
+    }
+  }
+  
+  public static LocalResourceVisibility convertFromDAGPlan(PlanLocalResourceVisibility visibility){
+    switch(visibility){
+      case PUBLIC : return LocalResourceVisibility.PUBLIC;  
+      case PRIVATE : return LocalResourceVisibility.PRIVATE;
+      case APPLICATION : return LocalResourceVisibility.APPLICATION;
+      default : throw new RuntimeException("unknown 'visibility'");
+    }
+  }
+  
+  public static PlanEdgeSourceType convertToDAGPlan(SourceType sourceType){
+    switch(sourceType){
+      case STABLE : return PlanEdgeSourceType.STABLE;  
+      case STABLE_PERSISTED : return PlanEdgeSourceType.STABLE_PERSISTED;
+      case STREAMING :  return PlanEdgeSourceType.STREAMING;
+      default : throw new RuntimeException("unknown 'sourceType'");
+    }
+  }
+  
+  public static SourceType convertFromDAGPlan(PlanEdgeSourceType sourceType){
+    switch(sourceType){
+      case STABLE : return SourceType.STABLE;  
+      case STABLE_PERSISTED : return SourceType.STABLE_PERSISTED;
+      case STREAMING :  return SourceType.STREAMING;
+      default : throw new RuntimeException("unknown 'sourceType'");
+    }
+  }
+  
+  public static PlanEdgeConnectionPattern convertToDAGPlan(ConnectionPattern pattern){
+    switch(pattern){
+      case ONE_TO_ONE : return PlanEdgeConnectionPattern.ONE_TO_ONE;  
+      case ONE_TO_ALL : return PlanEdgeConnectionPattern.ONE_TO_ALL;
+      case BIPARTITE : return PlanEdgeConnectionPattern.BIPARTITE;
+      default : throw new RuntimeException("unknown 'pattern'");
+    }
+  }
+  
+  public static ConnectionPattern convertFromDAGPlan(PlanEdgeConnectionPattern pattern){
+    switch(pattern){
+      case ONE_TO_ONE : return ConnectionPattern.ONE_TO_ONE;  
+      case ONE_TO_ALL : return ConnectionPattern.ONE_TO_ALL;
+      case BIPARTITE : return ConnectionPattern.BIPARTITE;
+      default : throw new IllegalArgumentException("unknown 'pattern'");
+    }
+  }
+  
+  public static PlanLocalResourceType convertToDAGPlan(LocalResourceType type) {
+    switch(type){
+    case ARCHIVE : return PlanLocalResourceType.ARCHIVE;
+    case FILE : return PlanLocalResourceType.FILE;
+    case PATTERN : return PlanLocalResourceType.PATTERN;
+    default : throw new IllegalArgumentException("unknown 'type'");
+    }
+  }
+  
+  public static LocalResourceType convertFromDAGPlan(PlanLocalResourceType type) {
+    switch(type){
+    case ARCHIVE : return LocalResourceType.ARCHIVE;
+    case FILE : return LocalResourceType.FILE;
+    case PATTERN : return LocalResourceType.PATTERN;
+    default : throw new IllegalArgumentException("unknown 'type'");
+    }
+  }
+
+  public static VertexLocationHint convertFromDAGPlan(
+      List<PlanTaskLocationHint> locationHints) {
+
+    List<TaskLocationHint> outputList = new ArrayList<TaskLocationHint>();  
+    
+    for(PlanTaskLocationHint inputHint : locationHints){
+      TaskLocationHint outputHint = new TaskLocationHint(); 
+      outputHint.setRacks(inputHint.getRackList().toArray(new String[inputHint.getRackList().size()]));
+      outputHint.setDataLocalHosts(inputHint.getHostList().toArray(new String[inputHint.getHostList().size()]));
+      outputList.add(outputHint);
+    }
+    return new VertexLocationHint(outputList.size(), outputList.toArray(new TaskLocationHint[outputList.size()]));
+  }
+
+  // Notes re HDFS URL handling:
+  //   Resource URLs in the protobuf message are strings of the form hdfs://host:port/path.
+  //   org.apache.hadoop.fs.Path wraps a URI (any scheme); YARN has its own records.URL type.
+  //   java.net.URL cannot be used out of the box as it rejects unknown schemes such as HDFS.
+  //   ConverterUtils builds the string: plain concatenation printed "null" for a null host.
+  public static String convertToDAGPlan(URL resource) {
+    try {
+      return ConverterUtils.getPathFromYarnURL(resource).toString();
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException("cannot convert resource URL to a path: " + resource, e);
+    }
+  }
+
+  public static Map<String, LocalResource> createLocalResourceMapFromDAGPlan(
+      List<PlanLocalResource> localResourcesList) {
+    Map<String, LocalResource> map = new HashMap<String, LocalResource>();
+    for(PlanLocalResource res : localResourcesList){
+      LocalResource r = new LocalResourcePBImpl();
+      
+      //NOTE: have to check every optional field in protobuf generated classes for existence before accessing
+      //else we will receive a default value back, eg ""
+      if(res.hasPattern()){
+        r.setPattern(res.getPattern());
+      }
+      r.setResource(ConverterUtils.getYarnUrlFromPath(new Path(res.getUri())));  // see above notes on HDFS URL handling
+      r.setSize(res.getSize());
+      r.setTimestamp(res.getTimeStamp());
+      r.setType(DagTypeConverters.convertFromDAGPlan(res.getType()));
+      r.setVisibility(DagTypeConverters.convertFromDAGPlan(res.getVisibility()));
+      map.put(res.getName(), r);
+    }
+    return map;
+  }
+
+  /**
+   * Builds the task environment (variable name to value) from the plan's key/value pairs.
+   *
+   * @param environmentSettingList pairs read from the DAG plan; for a repeated key the last wins
+   * @return a new mutable HashMap holding every pair
+   */
+  public static Map<String, String> createEnvironmentMapFromDAGPlan(
+      List<PlanKeyValuePair> environmentSettingList) {
+    return createSettingsMapFromDAGPlan(environmentSettingList);
+  }
+  
+  public static Map<String, EdgePlan> createEdgePlanMapFromDAGPlan(List<EdgePlan> edgeList){
+    Map<String, EdgePlan> edgePlanMap =
+        new HashMap<String, EdgePlan>();
+    for(EdgePlan edgePlanItem : edgeList){
+      edgePlanMap.put(edgePlanItem.getId(), edgePlanItem);
+    }
+    return edgePlanMap;
+  }
+  
+  public static Map<String, EdgeProperty> createEdgePropertyMapFromDAGPlan(
+      List<EdgePlan> edgeList) {  
+      
+    Map<String, EdgeProperty> map = new HashMap<String, EdgeProperty>();
+    for(EdgePlan edge: edgeList){
+       map.put(edge.getId(), 
+           new EdgeProperty(
+               convertFromDAGPlan(edge.getConnectionPattern()),
+               convertFromDAGPlan(edge.getSourceType()),
+               edge.getInputClass(),
+               edge.getOutputClass()
+               )
+           );
+    }
+    
+    return map;
+  }
+
+  public static Resource CreateResourceRequestFromTaskConfig(
+      PlanTaskConfiguration taskConfig) {
+    return BuilderUtils.newResource(taskConfig.getMemoryMb(), taskConfig.getVirtualCores());
+  }
+
+  public static Map<String, String> createSettingsMapFromDAGPlan(
+      List<PlanKeyValuePair> settingList) {
+    Map<String, String> map = new HashMap<String, String>();
+    for(PlanKeyValuePair setting: settingList){
+      map.put(setting.getKey(), setting.getValue());
+    }
+    return map;
+  }   
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index be03cd8..597fb2f 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -98,5 +98,6 @@ private static final String TEZ_CONF_DIR_ENV = "TEZ_CONF_DIR";
     TEZ_HOME_ENV + "/lib/*"
   };
 
-  public static final String DAG_AM_PLAN_CONFIG_XML = "tez-dag.xml";
+  public static final String DAG_AM_PLAN_PB_BINARY = "tez-dag.pb";
+  public static final String DAG_AM_PLAN_PB_TEXT = "tez-dag.pb.txt";
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
index 8571252..381b4ba 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
@@ -18,14 +18,7 @@
 
 package org.apache.tez.dag.api;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-public class VertexLocationHint implements Writable {
+public class VertexLocationHint  {
 
   private int numTasks;
   private TaskLocationHint[] taskLocationHints;
@@ -56,7 +49,7 @@ public class VertexLocationHint implements Writable {
     this.taskLocationHints = taskLocationHints;
   }
 
-  public static class TaskLocationHint implements Writable {
+  public static class TaskLocationHint {
 
     // Host names if any to be used
     private String[] hosts;
@@ -84,77 +77,5 @@ public class VertexLocationHint implements Writable {
     public void setRacks(String[] racks) {
       this.racks = racks;
     }
-
-    private void writeStringArray(DataOutput out, String[] array)
-        throws IOException {
-      if (array == null) {
-        out.writeInt(-1);
-        return;
-      }
-      out.writeInt(array.length);
-      for (String entry : array) {
-        out.writeBoolean(entry != null);
-        if (entry != null) {
-          Text.writeString(out, entry);
-        }
-      }
-    }
-
-    private String[] readStringArray(DataInput in)
-        throws IOException {
-      int arrayLen = in.readInt();
-      if (arrayLen == -1) {
-        return null;
-      }
-      String[] array = new String[arrayLen];
-      for (int i = 0; i < arrayLen; ++i) {
-        if (!in.readBoolean()) {
-          array[i] = null;
-        } else {
-          array[i] = Text.readString(in);
-        }
-      }
-      return array;
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      writeStringArray(out, hosts);
-      writeStringArray(out, racks);
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      hosts = readStringArray(in);
-      racks = readStringArray(in);
-    }
-
-
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(numTasks);
-    for (int i = 0; i < numTasks; ++i) {
-      out.writeBoolean(taskLocationHints[i] != null);
-      if (taskLocationHints[i] != null) {
-        taskLocationHints[i].write(out);
-      }
-    }
   }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    numTasks = in.readInt();
-    taskLocationHints = new TaskLocationHint[numTasks];
-    for (int i = 0; i < numTasks; ++i) {
-      if (!in.readBoolean()) {
-        taskLocationHints[i] = null;
-      } else {
-        taskLocationHints[i] = new TaskLocationHint(null, null);
-        taskLocationHints[i].readFields(in);
-      }
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/main/proto/DAGPlan.proto
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/proto/DAGPlan.proto b/tez-dag-api/src/main/proto/DAGPlan.proto
new file mode 100644
index 0000000..38e8971
--- /dev/null
+++ b/tez-dag-api/src/main/proto/DAGPlan.proto
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.dag.api";
+option java_outer_classname = "DAGPlan";
+option java_generate_equals_and_hash = true;
+package org.apache.tez.dag.api;
+
+// Many of these types have a dual in the Tez-api.  To reduce confusion, these types have prefix or suffix 
+// of "Plan" to indicate they are to be used in the dag-plan.
+// The big types use a suffix:  JobPlan, VertexPlan, EdgePlan 
+//   --> these get more direct use in the runtime and the naming is natural.
+// The enums and utility types use prefix: PlanVertexType, PlanEdgeConnectionPattern, etc
+//   --> there is no great naming choice for these that avoids ambiguity, but this one seems acceptable.
+
+enum PlanVertexType {
+  INPUT = 0;
+  NORMAL = 1;
+  OUTPUT = 2;
+}
+
+enum PlanEdgeConnectionPattern {
+  ONE_TO_ONE = 0;
+  ONE_TO_ALL = 1;
+  BIPARTITE = 2;
+}
+
+enum PlanEdgeSourceType {
+  STABLE = 0;
+  STABLE_PERSISTED = 1;
+  STREAMING = 2;
+}
+
+message PlanKeyValuePair {
+  required string key = 1;
+  required string value = 2;
+}
+
+enum PlanLocalResourceType {
+  FILE = 0;
+  ARCHIVE = 1;
+  PATTERN = 2;
+}
+
+enum PlanLocalResourceVisibility {
+  PUBLIC = 0;
+  PRIVATE = 1;
+  APPLICATION = 2;
+}
+
+message PlanLocalResource {
+  required string name = 1;
+  required string uri = 2;
+  required int64 size = 3;
+  required int64 timeStamp = 4;
+  required PlanLocalResourceType type = 5;
+  required PlanLocalResourceVisibility visibility = 6;
+  optional string pattern = 7; // only used if type=PATTERN
+}
+
+// Each taskLocationHint represents a single split in the input.
+// It is the list of [{rack,machines}] that host a replica of each particular split.
+// For now it is represented as pair-of-arrays rather than array-of-pairs.
+message PlanTaskLocationHint {
+  repeated string rack = 1;
+  repeated string host = 2;
+}
+
+message PlanTaskConfiguration {
+  required int32 numTasks = 1;
+  required int32 memoryMb = 2;
+  required int32 virtualCores = 3;
+  required string javaOpts = 4;
+  required string taskModule = 5;
+  repeated PlanLocalResource localResource = 6;
+  repeated PlanKeyValuePair environmentSetting = 8;  
+}
+
+message VertexPlan {
+  required string name = 1;
+  required PlanVertexType type = 2;
+  optional string processorName = 3;
+  required PlanTaskConfiguration taskConfig = 4;
+  repeated PlanTaskLocationHint taskLocationHint = 7;
+  repeated string inEdgeId = 8;
+  repeated string outEdgeId = 9;
+}
+
+message EdgePlan {
+  required string id = 1;
+  required string inputVertexName = 2;
+  required string outputVertexName = 3;
+  required PlanEdgeConnectionPattern connectionPattern = 4;
+  required PlanEdgeSourceType sourceType = 5;
+  required string inputClass = 6;
+  required string outputClass = 7;
+}
+
+message JobPlan {
+  required string name = 1;
+  repeated VertexPlan vertex = 3;
+  repeated EdgePlan edge = 4;
+  repeated PlanKeyValuePair jobSetting = 5;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
new file mode 100644
index 0000000..f89a73e
--- /dev/null
+++ b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.DAGPlan.*;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+// based on TestDAGLocationHint
+public class TestDAGPlan {
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder(); //TODO: doesn't seem to be deleting this folder automatically as expected.
+
+  /**
+   * Writes a minimal single-vertex JobPlan to a temp file and checks the parsed copy equals it.
+   */
+  @Test
+  public void testBasicJobPlanSerde() throws IOException {
+
+    JobPlan job = JobPlan.newBuilder()
+       .setName("test")
+       .addVertex(
+           VertexPlan.newBuilder()
+             .setName("vertex1")
+             .setType(PlanVertexType.NORMAL)
+             .addTaskLocationHint(PlanTaskLocationHint.newBuilder().addHost("machineName").addRack("rack1").build())
+             .setTaskConfig(
+                 DAGPlan.PlanTaskConfiguration.newBuilder()
+                   .setNumTasks(2)
+                   .setVirtualCores(4)
+                   .setMemoryMb(1024)
+                   .setJavaOpts("")
+                   .setTaskModule("x.y")
+                   .build())
+             .build())
+        .build();
+    File file = tempFolder.newFile("jobPlan");
+    // Opened before the try so the finally block can close it without a null check.
+    FileOutputStream outStream = new FileOutputStream(file);
+    try {
+      job.writeTo(outStream);
+    } finally {
+      outStream.close();
+    }
+
+    JobPlan inJob;
+    // The finally block must release the input stream; outStream is already closed above and
+    // closing it again here would leave the FileInputStream open.
+    FileInputStream inputStream = new FileInputStream(file);
+    try {
+      inJob = JobPlan.newBuilder().mergeFrom(inputStream).build();
+    } finally {
+      inputStream.close();
+    }
+
+    Assert.assertEquals(job, inJob);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 37587e4..57003c1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.EnumSet;
@@ -62,8 +63,9 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.dag.api.DAGConfiguration;
-import org.apache.tez.dag.api.DAGLocationHint;
+import org.apache.tez.dag.api.DAGPlan.JobPlan;
+import org.apache.tez.dag.api.DAGPlan.VertexPlan;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.client.ClientService;
 import org.apache.tez.dag.app.client.impl.TezClientService;
@@ -131,7 +133,7 @@ public class DAGAppMaster extends CompositeService {
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
   private Clock clock;
-  private final DAGConfiguration dagPlan;
+  private final JobPlan jobPlan;
   private long dagsStartTime;
   private final long startTime;
   private final long appSubmitTime;
@@ -170,8 +172,6 @@ public class DAGAppMaster extends CompositeService {
   private TaskSchedulerEventHandler taskSchedulerEventHandler;
   private HistoryEventHandler historyEventHandler;
 
-  private DAGLocationHint dagLocationHint;
-
   private DAGAppMasterState state;
 
   private DAG dag;
@@ -180,16 +180,16 @@ public class DAGAppMaster extends CompositeService {
 
   public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
-      long appSubmitTime, DAGConfiguration dagPlan) {
+      long appSubmitTime, JobPlan dagPB) {
     this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
-        new SystemClock(), appSubmitTime, dagPlan);
+        new SystemClock(), appSubmitTime, dagPB);
   }
 
   public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
-      Clock clock, long appSubmitTime, DAGConfiguration dagPlan) {
+      Clock clock, long appSubmitTime, JobPlan dagPB) {
     super(DAGAppMaster.class.getName());
-    this.dagPlan = dagPlan;
+    this.jobPlan = dagPB;
     this.clock = clock;
     this.startTime = clock.getTime();
     this.appSubmitTime = appSubmitTime;
@@ -215,13 +215,12 @@ public class DAGAppMaster extends CompositeService {
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
     downloadTokensAndSetupUGI(conf);
-    setupDAGLocationHint(dagPlan);
 
     context = new RunningAppContext(conf);
 
     // Job name is the same as the app name util we support DAG of jobs
     // for an app later
-    appName = dagPlan.getName();
+    appName = jobPlan.getName();
 
     dagId = new TezDAGID(appAttemptID.getApplicationId(), 1);
 
@@ -521,11 +520,11 @@ public class DAGAppMaster extends CompositeService {
 //  }
 
   /** Create and initialize (but don't start) a single dag. */
-  protected DAG createDAG(DAGConfiguration dagPlan) {
+  protected DAG createDAG(JobPlan dagPB) {
 
     // create single job
     DAG newDag =
-        new DAGImpl(dagId, appAttemptID, conf, dagPlan, dispatcher.getEventHandler(),
+        new DAGImpl(dagId, appAttemptID, conf, dagPB, dispatcher.getEventHandler(),
             taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
             // TODO Recovery
             //completedTasksFromPreviousRun,
@@ -534,7 +533,7 @@ public class DAGAppMaster extends CompositeService {
             //committer, newApiCommitter,
             currentUser.getShortUserName(), appSubmitTime,
             //amInfos,
-            taskHeartbeatHandler, context, dagLocationHint);
+            taskHeartbeatHandler, context);
     ((RunningAppContext) context).setDAG(newDag);
 
     dispatcher.register(DAGFinishEvent.Type.class,
@@ -578,23 +577,6 @@ public class DAGAppMaster extends CompositeService {
     }
   }
 
-  protected void setupDAGLocationHint(DAGConfiguration conf) {
-    try {
-      String dagLocationHintFile =
-          conf.get(DAGConfiguration.DAG_LOCATION_HINT_RESOURCE_FILE,
-              DAGConfiguration.DEFAULT_DAG_LOCATION_HINT_RESOURCE_FILE);
-      File f = new File(dagLocationHintFile);
-      if (f.exists()) {
-        this.dagLocationHint = DAGLocationHint.initDAGDagLocationHint(
-            dagLocationHintFile);
-      } else {
-        this.dagLocationHint = new DAGLocationHint();
-      }
-    } catch (IOException e) {
-      throw new YarnException(e);
-    }
-  }
-
   protected void addIfService(Object object) {
     if (object instanceof Service) {
       addService((Service) object);
@@ -846,7 +828,7 @@ public class DAGAppMaster extends CompositeService {
     */
     
     // /////////////////// Create the job itself.
-    dag = createDAG(dagPlan);
+    dag = createDAG(jobPlan);
 
     // End of creating the job.
 
@@ -973,27 +955,41 @@ public class DAGAppMaster extends CompositeService {
       // Default to running mr if nothing specified.
       // TODO change this once the client is ready.
       String type;
-      DAGConfiguration dagPlan = null;
       TezConfiguration conf = new TezConfiguration(new YarnConfiguration());
+      
+      JobPlan jobPlan = null;
       if (cliParser.hasOption(OPT_PREDEFINED)) {
         LOG.info("Running with PreDefined configuration");
         type = cliParser.getOptionValue(OPT_PREDEFINED, "mr");
         LOG.info("Running job type: " + type);
 
         if (type.equals("mr")) {
-          dagPlan = (DAGConfiguration)MRRExampleHelper.createDAGConfigurationForMR();
+          jobPlan = MRRExampleHelper.createDAGConfigurationForMR();
         } else if (type.equals("mrr")) {
-          dagPlan = (DAGConfiguration)MRRExampleHelper.createDAGConfigurationForMRR();
+          jobPlan = MRRExampleHelper.createDAGConfigurationForMRR();
+        }
+      } 
+      else {
+        // Read the protobuf DAG
+        JobPlan.Builder dagPlanBuilder = JobPlan.newBuilder(); 
+        FileInputStream dagPBBinaryStream = null;
+        try {
+          dagPBBinaryStream = new FileInputStream(TezConfiguration.DAG_AM_PLAN_PB_BINARY);
+          dagPlanBuilder.mergeFrom(dagPBBinaryStream);
         }
-      } else {
-        dagPlan = new DAGConfiguration();
-        dagPlan.addResource(TezConfiguration.DAG_AM_PLAN_CONFIG_XML);
+        finally {
+          if(dagPBBinaryStream != null){
+            dagPBBinaryStream.close();  
+          }
+        }
+
+        jobPlan = dagPlanBuilder.build();
       }
 
       LOG.info("XXXX Running a DAG with "
-          + dagPlan.getVertices().length + " vertices ");
-      for (String v : dagPlan.getVertices()) {
-        LOG.info("XXXX DAG has vertex " + v);
+          + jobPlan.getVertexCount() + " vertices ");
+      for (VertexPlan v : jobPlan.getVertexList()) {
+        LOG.info("XXXX DAG has vertex " + v.getName());
       }
 
       String jobUserName = System
@@ -1004,7 +1000,7 @@ public class DAGAppMaster extends CompositeService {
       // the objects myself.
       conf.setBoolean("fs.automatic.close", false);
       
-      Map<String, String> config = dagPlan.getConfig();
+      Map<String, String> config = DagTypeConverters.createSettingsMapFromDAGPlan(jobPlan.getJobSettingList());
       for(Entry<String, String> entry : config.entrySet()) {
         conf.set(entry.getKey(), entry.getValue());
       }
@@ -1012,7 +1008,7 @@ public class DAGAppMaster extends CompositeService {
       DAGAppMaster appMaster =
           new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
               Integer.parseInt(nodePortString),
-              Integer.parseInt(nodeHttpPortString), appSubmitTime, dagPlan);
+              Integer.parseInt(nodeHttpPortString), appSubmitTime, jobPlan);
       ShutdownHookManager.get().addShutdownHook(
         new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
index 1b69919..b50887a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
@@ -17,7 +17,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.DAGPlan.JobPlan;
 import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
@@ -84,7 +84,10 @@ public class MRRExampleHelper {
    return resourceNames;
  }
 
- static Configuration createDAGConfigurationForMRR() throws IOException {
+ // TODO: these preconfigured jobs seem to require User and perhaps some other work.
+ //       -> not tested with new DagPB system.
+ 
+ static JobPlan createDAGConfigurationForMRR() throws IOException {
    org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
    Vertex mapVertex = new Vertex("map",
        "org.apache.tez.mapreduce.task.InitialTask", 6);
@@ -143,16 +146,15 @@ public class MRRExampleHelper {
    dag.addEdge(edge1);
    dag.addEdge(edge2);
    dag.verify();
-   DAGConfiguration dagConf = dag.serializeDag();
-
-   dagConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
-   dagConf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
-
-   return dagConf;
+   dag.addConfiguration(MRJobConfig.MAP_SPECULATIVE, new Boolean(false).toString());
+   dag.addConfiguration(MRJobConfig.REDUCE_SPECULATIVE, new Boolean(false).toString());
+   
+   JobPlan dagPB = dag.createDag();
+   return dagPB;
  }
 
  // TODO remove once client is in place
- static Configuration createDAGConfigurationForMR() throws IOException {
+ static JobPlan createDAGConfigurationForMR() throws IOException {
    org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
    Vertex mapVertex = new Vertex("map",
        "org.apache.tez.mapreduce.task.InitialTask", 6);
@@ -194,12 +196,13 @@ public class MRRExampleHelper {
    dag.addVertex(reduceVertex);
    dag.addEdge(edge);
    dag.verify();
-   DAGConfiguration dagConf = dag.serializeDag();
-
-   dagConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
-   dagConf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
-
-   return dagConf;
+   
+   dag.addConfiguration(MRJobConfig.MAP_SPECULATIVE, new Boolean(false).toString());
+   dag.addConfiguration(MRJobConfig.REDUCE_SPECULATIVE, new Boolean(false).toString());
+   
+   JobPlan dagPB = dag.createDag();
+   
+   return dagPB;
  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 03bf570..39cfc1a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.DAGPlan.JobPlan;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.engine.records.TezDAGID;
@@ -60,8 +60,8 @@ public interface DAG {
   
   TezConfiguration getConf();
   
-  DAGConfiguration getDagPlan();
-
+  JobPlan getJobPlan();
+  
   /**
    * @return the ACLs for this job for each type of JobACL given. 
    */
@@ -75,5 +75,4 @@ public interface DAG {
   
   boolean checkAccess(UserGroupInformation callerUGI, ApplicationAccessType jobOperation);
 
-  VertexLocationHint getVertexLocationHint(TezVertexID vertexId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 5d8b8f7..8d9fbc3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -28,6 +28,7 @@ import org.apache.tez.common.InputSpec;
 import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.DAGPlan.VertexPlan;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 import org.apache.tez.engine.records.TezTaskID;
 import org.apache.tez.engine.records.TezVertexID;
@@ -39,6 +40,8 @@ import org.apache.tez.engine.records.TezVertexID;
 public interface Vertex extends Comparable<Vertex> {
 
   TezVertexID getVertexId();
+  public VertexPlan getVertexPlan();
+  
   int getDistanceFromRoot();
   String getName();
   VertexState getState();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 0e1206f..2e20692 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -53,13 +52,15 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DAGConfiguration;
-import org.apache.tez.dag.api.DAGLocationHint;
+import org.apache.tez.dag.api.DAGPlan.EdgePlan;
+import org.apache.tez.dag.api.DAGPlan.JobPlan;
+import org.apache.tez.dag.api.DAGPlan.VertexPlan;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.impl.TezBuilderUtils;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
@@ -124,16 +125,14 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private final long appSubmitTime;
   private final AppContext appContext;
 
-  volatile Map<TezVertexID, Vertex> vertices =
-      new HashMap<TezVertexID, Vertex>();
-  private Map<String, EdgeProperty> edgeProperties = 
-                                          new HashMap<String, EdgeProperty>();
+  volatile Map<TezVertexID, Vertex> vertices = new HashMap<TezVertexID, Vertex>();
+  private Map<String, EdgeProperty> edges = new HashMap<String, EdgeProperty>(); 
   private TezCounters dagCounters = new TezCounters();
   private Object fullCountersLock = new Object();
   private TezCounters fullCounters = null;
 
   public final TezConfiguration conf;
-  public final DAGConfiguration dagPlan;
+  private final JobPlan jobPlan;
 
   //fields initialized in init
   private FileSystem fs;
@@ -151,9 +150,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private static final DAGSchedulerUpdateTransition 
           DAG_SCHEDULER_UPDATE_TRANSITION = new DAGSchedulerUpdateTransition();
 
-  // Location hints for all vertices in DAG
-  private final DAGLocationHint dagLocationHint;
-
   protected static final
     StateMachineFactory<DAGImpl, DAGState, DAGEventType, DAGEvent>
        stateMachineFactory
@@ -315,7 +311,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   public DAGImpl(TezDAGID dagId, ApplicationAttemptId applicationAttemptId,
       TezConfiguration conf,
-      DAGConfiguration dagPlan,
+      JobPlan jobPlan,
       EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
       JobTokenSecretManager jobTokenSecretManager,
@@ -327,14 +323,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       // TODO Recovery
       //List<AMInfo> amInfos,
       TaskHeartbeatHandler thh,
-      AppContext appContext,
-      DAGLocationHint dagLocationHint) {
+      AppContext appContext) {
     this.applicationAttemptId = applicationAttemptId;
     this.dagId = dagId;
-    this.dagPlan = dagPlan;
+    this.jobPlan = jobPlan;
     this.conf = conf;
-    this.dagName = (dagPlan.getName() != null) ? dagPlan.getName() : 
-                                                  "<missing app name>";
+    this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>";
+    
     this.userName = appUserName;
     // TODO Metrics
     //this.metrics = metrics;
@@ -357,8 +352,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
     this.aclsManager = new ApplicationACLsManager(conf);
 
-    this.dagLocationHint = dagLocationHint;
-
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
     stateMachine = stateMachineFactory.make(this);
@@ -380,8 +373,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   }
   
   @Override
-  public DAGConfiguration getDagPlan() {
-    return dagPlan;
+  public JobPlan getJobPlan() {
+    return jobPlan;
   }
 
   EventHandler getEventHandler() {
@@ -812,18 +805,19 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         */
 
         // create the vertices
-        String[] vertexNames = dag.getDagPlan().getVertices();
-        dag.numVertices = vertexNames.length;
+        dag.numVertices = dag.getJobPlan().getVertexCount();
         for (int i=0; i < dag.numVertices; ++i) {
-          VertexImpl v = createVertex(dag, vertexNames[i], i);
+          String vertexName = dag.getJobPlan().getVertex(i).getName();
+          VertexImpl v = createVertex(dag, vertexName, i);
           dag.addVertex(v);
         }
 
-        dag.edgeProperties = dag.getDagPlan().getEdgeProperties();
-
+        dag.edges = DagTypeConverters.createEdgePropertyMapFromDAGPlan(dag.getJobPlan().getEdgeList());
+        Map<String,EdgePlan> edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan(dag.getJobPlan().getEdgeList());
+        
         // setup the dag
         for (Vertex v : dag.vertices.values()) {
-          parseVertexEdges(dag, v);
+          parseVertexEdges(dag, edgePlans, v);
         }
 
         dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
@@ -846,39 +840,43 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
     private VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
       TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);
+      
+      VertexPlan vertexPlan = dag.getJobPlan().getVertex(vId);
+      VertexLocationHint vertexLocationHint = DagTypeConverters.convertFromDAGPlan(vertexPlan.getTaskLocationHintList());
+        
       return new VertexImpl(
-          vertexId, vertexName, dag.conf,
+          vertexId, vertexPlan, vertexName, dag.conf,
           dag.eventHandler, dag.taskAttemptListener,
           dag.jobToken, dag.fsTokens, dag.clock,
           dag.taskHeartbeatHandler, dag.appContext,
-          dag.dagLocationHint.getVertexLocationHint(vertexName));
+          vertexLocationHint);
     }
 
-    private void parseVertexEdges(DAGImpl dag, Vertex vertex) {
-      String[] inVerticesNames =
-          dag.getDagPlan().getInputVertices(vertex.getName());
-      List<String> inEdges =
-          dag.getDagPlan().getInputEdgeIds(vertex.getName());
+    // hooks up this VertexImpl to input and output EdgeProperties
+    private void parseVertexEdges(DAGImpl dag, Map<String, EdgePlan> edgePlans, Vertex vertex) {
+      VertexPlan vertexPlan = vertex.getVertexPlan();
+
       Map<Vertex, EdgeProperty> inVertices =
           new HashMap<Vertex, EdgeProperty>();
-      for (int i=0; i < inVerticesNames.length; ++i) {
-        String vertexName = inVerticesNames[i];
-        inVertices.put(dag.getVertex(vertexName), 
-                       dag.edgeProperties.get(inEdges.get(i)));
-      }
-      vertex.setInputVertices(inVertices);
 
-      String[] outVerticesNames =
-          dag.getDagPlan().getOutputVertices(vertex.getName());
-      List<String> outEdges =
-          dag.getDagPlan().getOutputEdgeIds(vertex.getName());
       Map<Vertex, EdgeProperty> outVertices =
           new HashMap<Vertex, EdgeProperty>();
-      for (int i=0; i < outVerticesNames.length; ++i) {
-        String vertexName = outVerticesNames[i];
-        outVertices.put(dag.getVertex(vertexName), 
-                        dag.edgeProperties.get(outEdges.get(i)));
+
+      for(String inEdgeId : vertexPlan.getInEdgeIdList()){
+        EdgePlan edgePlan = edgePlans.get(inEdgeId);
+        Vertex inVertex = dag.vertexMap.get(edgePlan.getInputVertexName());    
+        EdgeProperty edgeProp = dag.edges.get(inEdgeId);
+        inVertices.put(inVertex, edgeProp);
       }
+      
+      for(String outEdgeId : vertexPlan.getOutEdgeIdList()){
+        EdgePlan edgePlan = edgePlans.get(outEdgeId);
+        Vertex outVertex = dag.vertexMap.get(edgePlan.getOutputVertexName());    
+        EdgeProperty edgeProp = dag.edges.get(outEdgeId);
+        outVertices.put(outVertex, edgeProp);
+      }
+      
+      vertex.setInputVertices(inVertices);
       vertex.setOutputVertices(outVertices);
     }
 
@@ -1145,11 +1143,4 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       job.finished(DAGState.ERROR);
     }
   }
-
-  @Override
-  public VertexLocationHint getVertexLocationHint(TezVertexID vertexId) {
-    return dagLocationHint.getVertexLocationHint(
-        getVertex(vertexId).getName());
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 80de958..a423db1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -55,7 +55,8 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.tez.common.InputSpec;
 import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.DAGPlan.VertexPlan;
+import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -324,7 +325,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private Credentials fsTokens;
   private Token<JobTokenIdentifier> jobToken;
 
-  private final TezVertexID vertexId;
+  private final TezVertexID vertexId;  //runtime assigned id.
+  private final VertexPlan vertexPlan;  
 
   private final String vertexName;
   private final String processorName;
@@ -341,7 +343,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   private Map<String, String> environment;
   private String javaOpts;
   
-  public VertexImpl(TezVertexID vertexId, String vertexName,
+  public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan, String vertexName,
       TezConfiguration conf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
@@ -353,6 +355,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       TaskHeartbeatHandler thh,
       AppContext appContext, VertexLocationHint vertexLocationHint) {
     this.vertexId = vertexId;
+    this.vertexPlan = vertexPlan;
     this.vertexName = vertexName;
     this.conf = conf;
     //this.metrics = metrics;
@@ -373,12 +376,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     this.committer = new NullVertexOutputCommitter();
     this.vertexLocationHint = vertexLocationHint;
     
-    this.taskResource = getDAGPlan().getVertexResource(getName());
-    this.processorName = getDAGPlan().getVertexTaskModuleClassName(getName());
-    this.localResources = getDAGPlan().getVertexLocalResources(getName());
-    this.environment = getDAGPlan().getVertexEnv(getName());
-    
-    this.javaOpts = getDAGPlan().getVertexJavaOpts(getName());
+    this.taskResource = DagTypeConverters.CreateResourceRequestFromTaskConfig(vertexPlan.getTaskConfig());
+    this.processorName = vertexPlan.hasProcessorName() ? vertexPlan.getProcessorName() : null;  
+    this.localResources = DagTypeConverters.createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig().getLocalResourceList());
+    this.environment = DagTypeConverters.createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig().getEnvironmentSettingList());
+    this.javaOpts = vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan.getTaskConfig().getJavaOpts() : null;
     
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
@@ -395,6 +397,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   }
   
   @Override
+  public VertexPlan getVertexPlan() {
+    return vertexPlan;
+  }
+  
+  @Override
   public int getDistanceFromRoot() {
     return distanceFromRoot;
   }
@@ -727,7 +734,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
 
         // TODODAGAM
         // TODO: Splits?
-        vertex.numTasks = vertex.getDAGPlan().getNumVertexTasks(vertex.getName());
+        
+        vertex.numTasks = vertex.getVertexPlan().getTaskConfig().getNumTasks();
+        
         /*
         TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
         job.numMapTasks = taskSplitMetaInfo.length;
@@ -1217,10 +1226,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
     return appContext.getDAG();
   }
   
-  DAGConfiguration getDAGPlan() {
-    return getDAG().getDagPlan();
-  }
-
   // TODO Eventually remove synchronization.
   @Override
   public synchronized List<InputSpec> getInputSpecList() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
index 331e2a3..b3db770 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
@@ -82,6 +82,12 @@ public class TezEngineChildJVM {
     //set custom javaOpts
     vargs.add(javaOpts); 
     
+//[Debug Task] Current simplest way to attach debugger to  Tez Child Task
+// Uncomment the following, then launch a regular job
+// Works best on one-box configured with a single container (hence one task at a time). 
+//    LOG.error(" !!!!!!!!! Launching Child-Task in debug/suspend mode.  Attach to port 8003 !!!!!!!!");
+//    vargs.add("-Xdebug -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=8003,server=y,suspend=y");
+    
     Path childTmpDir = new Path(Environment.PWD.$(),
         YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
     vargs.add("-Djava.io.tmpdir=" + childTmpDir);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/cb5758b4/tez-dag/src/test/java/org/apache/tez/dag/api/TestDAGLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/TestDAGLocationHint.java b/tez-dag/src/test/java/org/apache/tez/dag/api/TestDAGLocationHint.java
deleted file mode 100644
index 3deaee7..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/TestDAGLocationHint.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestDAGLocationHint {
-
-  private DataInput in;
-  private DataOutput out;
-  private ByteArrayOutputStream bOut;
-
-  @Before
-  public void setup() {
-    bOut = new ByteArrayOutputStream();
-    out = new DataOutputStream(bOut);
-  }
-
-  @After
-  public void teardown() {
-    in = null;
-    out = null;
-    bOut = null;
-  }
-
-  @Test
-  public void testNullDAGLocationHintSerDes() throws IOException {
-    DAGLocationHint expected = new DAGLocationHint();
-    expected.write(out);
-    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
-    DAGLocationHint actual = new DAGLocationHint();
-    actual.readFields(in);
-    Assert.assertNotNull(actual.getVertexLocationHints());
-    Assert.assertEquals(0, actual.getVertexLocationHints().size());
-  }
-
-  @Test
-  public void testDAGLocationHintSerDes() throws IOException {
-    String[] hosts = { "h1", "h2", "", null };
-    String[] racks = { "r1", "r2" };
-
-    VertexLocationHint vertexLocationHint = new VertexLocationHint(4);
-    vertexLocationHint.getTaskLocationHints()[0] =
-        new TaskLocationHint(hosts, racks);
-    DAGLocationHint expected = new DAGLocationHint();
-    expected.getVertexLocationHints().put("v1", null);
-    expected.getVertexLocationHints().put("v2", new VertexLocationHint());
-    expected.getVertexLocationHints().put("v3", vertexLocationHint);
-    expected.write(out);
-
-    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
-    DAGLocationHint actual = new DAGLocationHint();
-    actual.readFields(in);
-    Assert.assertNotNull(actual.getVertexLocationHints());
-    Assert.assertEquals(3, actual.getVertexLocationHints().size());
-
-    Assert.assertNull(actual.getVertexLocationHint("v1"));
-    Assert.assertNotNull(actual.getVertexLocationHint("v2"));
-    Assert.assertNotNull(actual.getVertexLocationHint("v3"));
-
-    Assert.assertEquals(0, actual.getVertexLocationHint("v2").getNumTasks());
-    Assert.assertEquals(0,
-        actual.getVertexLocationHint("v2").getTaskLocationHints().length);
-
-    Assert.assertEquals(4, actual.getVertexLocationHint("v3").getNumTasks());
-    Assert.assertEquals(4,
-        actual.getVertexLocationHint("v3").getTaskLocationHints().length);
-    Assert.assertNotNull(
-        actual.getVertexLocationHint("v3").getTaskLocationHints()[0]);
-    Assert.assertArrayEquals(racks,
-        actual.getVertexLocationHint("v3").getTaskLocationHints()[0].
-            getRacks());
-    Assert.assertArrayEquals(hosts,
-        actual.getVertexLocationHint("v3").getTaskLocationHints()[0].
-            getDataLocalHosts());
-    Assert.assertNull(
-        actual.getVertexLocationHint("v3").getTaskLocationHints()[1]);
-    Assert.assertNull(
-        actual.getVertexLocationHint("v3").getTaskLocationHints()[2]);
-    Assert.assertNull(
-        actual.getVertexLocationHint("v3").getTaskLocationHints()[3]);
-  }
-
-
-}
-