You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/09/29 02:35:43 UTC

[37/50] [abbrv] git commit: TEZ-1433. Invalid credentials can be used when a DAG is submitted to a session which has timed out (bikas)

TEZ-1433. Invalid credentials can be used when a DAG is submitted to a session which has timed out (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/06fa79ae
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/06fa79ae
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/06fa79ae

Branch: refs/heads/branch-0.5
Commit: 06fa79ae599187d7976ed5e9cccf9c66fc91eff7
Parents: 9159e11
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Sep 22 22:25:48 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Sep 22 22:25:48 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/tez/client/TezClientUtils.java   |  49 ++-----
 .../main/java/org/apache/tez/dag/api/DAG.java   | 135 +++++++++---------
 .../apache/tez/dag/api/DagTypeConverters.java   |  29 ++++
 tez-api/src/main/proto/DAGApiRecords.proto      |   1 +
 .../org/apache/tez/client/TestTezClient.java    |   6 +-
 .../org/apache/tez/dag/api/TestDAGPlan.java     |   8 +-
 .../org/apache/tez/dag/api/TestDAGVerify.java   |  79 ++++++-----
 .../java/org/apache/tez/dag/app/dag/DAG.java    |   2 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  11 ++
 .../app/rm/container/AMContainerHelpers.java    |   9 +-
 .../dag/app/rm/container/AMContainerImpl.java   |   8 +-
 .../apache/tez/dag/app/MockDAGAppMaster.java    |  18 ++-
 .../tez/dag/app/TestMockDAGAppMaster.java       | 138 +++++++++++++++++++
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |   2 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |   2 +-
 .../tez/dag/history/utils/TestDAGUtils.java     |   2 +-
 17 files changed, 348 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f3b2ed0..998bb03 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -57,6 +57,8 @@ ALL CHANGES
   Windows
   TEZ-1554. Failing tests in TestMRHelpers related to environment on Windows
   TEZ-978. Enhance auto parallelism tuning for queries having empty outputs or data skewness
+  TEZ-1433. Invalid credentials can be used when a DAG is submitted to a
+  session which has timed out
 
 Release 0.5.0: 2014-09-03
 

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 917fcff..1e01138 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -334,17 +334,13 @@ public class TezClientUtils {
    * @throws IOException
    */
   @Private
-  static void setupDAGCredentials(DAG dag, Credentials sessionCredentials,
+  static Credentials setupDAGCredentials(DAG dag, Credentials sessionCredentials,
       Configuration conf) throws IOException {
 
     Preconditions.checkNotNull(sessionCredentials);
     TezCommonUtils.logCredentials(LOG, sessionCredentials, "session");
 
-    Credentials dagCredentials = dag.getCredentials();
-    if (dagCredentials == null) {
-      dagCredentials = new Credentials();
-      dag.setCredentials(dagCredentials);
-    }
+    Credentials dagCredentials = new Credentials();
     // All session creds are required for the DAG.
     dagCredentials.mergeAll(sessionCredentials);
     
@@ -361,6 +357,10 @@ public class TezClientUtils {
           lrPaths.add(ConverterUtils.getPathFromYarnURL(lr.getResource()));
         }
       }
+      
+      for (LocalResource lr: dag.getTaskLocalFiles().values()) {
+        lrPaths.add(ConverterUtils.getPathFromYarnURL(lr.getResource()));
+      }
 
       Path[] paths = lrPaths.toArray(new Path[lrPaths.size()]);
       TokenCache.obtainTokensForFileSystems(dagCredentials, paths, conf);
@@ -368,6 +368,8 @@ public class TezClientUtils {
     } catch (URISyntaxException e) {
       throw new IOException(e);
     }
+    
+    return dagCredentials;
   }
 
   @Private
@@ -622,35 +624,12 @@ public class TezClientUtils {
 
   }
   
-  static void updateDAGVertices(DAG dag, AMConfiguration amConfig,
-      Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
-      Credentials credentials) throws IOException {
-    setupDAGCredentials(dag, credentials, amConfig.getTezConfiguration());
-    for (Vertex v : dag.getVertices()) {
-      if (tezJarResources != null) {
-        v.getTaskLocalFiles().putAll(tezJarResources);
-      }
-      v.getTaskLocalFiles().put(TezConstants.TEZ_PB_BINARY_CONF_NAME,
-          amConfig.getBinaryConfLR());
-
-      Map<String, String> taskEnv = v.getTaskEnvironment();
-      TezYARNUtils.setupDefaultEnv(taskEnv, amConfig.getTezConfiguration(),
-          TezConfiguration.TEZ_TASK_LAUNCH_ENV,
-          TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT, tezLrsAsArchive);
-
-      setDefaultLaunchCmdOpts(v, amConfig.getTezConfiguration());
-    }
-  }
-  
   static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
       Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
       Credentials credentials) throws IOException {
-    DAGPlan dagPB = dag.getCachedDAGPlan();
-    if (dagPB == null) {
-      updateDAGVertices(dag, amConfig, tezJarResources, tezLrsAsArchive, credentials);  
-      dagPB = dag.createDag(amConfig.getTezConfiguration());  
-    }    
-    return dagPB;
+    Credentials dagCredentials = setupDAGCredentials(dag, credentials, amConfig.getTezConfiguration());
+    return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, tezJarResources,
+        amConfig.getBinaryConfLR(), true);
   }
   
   static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) {
@@ -678,8 +657,8 @@ public class TezClientUtils {
     return StringUtils.join(vargs, " ").trim();
   }
   
-  static void setDefaultLaunchCmdOpts(Vertex v, TezConfiguration conf) {
-    String vOpts = v.getTaskLaunchCmdOpts();
+  @Private
+  public static String addDefaultsToTaskLaunchCmdOpts(String vOpts, Configuration conf) {
     String vConfigOpts = conf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS,
         TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS_DEFAULT);
     if (vConfigOpts != null && vConfigOpts.length() > 0) {
@@ -689,7 +668,7 @@ public class TezClientUtils {
     vOpts = maybeAddDefaultLoggingJavaOpts(conf.get(
         TezConfiguration.TEZ_TASK_LOG_LEVEL,
         TezConfiguration.TEZ_TASK_LOG_LEVEL_DEFAULT), vOpts);
-    v.setTaskLaunchCmdOpts(vOpts);
+    return vOpts;
   }
 
   @Private

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index ffd2e83..c28f210 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -39,10 +39,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.security.DAGAccessControls;
 import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.common.TezYARNUtils;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
@@ -53,7 +54,6 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
-import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
@@ -89,7 +89,6 @@ public class DAG {
   Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
   
   private Stack<String> topologicalVertexStack = new Stack<String>();
-  private DAGPlan cachedDAGPlan;
 
   private DAG(String name) {
     this.name = name;
@@ -112,7 +111,7 @@ public class DAG {
    *          elements of the map.
    * @return {@link DAG}
    */
-  public DAG addTaskLocalFiles(Map<String, LocalResource> localFiles) {
+  public synchronized DAG addTaskLocalFiles(Map<String, LocalResource> localFiles) {
     Preconditions.checkNotNull(localFiles);
     TezCommonUtils.addAdditionalLocalResources(localFiles, commonTaskLocalFiles);
     return this;
@@ -298,6 +297,11 @@ public class DAG {
     return this.name;
   }
   
+  @Private
+  public Map<String, LocalResource> getTaskLocalFiles() {
+    return commonTaskLocalFiles;
+  }
+  
   void checkAndInferOneToOneParallelism() {
     // infer all 1-1 via dependencies
     // collect all 1-1 edges where the source parallelism is set
@@ -575,14 +579,11 @@ public class DAG {
     }
   }
 
-  @Private
-  public DAGPlan getCachedDAGPlan() {
-    return cachedDAGPlan;
-  }
-
   // create protobuf message describing DAG
   @Private
-  public DAGPlan createDag(Configuration dagConf) {
+  public DAGPlan createDag(Configuration dagConf, Credentials extraCredentials,
+      Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
+      boolean tezLrsAsArchive) {
     verify(true);
 
     DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
@@ -608,6 +609,15 @@ public class DAG {
       }
     }
 
+    Credentials dagCredentials = new Credentials();
+    if (extraCredentials != null) {
+      dagCredentials.mergeAll(extraCredentials);
+    }
+    dagCredentials.mergeAll(credentials);
+    if (!commonTaskLocalFiles.isEmpty()) {
+      dagBuilder.addAllLocalResource(DagTypeConverters.convertToDAGPlan(commonTaskLocalFiles));
+    }
+
     Preconditions.checkArgument(topologicalVertexStack.size() == vertices.size(),
         "size of topologicalVertexStack is:" + topologicalVertexStack.size() +
         " while size of vertices is:" + vertices.size() +
@@ -615,38 +625,57 @@ public class DAG {
     while(!topologicalVertexStack.isEmpty()) {
       Vertex vertex = vertices.get(topologicalVertexStack.pop());
       // infer credentials, resources and parallelism from data source
-      if (vertex.getTaskResource() == null) {
-        vertex.setTaskResource(Resource.newInstance(dagConf.getInt(
+      Resource vertexTaskResource = vertex.getTaskResource();
+      if (vertexTaskResource == null) {
+        vertexTaskResource = Resource.newInstance(dagConf.getInt(
             TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
             TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT), dagConf.getInt(
             TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
-            TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT)));
+            TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT));
       }
+      Map<String, LocalResource> vertexLRs = Maps.newHashMap();
+      vertexLRs.putAll(vertex.getTaskLocalFiles());
       List<DataSourceDescriptor> dataSources = vertex.getDataSources();
       for (DataSourceDescriptor dataSource : dataSources) {
         if (dataSource.getCredentials() != null) {
-          credentials.addAll(dataSource.getCredentials());
+          dagCredentials.addAll(dataSource.getCredentials());
         }
         vertex.addTaskLocalFiles(dataSource.getAdditionalLocalFiles());
+        if (dataSource.getAdditionalLocalFiles() != null) {
+          TezCommonUtils.addAdditionalLocalResources(dataSource.getAdditionalLocalFiles(), vertexLRs);
+        }
+      }
+      if (tezJarResources != null) {
+        TezCommonUtils.addAdditionalLocalResources(tezJarResources, vertexLRs);
       }
+      if (binaryConfig != null) {
+        vertexLRs.put(TezConstants.TEZ_PB_BINARY_CONF_NAME, binaryConfig);
+      }
+      int vertexParallelism = vertex.getParallelism();
+      VertexLocationHint vertexLocationHint = vertex.getLocationHint();
       if (dataSources.size() == 1) {
         DataSourceDescriptor dataSource = dataSources.get(0);
-        if (vertex.getParallelism() == -1 && dataSource.getNumberOfShards() > -1) {
-          vertex.setParallelism(dataSource.getNumberOfShards());
+        if (vertexParallelism == -1 && dataSource.getNumberOfShards() > -1) {
+          vertexParallelism = dataSource.getNumberOfShards();
         }
-        if (vertex.getLocationHint() == null && dataSource.getLocationHint() != null) {
-          vertex.setLocationHint(dataSource.getLocationHint());
+        if (vertexLocationHint == null && dataSource.getLocationHint() != null) {
+          vertexLocationHint = dataSource.getLocationHint();
         }
       }
+      if (vertexParallelism == -1) {
+        Preconditions.checkState(vertexLocationHint == null,
+            "Cannot specify vertex location hint without specifying vertex parallelism. Vertex: "
+                + vertex.getName());
+      } else if (vertexLocationHint != null) {
+        Preconditions.checkState(vertexParallelism == vertexLocationHint.getTaskLocationHints().size(),
+            "vertex task location hint must equal vertex parallelism. Vertex: " + vertex.getName());
+      }
       for (DataSinkDescriptor dataSink : vertex.getDataSinks()) {
         if (dataSink.getCredentials() != null) {
-          credentials.addAll(dataSink.getCredentials());
+          dagCredentials.addAll(dataSink.getCredentials());
         }
       }
       
-      // add common task files for this DAG
-      vertex.addTaskLocalFiles(commonTaskLocalFiles);
-        
       VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
       vertexBuilder.setName(vertex.getName());
       vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until  TEZ-46.
@@ -665,48 +694,31 @@ public class DAG {
 
       //task config
       PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder();
-      Resource resource = vertex.getTaskResource();
-      taskConfigBuilder.setNumTasks(vertex.getParallelism());
-      taskConfigBuilder.setMemoryMb(resource.getMemory());
-      taskConfigBuilder.setVirtualCores(resource.getVirtualCores());
-      taskConfigBuilder.setJavaOpts(vertex.getTaskLaunchCmdOpts());
+      taskConfigBuilder.setNumTasks(vertexParallelism);
+      taskConfigBuilder.setMemoryMb(vertexTaskResource.getMemory());
+      taskConfigBuilder.setVirtualCores(vertexTaskResource.getVirtualCores());
+      taskConfigBuilder.setJavaOpts(
+          TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vertex.getTaskLaunchCmdOpts(), dagConf));
 
       taskConfigBuilder.setTaskModule(vertex.getName());
-      PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
-      localResourcesBuilder.clear();
-      for (Entry<String, LocalResource> entry :
-             vertex.getTaskLocalFiles().entrySet()) {
-        String key = entry.getKey();
-        LocalResource lr = entry.getValue();
-        localResourcesBuilder.setName(key);
-        localResourcesBuilder.setUri(
-          DagTypeConverters.convertToDAGPlan(lr.getResource()));
-        localResourcesBuilder.setSize(lr.getSize());
-        localResourcesBuilder.setTimeStamp(lr.getTimestamp());
-        localResourcesBuilder.setType(
-          DagTypeConverters.convertToDAGPlan(lr.getType()));
-        localResourcesBuilder.setVisibility(
-          DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
-        if (lr.getType() == LocalResourceType.PATTERN) {
-          if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
-            throw new TezUncheckedException("LocalResource type set to pattern"
-              + " but pattern is null or empty");
-          }
-          localResourcesBuilder.setPattern(lr.getPattern());
-        }
-        taskConfigBuilder.addLocalResource(localResourcesBuilder);
+      if (!vertexLRs.isEmpty()) {
+        taskConfigBuilder.addAllLocalResource(DagTypeConverters.convertToDAGPlan(vertexLRs));
       }
-      
-      for (String key : vertex.getTaskEnvironment().keySet()) {
+
+      Map<String, String> taskEnv = Maps.newHashMap(vertex.getTaskEnvironment());
+      TezYARNUtils.setupDefaultEnv(taskEnv, dagConf,
+          TezConfiguration.TEZ_TASK_LAUNCH_ENV,
+          TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT, tezLrsAsArchive);
+      for (Map.Entry<String, String> entry : taskEnv.entrySet()) {
         PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder();
-        envSettingBuilder.setKey(key);
-        envSettingBuilder.setValue(vertex.getTaskEnvironment().get(key));
+        envSettingBuilder.setKey(entry.getKey());
+        envSettingBuilder.setValue(entry.getValue());
         taskConfigBuilder.addEnvironmentSetting(envSettingBuilder);
       }
 
-      if (vertex.getLocationHint() != null) {
-        if (vertex.getLocationHint().getTaskLocationHints() != null) {
-          for (TaskLocationHint hint : vertex.getLocationHint().getTaskLocationHints()) {
+      if (vertexLocationHint != null) {
+        if (vertexLocationHint.getTaskLocationHints() != null) {
+          for (TaskLocationHint hint : vertexLocationHint.getTaskLocationHints()) {
             PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();
 
             if (hint.getAffinitizedContainer() != null) {
@@ -788,12 +800,11 @@ public class DAG {
     dagBuilder.setDagKeyValues(confProtoBuilder); // This does not seem to be used anywhere
     // should this replace BINARY_PB_CONF???
 
-    if (credentials != null) {
-      dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(credentials));
-      TezCommonUtils.logCredentials(LOG, credentials, "dag");
+    if (dagCredentials != null) {
+      dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(dagCredentials));
+      TezCommonUtils.logCredentials(LOG, dagCredentials, "dag");
     }
     
-    cachedDAGPlan = dagBuilder.build();
-    return cachedDAGPlan;
+    return dagBuilder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index 6594f02..179f3cc 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.Map.Entry;
 
 import javax.annotation.Nullable;
 
@@ -72,6 +73,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto;
 import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexLocationHintProto;
 
+import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ByteString.Output;
 
@@ -86,6 +88,33 @@ public class DagTypeConverters {
       default : throw new RuntimeException("unknown 'visibility': " + visibility);
     }
   }
+  
+  public static List<PlanLocalResource> convertToDAGPlan(Map<String, LocalResource> lrs) {
+    List<PlanLocalResource> planLrs = Lists.newArrayListWithCapacity(lrs.size());
+    for (Entry<String, LocalResource> entry : lrs.entrySet()) {
+      PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
+      String key = entry.getKey();
+      LocalResource lr = entry.getValue();
+      localResourcesBuilder.setName(key);
+      localResourcesBuilder.setUri(
+        DagTypeConverters.convertToDAGPlan(lr.getResource()));
+      localResourcesBuilder.setSize(lr.getSize());
+      localResourcesBuilder.setTimeStamp(lr.getTimestamp());
+      localResourcesBuilder.setType(
+        DagTypeConverters.convertToDAGPlan(lr.getType()));
+      localResourcesBuilder.setVisibility(
+        DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
+      if (lr.getType() == LocalResourceType.PATTERN) {
+        if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
+          throw new TezUncheckedException("LocalResource type set to pattern"
+            + " but pattern is null or empty");
+        }
+        localResourcesBuilder.setPattern(lr.getPattern());
+      }
+      planLrs.add(localResourcesBuilder.build());
+    }
+    return planLrs;
+  }
 
   public static LocalResourceVisibility convertFromDAGPlan(PlanLocalResourceVisibility visibility){
     switch(visibility){

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
index 54cd191..5b1f518 100644
--- a/tez-api/src/main/proto/DAGApiRecords.proto
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -159,6 +159,7 @@ message DAGPlan {
   optional ConfigurationProto dagKeyValues = 4;
   optional bytes credentials_binary = 5;
   repeated PlanVertexGroupInfo vertex_groups = 6;
+  repeated PlanLocalResource local_resource = 7;
 }
 
 // DAG monitoring messages

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 163b042..0ce6dff 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -161,11 +161,7 @@ public class TestTezClient {
         Resource.newInstance(1, 1));
     DAG dag = DAG.create("DAG").addVertex(vertex).addTaskLocalFiles(lrDAG);
     DAGClient dagClient = client.submitDAG(dag);
-    
-    // verify that both DAG and TezClient localResources are added to the vertex
-    Map<String, LocalResource> vertexLR = vertex.getTaskLocalFiles();
-    Assert.assertTrue(vertexLR.containsKey(mockLR1Name));
-    
+        
     Assert.assertTrue(dagClient.getExecutionContext().contains(client.mockAppId.toString()));
     
     if (isSession) {

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
index 22fe3a1..8cbd611 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -123,7 +123,7 @@ public class TestDAGPlan {
 
     dag.addVertex(v1).addVertex(v2).addEdge(edge);
 
-    DAGPlan dagProto = dag.createDag(new TezConfiguration());
+    DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true);
 
     EdgeProperty edgeProperty = DagTypeConverters.createEdgePropertyMapFromDAGPlan(dagProto
         .getEdgeList().get(0));
@@ -158,7 +158,7 @@ public class TestDAGPlan {
 
     dag.addVertex(v1).addVertex(v2).addEdge(edge);
 
-    DAGPlan dagProto = dag.createDag(new TezConfiguration());
+    DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true);
 
     assertEquals(2, dagProto.getVertexCount());
     assertEquals(1, dagProto.getEdgeCount());
@@ -224,7 +224,7 @@ public class TestDAGPlan {
 
     dag.addVertex(v1).addVertex(v2).addEdge(edge).addVertex(v3);
 
-    DAGPlan dagProto = dag.createDag(new TezConfiguration());
+    DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true);
 
     assertEquals(3, dagProto.getVertexCount());
     assertEquals(1, dagProto.getEdgeCount());
@@ -300,7 +300,7 @@ public class TestDAGPlan {
     
     dag.setCredentials(dagCredentials);
 
-    DAGPlan dagProto = dag.createDag(new TezConfiguration());
+    DAGPlan dagProto = dag.createDag(new TezConfiguration(), null, null, null, true);
 
     assertTrue(dagProto.hasCredentialsBinary());
     

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index ba89a47..0697584 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -19,7 +19,9 @@
 package org.apache.tez.dag.api;
 
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.conf.Configuration;
@@ -35,10 +37,14 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 public class TestDAGVerify {
 
@@ -901,51 +907,51 @@ public class TestDAGVerify {
 
     dag.addVertex(v1);
 
-    dag.createDag(new TezConfiguration());
+    dag.createDag(new TezConfiguration(), null, null, null, true);
   }
   
   
   @Test(timeout = 5000)
-  public void testVerifyCommonFiles() {
-    Vertex v1 = Vertex.create("v1",
-        ProcessorDescriptor.create(dummyProcessorClassName),
-        dummyTaskCount, dummyTaskResource);
-    Vertex v2 = Vertex.create("v2",
-        ProcessorDescriptor.create("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Edge e1 = Edge.create(v1, v2,
-        EdgeProperty.create(DataMovementType.SCATTER_GATHER,
-            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
-            OutputDescriptor.create(dummyOutputClassName),
-            InputDescriptor.create(dummyInputClassName)));
-    Map<String, LocalResource> lrs = Maps.newHashMap();
+  public void testDAGCreateDataInference() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create(dummyProcessorClassName));
+    Map<String, LocalResource> lrs1 = Maps.newHashMap();
     String lrName1 = "LR1";
-    lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
+    lrs1.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
+        LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+    Map<String, LocalResource> lrs2 = Maps.newHashMap();
+    String lrName2 = "LR2";
+    lrs2.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
         LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
     
+    Set<String> hosts = Sets.newHashSet();
+    hosts.add("h1");
+    hosts.add("h2");
+    List<TaskLocationHint> taskLocationHints = Lists.newLinkedList();
+    taskLocationHints.add(TaskLocationHint.createTaskLocationHint(hosts, null));
+    taskLocationHints.add(TaskLocationHint.createTaskLocationHint(hosts, null));
+    VertexLocationHint vLoc = VertexLocationHint.create(taskLocationHints);
+    DataSourceDescriptor ds = DataSourceDescriptor.create(InputDescriptor.create("I.class"), 
+        null, dummyTaskCount, null, vLoc, lrs2);
+    v1.addDataSource("i1", ds);
+        
     DAG dag = DAG.create("testDag");
     dag.addVertex(v1);
-    dag.addVertex(v2);
-    dag.addEdge(e1);
-    dag.addTaskLocalFiles(lrs);
-    dag.createDag(new TezConfiguration());
-    Assert.assertTrue(v1.getTaskLocalFiles().containsKey(lrName1));
-    Assert.assertTrue(v2.getTaskLocalFiles().containsKey(lrName1));
+    dag.addTaskLocalFiles(lrs1);
+    DAGPlan dagPlan = dag.createDag(new TezConfiguration(), null, null, null, true);
+    Assert.assertEquals(lrName1, dagPlan.getLocalResource(0).getName());
+    VertexPlan vPlan = dagPlan.getVertex(0);
+    PlanTaskConfiguration taskPlan = vPlan.getTaskConfig();
+    Assert.assertEquals(dummyTaskCount, taskPlan.getNumTasks());
+    Assert.assertEquals(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT, taskPlan.getMemoryMb());
+    Assert.assertEquals(lrName2, taskPlan.getLocalResource(0).getName());
+    Assert.assertEquals(dummyTaskCount, vPlan.getTaskLocationHintCount());
   }
 
   @Test(timeout = 5000)
-  public void testVerifyCommonFilesFail() {
+  public void testInferredFilesFail() {
     Vertex v1 = Vertex.create("v1",
         ProcessorDescriptor.create(dummyProcessorClassName),
         dummyTaskCount, dummyTaskResource);
-    Vertex v2 = Vertex.create("v2",
-        ProcessorDescriptor.create("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Edge e1 = Edge.create(v1, v2,
-        EdgeProperty.create(DataMovementType.SCATTER_GATHER,
-            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
-            OutputDescriptor.create(dummyOutputClassName),
-            InputDescriptor.create(dummyInputClassName)));
     Map<String, LocalResource> lrs = Maps.newHashMap();
     String lrName1 = "LR1";
     lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
@@ -957,10 +963,13 @@ public class TestDAGVerify {
     } catch (TezUncheckedException e) {
       Assert.assertTrue(e.getMessage().contains("Attempting to add duplicate resource"));
     }
+
+    DataSourceDescriptor ds = DataSourceDescriptor.create(InputDescriptor.create("I.class"), 
+        null, -1, null, null, lrs);
+    v1.addDataSource("i1", ds);
+    
     DAG dag = DAG.create("testDag");
     dag.addVertex(v1);
-    dag.addVertex(v2);
-    dag.addEdge(e1);
     dag.addTaskLocalFiles(lrs);
     try {
       dag.addTaskLocalFiles(lrs);
@@ -969,8 +978,8 @@ public class TestDAGVerify {
       Assert.assertTrue(e.getMessage().contains("Attempting to add duplicate resource"));
     }
     try {
-      // dag will add duplicate common files to vertex
-      dag.createDag(new TezConfiguration());
+      // data source will add duplicate common files to vertex
+      dag.createDag(new TezConfiguration(), null, null, null, true);
       Assert.fail();
     } catch (TezUncheckedException e) {
       Assert.assertTrue(e.getMessage().contains("Attempting to add duplicate resource"));
@@ -993,7 +1002,7 @@ public class TestDAGVerify {
     dag.setAccessControls(dagAccessControls);
 
     Configuration conf = new Configuration(false);
-    DAGPlan dagPlan = dag.createDag(conf);
+    DAGPlan dagPlan = dag.createDag(conf, null, null, null, true);
     Assert.assertNull(conf.get(TezConstants.TEZ_DAG_VIEW_ACLS));
     Assert.assertNull(conf.get(TezConstants.TEZ_DAG_MODIFY_ACLS));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 2fae860..a2f04ab 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.client.DAGStatusBuilder;
 import org.apache.tez.dag.api.client.StatusGetOpts;
@@ -41,6 +42,7 @@ import org.apache.tez.dag.records.TezVertexID;
 public interface DAG {
 
   TezDAGID getID();
+  Map<String, LocalResource> getLocalResources();
   String getName();
   DAGState getState();
   DAGReport getReport();

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 23f9096..c4e16e2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -167,6 +168,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
 
   public final Configuration conf;
   private final DAGPlan jobPlan;
+  
+  Map<String, LocalResource> localResources;
 
   private final List<String> diagnostics = new ArrayList<String>();
 
@@ -421,6 +424,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     this.readLock = readWriteLock.readLock();
     this.writeLock = readWriteLock.writeLock();
+    
+    this.localResources = DagTypeConverters.createLocalResourceMapFromDAGPlan(jobPlan
+        .getLocalResourceList());
 
     this.credentials = dagCredentials;
     if (this.credentials == null) {
@@ -452,6 +458,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   public TezDAGID getID() {
     return dagId;
   }
+  
+  @Override
+  public Map<String, LocalResource> getLocalResources() {
+    return localResources;
+  }
 
   // TODO maybe removed after TEZ-74
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 5eb1e67..b776349 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -90,11 +90,7 @@ public class AMContainerHelpers {
    */
   private static ContainerLaunchContext createCommonContainerLaunchContext(
       Map<ApplicationAccessType, String> applicationACLs,
-      Credentials credentials) {
-
-    // Application resources
-    Map<String, LocalResource> localResources =
-        new HashMap<String, LocalResource>();
+      Credentials credentials, Map<String, LocalResource> localResources) {
 
     // Application environment
     Map<String, String> environment = new HashMap<String, String>();
@@ -141,6 +137,7 @@ public class AMContainerHelpers {
   @VisibleForTesting
   public static ContainerLaunchContext createContainerLaunchContext(
       TezDAGID tezDAGID,
+      Map<String, LocalResource> commonDAGLRs,
       Map<ApplicationAccessType, String> acls,
       ContainerId containerId,
       Map<String, LocalResource> localResources,
@@ -154,7 +151,7 @@ public class AMContainerHelpers {
     synchronized (commonContainerSpecLock) {
       if (!commonContainerSpecs.containsKey(tezDAGID)) {
         commonContainerSpec =
-            createCommonContainerLaunchContext(acls, credentials);
+            createCommonContainerLaunchContext(acls, credentials, commonDAGLRs);
         commonContainerSpecs.put(tezDAGID, commonContainerSpec);
       } else {
         commonContainerSpec = commonContainerSpecs.get(tezDAGID);

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index df1b65d..a0f9cb7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -496,8 +496,14 @@ public class AMContainerImpl implements AMContainer {
       container.credentials = containerContext.getCredentials();
       container.credentialsChanged = true;
 
+      TezDAGID dagId = null;
+      Map<String, LocalResource> dagLocalResources = null;
+      if (container.appContext.getCurrentDAG() != null) {
+        dagId = container.appContext.getCurrentDAG().getID();
+        dagLocalResources = container.appContext.getCurrentDAG().getLocalResources();
+      }
       ContainerLaunchContext clc = AMContainerHelpers.createContainerLaunchContext(
-          container.appContext.getCurrentDAGID(),
+          dagId, dagLocalResources,
           container.appContext.getApplicationACLs(),
           container.getContainerId(),
           containerContext.getLocalResources(),

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index 9fe9c4d..7c7e091 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.common.ContainerContext;
@@ -84,16 +85,19 @@ public class MockDAGAppMaster extends DAGAppMaster {
       ContainerId cId;
       TezTaskAttemptID taId;
       String vName;
+      ContainerLaunchContext launchContext;
       boolean completed;
       
-      public ContainerData(ContainerId cId) {
+      public ContainerData(ContainerId cId, ContainerLaunchContext context) {
         this.cId = cId;
+        this.launchContext = context;
       }
       
       void clear() {
         taId = null;
         vName = null;
         completed = false;
+        launchContext = null;
       }
     }
     
@@ -126,6 +130,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
     
     
     void waitToGo() {
+      if (goFlag == null) {
+        return;
+      }
       synchronized (goFlag) {
         goFlag.set(true);
         goFlag.notify();
@@ -164,9 +171,16 @@ public class MockDAGAppMaster extends DAGAppMaster {
 
     void launch(NMCommunicatorLaunchRequestEvent event) {
       // launch container by putting it in simulated container list
-      containers.put(event.getContainerId(), new ContainerData(event.getContainerId()));
+      containers.put(event.getContainerId(), new ContainerData(event.getContainerId(), 
+          event.getContainerLaunchContext()));
       getContext().getEventHandler().handle(new AMContainerEventLaunched(event.getContainerId()));      
     }
+    
+    public void waitTillContainersLaunched() throws InterruptedException {
+      while (containers.isEmpty()) {
+        Thread.sleep(50);
+      }
+    }
 
     @Override
     public void run() {

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
new file mode 100644
index 0000000..1bab0d2
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -0,0 +1,138 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
+import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+@SuppressWarnings("deprecation")
+public class TestMockDAGAppMaster {
+  static Configuration defaultConf;
+  static FileSystem localFs;
+  static Path workDir;
+  
+  static {
+    try {
+      defaultConf = new Configuration(false);
+      defaultConf.set("fs.defaultFS", "file:///");
+      defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+      localFs = FileSystem.getLocal(defaultConf);
+      workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")),
+          "TestDAGAppMaster").makeQualified(localFs);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+  
+  @Test (timeout = 5000)
+  public void testLocalResourceSetup() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+    tezClient.start();
+    
+    MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+    MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+    mockLauncher.startScheduling(false);
+    
+    Map<String, LocalResource> lrDAG = Maps.newHashMap();
+    String lrName1 = "LR1";
+    lrDAG.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
+        LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+    Map<String, LocalResource> lrVertex = Maps.newHashMap();
+    String lrName2 = "LR2";
+    lrVertex.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
+        LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+
+    DAG dag = DAG.create("test").addTaskLocalFiles(lrDAG);
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(lrVertex);
+    dag.addVertex(vA);
+
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    mockLauncher.waitTillContainersLaunched();
+    ContainerData cData = mockLauncher.getContainers().values().iterator().next();
+    ContainerLaunchContext launchContext = cData.launchContext;
+    Map<String, LocalResource> taskLR = launchContext.getLocalResources();
+    // verify tasks are launched with both DAG and task resources.
+    Assert.assertTrue(taskLR.containsKey(lrName1));
+    Assert.assertTrue(taskLR.containsKey(lrName2));
+    
+    mockLauncher.startScheduling(true);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    tezClient.stop();
+  }
+  
+  @Test (timeout = 10000)
+  public void testMultipleSubmissions() throws Exception {
+    Map<String, LocalResource> lrDAG = Maps.newHashMap();
+    String lrName1 = "LR1";
+    lrDAG.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
+        LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+    Map<String, LocalResource> lrVertex = Maps.newHashMap();
+    String lrName2 = "LR2";
+    lrVertex.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
+        LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
+
+    DAG dag = DAG.create("test").addTaskLocalFiles(lrDAG);
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(lrVertex);
+    dag.addVertex(vA);
+
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+    tezClient.start();
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    tezClient.stop();
+    
+    // submit the same DAG again to verify it can be done.
+    tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+    tezClient.start();
+    dagClient = tezClient.submitDAG(dag);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    tezClient.stop();
+
+  }
+  
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 03aedef..2d7cf65 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -378,7 +378,7 @@ public class TestDAGImpl {
     dag.addVertex(v2);
     dag.addVertex(v3);
     dag.addEdge(e1);
-    return dag.createDag(conf);
+    return dag.createDag(conf, null, null, null, true);
   }
 
   public static DAGPlan createTestDAGPlan() {

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 5c00fec..a1b9847 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -429,7 +429,7 @@ public class TestVertexImpl {
             SchedulingType.SEQUENTIAL, OutputDescriptor.create("out.class"),
             InputDescriptor.create("out.class"))));
    
-    return dag.createDag(conf);
+    return dag.createDag(conf, null, null, null, true);
   }
 
   private DAGPlan createDAGPlanWithInitializer0Tasks(String initializerClassName) {

http://git-wip-us.apache.org/repos/asf/tez/blob/06fa79ae/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
index 081ed10..0be67ad 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java
@@ -87,7 +87,7 @@ public class TestDAGUtils {
     dag.addVertex(v2);
     dag.addVertex(v3);
     dag.addEdge(e1);
-    return dag.createDag(conf);
+    return dag.createDag(conf, null, null, null, true);
   }
 
   @Test