You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/04/22 20:29:54 UTC

svn commit: r1470654 - in /incubator/tez/branches/TEZ-1: tez-dag-api/src/main/java/org/apache/tez/dag/api/ tez-dag/src/main/java/org/apache/hadoop/mapred/ tez-dag/src/main/java/org/apache/tez/dag/api/impl/ tez-dag/src/main/java/org/apache/tez/dag/app/ ...

Author: sseth
Date: Mon Apr 22 18:29:54 2013
New Revision: 1470654

URL: http://svn.apache.org/r1470654
Log:
TEZ-55. Remove MR references rom DAGAppMaster. (sseth)

Added:
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
Modified:
    incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
    incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
    incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java

Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java Mon Apr 22 18:29:54 2013
@@ -58,7 +58,7 @@ public class DAG { // FIXME rename to To
     
     edges.add(edge);
   }
-  
+
   public void verify() throws TezException { // FIXME better exception
 
     //FIXME are task resources compulsory or will the DAG AM put in a default

Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java Mon Apr 22 18:29:54 2013
@@ -60,6 +60,8 @@ public class DAGConfiguration extends Co
   }
 
   public final static String DAG = "tez.dag.";
+  
+  public final static String DAG_AM = DAG + "am.";
 
   public final static String VERTEX = DAG + "vertex.";
 
@@ -75,6 +77,23 @@ public class DAGConfiguration extends Co
 
   private final static String SEPARATOR = "|";
 
+  // FIXME This property onwards should be split into a separate class or the
+  // rest of thie class needs to be converted into a config name list once 
+  // the serialization is changed.
+  
+  // TODO Should not be required once all tokens are handled via AppSubmissionContext
+  public static final String JOB_SUBMIT_DIR = DAG + "jobSubmitDir";
+  public static final String APPLICATION_TOKENS_FILE = "appTokens";
+  
+  public static final String JOB_NAME = DAG + "job.name";
+  public static final String USER_NAME = DAG + "user.name";
+  
+  // TODO Speculator class should be configurable on a pere vertex level.
+  public static final String DAG_AM_SPECULATOR_CLASS = DAG_AM + "speculator.class";
+  
+  public static final String DAG_AM_TASK_LISTENER_THREAD_COUNT = DAG_AM + "task.listener.thread-count";
+  public static final int DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30;
+  
   @Private
   public void setEdgeProperties(List<Edge> edges) {
     String[] edgeIds = new String[edges.size()];

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/hadoop/mapred/MRVertexOutputCommitter.java Mon Apr 22 18:29:54 2013
@@ -107,6 +107,10 @@ public class MRVertexOutputCommitter ext
 
   @Override
   public void init(VertexContext context) throws IOException {
+    // TODO VertexContext not the best way to get ApplicationAttemptId. No
+    // alternates rightnow.
+    context.getConf().setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+        context.getApplicationAttemptId().getAttemptId());
     committer = getOutputCommitter(context);
     jobContext = getJobContextFromVertexContext(context);
     initialized = true;

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/api/impl/VertexContext.java Mon Apr 22 18:29:54 2013
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.api.impl;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.tez.engine.records.TezDAGID;
 import org.apache.tez.engine.records.TezVertexID;
 
@@ -27,6 +28,9 @@ public interface VertexContext {
   public Configuration getConf();
 
   public TezDAGID getDAGId();
+  
+  // TODO Get rid of this as part of VertexContext cleanup.
+  public ApplicationAttemptId getApplicationAttemptId();
 
   public TezVertexID getVertexId();
 

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java Mon Apr 22 18:29:54 2013
@@ -23,9 +23,7 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.security.PrivilegedExceptionAction;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -38,7 +36,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -55,9 +53,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -66,14 +61,10 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
-import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.DAGConfiguration;
 import org.apache.tez.dag.api.DAGLocationHint;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.app.client.ClientService;
 import org.apache.tez.dag.app.client.impl.TezClientService;
 import org.apache.tez.dag.app.dag.DAG;
@@ -104,7 +95,6 @@ import org.apache.tez.dag.app.rm.RMConta
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
-import org.apache.tez.dag.app.rm.container.AMContainerHelpers;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.AMContainerState;
 import org.apache.tez.dag.app.rm.node.AMNodeEventType;
@@ -116,7 +106,6 @@ import org.apache.tez.dag.app.taskclean.
 import org.apache.tez.dag.app.taskclean.TaskCleanerImpl;
 import org.apache.tez.engine.common.security.JobTokenSecretManager;
 import org.apache.tez.engine.records.TezDAGID;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
 /**
  * The Map-Reduce Application Master.
@@ -227,9 +216,7 @@ public class DAGAppMaster extends Compos
 
     // Job name is the same as the app name util we support DAG of jobs
     // for an app later
-    appName = conf.get(MRJobConfig.JOB_NAME, "<missing app name>");
-
-    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptID.getAttemptId());
+    appName = conf.get(DAGConfiguration.JOB_NAME, "<missing app name>");
 
     dagId = new TezDAGID(appAttemptID.getApplicationId(), 1);
 
@@ -664,10 +651,10 @@ public class DAGAppMaster extends Compos
         // Read the file-system tokens from the localized tokens-file.
         Path jobSubmitDir =
             FileContext.getLocalFSFileContext().makeQualified(
-                new Path(new File(MRJobConfig.JOB_SUBMIT_DIR)
+                new Path(new File(DAGConfiguration.JOB_SUBMIT_DIR)
                     .getAbsolutePath()));
         Path jobTokenFile =
-            new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
+            new Path(jobSubmitDir, DAGConfiguration.APPLICATION_TOKENS_FILE);
         fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
         LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
             + jobTokenFile);
@@ -727,7 +714,7 @@ public class DAGAppMaster extends Compos
     try {
       speculatorClass
           // "yarn.mapreduce.job.speculator.class"
-          = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
+          = conf.getClass(DAGConfiguration.DAG_AM_SPECULATOR_CLASS,
                           DefaultSpeculator.class,
                           Speculator.class);
       Constructor<? extends Speculator> speculatorConstructor
@@ -738,19 +725,19 @@ public class DAGAppMaster extends Compos
       return result;
     } catch (InstantiationException ex) {
       LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
       throw new YarnException(ex);
     } catch (IllegalAccessException ex) {
       LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
       throw new YarnException(ex);
     } catch (InvocationTargetException ex) {
       LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
       throw new YarnException(ex);
     } catch (NoSuchMethodException ex) {
       LOG.error("Can't make a speculator -- check "
-          + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
       throw new YarnException(ex);
     }
   }
@@ -765,16 +752,16 @@ public class DAGAppMaster extends Compos
   protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
       Configuration conf) {
     TaskHeartbeatHandler thh = new TaskHeartbeatHandler(context, conf.getInt(
-        MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
-        MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT));
+        DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT,
+        DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT));
     return thh;
   }
 
   protected ContainerHeartbeatHandler createContainerHeartbeatHandler(AppContext context,
       Configuration conf) {
     ContainerHeartbeatHandler chh = new ContainerHeartbeatHandler(context, conf.getInt(
-        MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
-        MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT));
+        DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT,
+        DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT));
     // TODO XXX: Define a CONTAINER_LISTENER_THREAD_COUNT
     return chh;
   }
@@ -1027,7 +1014,7 @@ public class DAGAppMaster extends Compos
 
     @Override
     public String getUser() {
-      return this.conf.get(MRJobConfig.USER_NAME);
+      return this.conf.get(DAGConfiguration.USER_NAME);
     }
 
     @Override
@@ -1294,9 +1281,9 @@ public class DAGAppMaster extends Compos
         LOG.info("Running job type: " + type);
 
         if (type.equals("mr")) {
-          dagConf = (DAGConfiguration)createDAGConfigurationForMR();
+          dagConf = (DAGConfiguration)MRRExampleHelper.createDAGConfigurationForMR();
         } else if (type.equals("mrr")) {
-          dagConf = (DAGConfiguration)createDAGConfigurationForMRR();
+          dagConf = (DAGConfiguration)MRRExampleHelper.createDAGConfigurationForMRR();
         }
       } else {
         dagConf = new DAGConfiguration();
@@ -1317,7 +1304,8 @@ public class DAGAppMaster extends Compos
       // the objects myself.
       dagConf.setBoolean("fs.automatic.close", false);
 
-      // TODO TEZ HACK - user name in DAGConfiguration
+      dagConf.set(DAGConfiguration.USER_NAME, jobUserName);
+      // TODO Remove after fixing TaskLanch JVM construction
       dagConf.set(MRJobConfig.USER_NAME, jobUserName);
 
       initAndStartAppMaster(appMaster, new YarnConfiguration(dagConf),
@@ -1338,163 +1326,7 @@ public class DAGAppMaster extends Compos
     return opts;
   }
 
-   //TODO remove once client is in place
-  private static Path getMRBaseDir() throws IOException {
-    Path basePath = MRApps.getStagingAreaDir(new Configuration(),
-        UserGroupInformation.getCurrentUser().getShortUserName());
-    return new Path(basePath, "dagTest");
-  }
-
-  private static Path getMRRBaseDir() throws IOException {
-    Path basePath = MRApps.getStagingAreaDir(new Configuration(),
-        UserGroupInformation.getCurrentUser().getShortUserName());
-    return new Path(basePath, "mrrTest");
-  }
-
-  private static String getConfFileName(String vertexName) {
-    return MRJobConfig.JOB_CONF_FILE + "_" + vertexName;
-  }
-
-  // TODO remove once client is in place
-  private static Map<String, LocalResource> createLocalResources(
-      Path remoteBaseDir, String[] resourceNames) throws IOException {
-    Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.get(conf);
-
-    Map<String, LocalResource> localResources = new TreeMap<String, LocalResource>();
-
-    for (String resourceName : resourceNames) {
-      Path remoteFile = new Path(remoteBaseDir, resourceName);
-      localResources.put(resourceName, AMContainerHelpers.createLocalResource(
-          fs, remoteFile, LocalResourceType.FILE,
-          LocalResourceVisibility.APPLICATION));
-      LOG.info("Localizing file " + resourceName + " from location "
-          + remoteFile.toString());
-    }
-    return localResources;
-  }
-
-
-  private static String[] getMRLocalRsrcList() {
-    String[] resourceNames = new String[] { MRJobConfig.JOB_JAR,
-        MRJobConfig.JOB_SPLIT, MRJobConfig.JOB_SPLIT_METAINFO,
-        MRJobConfig.JOB_CONF_FILE };
-    return resourceNames;
-  }
-
-  private static String[] getMRRLocalRsrcList() {
-    String[] resourceNames = new String[] { MRJobConfig.JOB_JAR,
-        MRJobConfig.JOB_SPLIT, MRJobConfig.JOB_SPLIT_METAINFO,
-        MRJobConfig.JOB_CONF_FILE, getConfFileName("reduce1"),
-        getConfFileName("reduce2") };
-    return resourceNames;
-  }
-
-  private static Configuration createDAGConfigurationForMRR() throws IOException {
-    org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
-    Vertex mapVertex = new Vertex("map",
-        "org.apache.tez.mapreduce.task.InitialTask", 6);
-    Vertex reduce1Vertex = new Vertex("reduce1",
-        "org.apache.tez.mapreduce.task.IntermediateTask", 3);
-    Vertex reduce2Vertex = new Vertex("reduce2",
-        "org.apache.tez.mapreduce.task.FinalTask", 3);
-    Edge edge1 = new Edge(mapVertex, reduce1Vertex, new EdgeProperty());
-    Edge edge2 = new Edge(reduce1Vertex, reduce2Vertex, new EdgeProperty());
-    Map<String, LocalResource> jobRsrcs = createLocalResources(getMRRBaseDir(),
-        getMRRLocalRsrcList());
-
-    Map<String, LocalResource> mapRsrcs = new HashMap<String, LocalResource>();
-    Map<String, LocalResource> reduce1Rsrcs = new HashMap<String, LocalResource>();
-    Map<String, LocalResource> reduce2Rsrcs = new HashMap<String, LocalResource>();
-
-    mapRsrcs.put(MRJobConfig.JOB_SPLIT, jobRsrcs.get(MRJobConfig.JOB_SPLIT));
-    mapRsrcs.put(MRJobConfig.JOB_SPLIT_METAINFO, jobRsrcs.get(MRJobConfig.JOB_SPLIT_METAINFO));
-    mapRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
-    mapRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
-    mapRsrcs.put(getConfFileName("map"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
-
-    reduce1Rsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
-    reduce1Rsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
-    reduce1Rsrcs.put(getConfFileName("reduce1"), jobRsrcs.get(getConfFileName("reduce1")));
-
-    reduce2Rsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
-    reduce2Rsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
-    reduce2Rsrcs.put(getConfFileName("reduce2"), jobRsrcs.get(getConfFileName("reduce2")));
-
-    Resource mapResource = BuilderUtils.newResource(
-         MRJobConfig.DEFAULT_MAP_MEMORY_MB,
-         MRJobConfig.DEFAULT_MAP_CPU_VCORES);
-    mapVertex.setTaskResource(mapResource);
-    mapVertex.setTaskLocalResources(mapRsrcs);
-    Resource reduceResource = BuilderUtils.newResource(
-        MRJobConfig.DEFAULT_REDUCE_MEMORY_MB,
-        MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
-    reduce1Vertex.setTaskResource(reduceResource);
-    reduce1Vertex.setTaskLocalResources(reduce1Rsrcs);
-
-    reduce1Vertex.setTaskResource(reduceResource);
-    reduce2Vertex.setTaskLocalResources(reduce2Rsrcs);
-
-    dag.addVertex(mapVertex);
-    dag.addVertex(reduce1Vertex);
-    dag.addVertex(reduce2Vertex);
-    dag.addEdge(edge1);
-    dag.addEdge(edge2);
-    dag.verify();
-    DAGConfiguration dagConf = dag.serializeDag();
-
-    dagConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
-    dagConf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
-
-    return dagConf;
-  }
-
-  // TODO remove once client is in place
-  private static Configuration createDAGConfigurationForMR() throws IOException {
-    org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
-    Vertex mapVertex = new Vertex("map",
-        "org.apache.tez.mapreduce.task.InitialTask", 6);
-    Vertex reduceVertex = new Vertex("reduce",
-        "org.apache.tez.mapreduce.task.FinalTask", 1);
-    Edge edge = new Edge(mapVertex, reduceVertex, new EdgeProperty());
-
-    Map<String, LocalResource> jobRsrcs = createLocalResources(getMRBaseDir(),
-        getMRLocalRsrcList());
-
-    Map<String, LocalResource> mapRsrcs = new HashMap<String, LocalResource>();
-    Map<String, LocalResource> reduceRsrcs = new HashMap<String, LocalResource>();
-
-    mapRsrcs.put(MRJobConfig.JOB_SPLIT, jobRsrcs.get(MRJobConfig.JOB_SPLIT));
-    mapRsrcs.put(MRJobConfig.JOB_SPLIT_METAINFO, jobRsrcs.get(MRJobConfig.JOB_SPLIT_METAINFO));
-    mapRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
-    mapRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
-    mapRsrcs.put(getConfFileName("map"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
-
-    reduceRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
-    reduceRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
-    reduceRsrcs.put(getConfFileName("reduce"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
-
-    Resource mapResource = BuilderUtils.newResource(
-         MRJobConfig.DEFAULT_MAP_MEMORY_MB,
-         MRJobConfig.DEFAULT_MAP_CPU_VCORES);
-    mapVertex.setTaskResource(mapResource);
-    mapVertex.setTaskLocalResources(mapRsrcs);
-    Resource reduceResource = BuilderUtils.newResource(
-        MRJobConfig.DEFAULT_REDUCE_MEMORY_MB,
-        MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
-    reduceVertex.setTaskResource(reduceResource);
-    reduceVertex.setTaskLocalResources(reduceRsrcs);
-    dag.addVertex(mapVertex);
-    dag.addVertex(reduceVertex);
-    dag.addEdge(edge);
-    dag.verify();
-    DAGConfiguration dagConf = dag.serializeDag();
-
-    dagConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
-    dagConf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
 
-    return dagConf;
-  }
 
   // The shutdown hook that runs when a signal is received AND during normal
   // close of the JVM.

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java?rev=1470654&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/MRRExampleHelper.java Mon Apr 22 18:29:54 2013
@@ -0,0 +1,189 @@
+package org.apache.tez.dag.app;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.app.rm.container.AMContainerHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class MRRExampleHelper {
+
+  private static final Log LOG = LogFactory.getLog(MRRExampleHelper.class);
+  
+  //TODO remove once client is in place
+ private static Path getMRBaseDir() throws IOException {
+   Path basePath = MRApps.getStagingAreaDir(new Configuration(),
+       UserGroupInformation.getCurrentUser().getShortUserName());
+   return new Path(basePath, "dagTest");
+ }
+
+ private static Path getMRRBaseDir() throws IOException {
+   Path basePath = MRApps.getStagingAreaDir(new Configuration(),
+       UserGroupInformation.getCurrentUser().getShortUserName());
+   return new Path(basePath, "mrrTest");
+ }
+
+ private static String getConfFileName(String vertexName) {
+   return MRJobConfig.JOB_CONF_FILE + "_" + vertexName;
+ }
+
+ // TODO remove once client is in place
+ private static Map<String, LocalResource> createLocalResources(
+     Path remoteBaseDir, String[] resourceNames) throws IOException {
+   Configuration conf = new Configuration();
+   FileSystem fs = FileSystem.get(conf);
+
+   Map<String, LocalResource> localResources = new TreeMap<String, LocalResource>();
+
+   for (String resourceName : resourceNames) {
+     Path remoteFile = new Path(remoteBaseDir, resourceName);
+     localResources.put(resourceName, AMContainerHelpers.createLocalResource(
+         fs, remoteFile, LocalResourceType.FILE,
+         LocalResourceVisibility.APPLICATION));
+     LOG.info("Localizing file " + resourceName + " from location "
+         + remoteFile.toString());
+   }
+   return localResources;
+ }
+
+
+ private static String[] getMRLocalRsrcList() {
+   String[] resourceNames = new String[] { MRJobConfig.JOB_JAR,
+       MRJobConfig.JOB_SPLIT, MRJobConfig.JOB_SPLIT_METAINFO,
+       MRJobConfig.JOB_CONF_FILE };
+   return resourceNames;
+ }
+
+ private static String[] getMRRLocalRsrcList() {
+   String[] resourceNames = new String[] { MRJobConfig.JOB_JAR,
+       MRJobConfig.JOB_SPLIT, MRJobConfig.JOB_SPLIT_METAINFO,
+       MRJobConfig.JOB_CONF_FILE, getConfFileName("reduce1"),
+       getConfFileName("reduce2") };
+   return resourceNames;
+ }
+
+ static Configuration createDAGConfigurationForMRR() throws IOException {
+   org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
+   Vertex mapVertex = new Vertex("map",
+       "org.apache.tez.mapreduce.task.InitialTask", 6);
+   Vertex reduce1Vertex = new Vertex("reduce1",
+       "org.apache.tez.mapreduce.task.IntermediateTask", 3);
+   Vertex reduce2Vertex = new Vertex("reduce2",
+       "org.apache.tez.mapreduce.task.FinalTask", 3);
+   Edge edge1 = new Edge(mapVertex, reduce1Vertex, new EdgeProperty());
+   Edge edge2 = new Edge(reduce1Vertex, reduce2Vertex, new EdgeProperty());
+   Map<String, LocalResource> jobRsrcs = createLocalResources(getMRRBaseDir(),
+       getMRRLocalRsrcList());
+
+   Map<String, LocalResource> mapRsrcs = new HashMap<String, LocalResource>();
+   Map<String, LocalResource> reduce1Rsrcs = new HashMap<String, LocalResource>();
+   Map<String, LocalResource> reduce2Rsrcs = new HashMap<String, LocalResource>();
+
+   mapRsrcs.put(MRJobConfig.JOB_SPLIT, jobRsrcs.get(MRJobConfig.JOB_SPLIT));
+   mapRsrcs.put(MRJobConfig.JOB_SPLIT_METAINFO, jobRsrcs.get(MRJobConfig.JOB_SPLIT_METAINFO));
+   mapRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+   mapRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+   mapRsrcs.put(getConfFileName("map"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+
+   reduce1Rsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+   reduce1Rsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+   reduce1Rsrcs.put(getConfFileName("reduce1"), jobRsrcs.get(getConfFileName("reduce1")));
+
+   reduce2Rsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+   reduce2Rsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+   reduce2Rsrcs.put(getConfFileName("reduce2"), jobRsrcs.get(getConfFileName("reduce2")));
+
+   Resource mapResource = BuilderUtils.newResource(
+        MRJobConfig.DEFAULT_MAP_MEMORY_MB,
+        MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+   mapVertex.setTaskResource(mapResource);
+   mapVertex.setTaskLocalResources(mapRsrcs);
+   Resource reduceResource = BuilderUtils.newResource(
+       MRJobConfig.DEFAULT_REDUCE_MEMORY_MB,
+       MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+   reduce1Vertex.setTaskResource(reduceResource);
+   reduce1Vertex.setTaskLocalResources(reduce1Rsrcs);
+
+   reduce1Vertex.setTaskResource(reduceResource);
+   reduce2Vertex.setTaskLocalResources(reduce2Rsrcs);
+
+   dag.addVertex(mapVertex);
+   dag.addVertex(reduce1Vertex);
+   dag.addVertex(reduce2Vertex);
+   dag.addEdge(edge1);
+   dag.addEdge(edge2);
+   dag.verify();
+   DAGConfiguration dagConf = dag.serializeDag();
+
+   dagConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+   dagConf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+   return dagConf;
+ }
+
+ // TODO remove once client is in place
+ static Configuration createDAGConfigurationForMR() throws IOException {
+   org.apache.tez.dag.api.DAG dag = new org.apache.tez.dag.api.DAG();
+   Vertex mapVertex = new Vertex("map",
+       "org.apache.tez.mapreduce.task.InitialTask", 6);
+   Vertex reduceVertex = new Vertex("reduce",
+       "org.apache.tez.mapreduce.task.FinalTask", 1);
+   Edge edge = new Edge(mapVertex, reduceVertex, new EdgeProperty());
+
+   Map<String, LocalResource> jobRsrcs = createLocalResources(getMRBaseDir(),
+       getMRLocalRsrcList());
+
+   Map<String, LocalResource> mapRsrcs = new HashMap<String, LocalResource>();
+   Map<String, LocalResource> reduceRsrcs = new HashMap<String, LocalResource>();
+
+   mapRsrcs.put(MRJobConfig.JOB_SPLIT, jobRsrcs.get(MRJobConfig.JOB_SPLIT));
+   mapRsrcs.put(MRJobConfig.JOB_SPLIT_METAINFO, jobRsrcs.get(MRJobConfig.JOB_SPLIT_METAINFO));
+   mapRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+   mapRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+   mapRsrcs.put(getConfFileName("map"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+
+   reduceRsrcs.put(MRJobConfig.JOB_JAR, jobRsrcs.get(MRJobConfig.JOB_JAR));
+   reduceRsrcs.put(MRJobConfig.JOB_CONF_FILE, jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+   reduceRsrcs.put(getConfFileName("reduce"), jobRsrcs.get(MRJobConfig.JOB_CONF_FILE));
+
+   Resource mapResource = BuilderUtils.newResource(
+        MRJobConfig.DEFAULT_MAP_MEMORY_MB,
+        MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+   mapVertex.setTaskResource(mapResource);
+   mapVertex.setTaskLocalResources(mapRsrcs);
+   Resource reduceResource = BuilderUtils.newResource(
+       MRJobConfig.DEFAULT_REDUCE_MEMORY_MB,
+       MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+   reduceVertex.setTaskResource(reduceResource);
+   reduceVertex.setTaskLocalResources(reduceRsrcs);
+   dag.addVertex(mapVertex);
+   dag.addVertex(reduceVertex);
+   dag.addEdge(edge);
+   dag.verify();
+   DAGConfiguration dagConf = dag.serializeDag();
+
+   dagConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+   dagConf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+   return dagConf;
+ }
+  
+}

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java Mon Apr 22 18:29:54 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -1273,6 +1274,11 @@ public class VertexImpl implements org.a
   public TezDAGID getDAGId() {
     return appContext.getDAGID();
   }
+  
+  @Override
+  public ApplicationAttemptId getApplicationAttemptId() {
+    return appContext.getApplicationAttemptId();
+  }
 
   public Resource getTaskResource() {
     return taskResource;
@@ -1315,4 +1321,5 @@ public class VertexImpl implements org.a
     }
     return outputSpecList;
   }
+
 }

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/pom.xml Mon Apr 22 18:29:54 2013
@@ -34,6 +34,10 @@
       <artifactId>tez-engine</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag-api</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
     </dependency>

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java Mon Apr 22 18:29:54 2013
@@ -18,13 +18,39 @@
 
 package org.apache.tez.mapreduce.hadoop;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.DAGConfiguration;
+
 
 
 public class DeprecatedKeys {
+
+  // This could be done via deprecation.
+  private static Map<String, String> mrParamToDAGParamMap = new HashMap<String, String>();
+
+  public static Map<String, String> getMRToDAGParamMap() {
+    return Collections.unmodifiableMap(mrParamToDAGParamMap);
+  }
+ 
   static {
     addDeprecatedKeys();
+    
+    mrParamToDAGParamMap.put(MRJobConfig.JOB_SUBMIT_DIR, DAGConfiguration.JOB_SUBMIT_DIR);
+    mrParamToDAGParamMap.put(MRJobConfig.APPLICATION_TOKENS_FILE, DAGConfiguration.APPLICATION_TOKENS_FILE);
+    
+    mrParamToDAGParamMap.put(MRJobConfig.JOB_NAME, DAGConfiguration.JOB_NAME);
+    
+    mrParamToDAGParamMap.put(MRJobConfig.MR_AM_JOB_SPECULATOR, DAGConfiguration.DAG_AM_SPECULATOR_CLASS);
+    
+    // TODO Default value handling.
+    mrParamToDAGParamMap.put(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT, DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT);
+    
+    mrParamToDAGParamMap.put(MRJobConfig.USER_NAME, DAGConfiguration.USER_NAME);
   }
 
   // TODO TEZAM4 Sometime, make sure this gets loaded by default. Insteaf of the current initialization in MRAppMaster, TezChild.
@@ -89,7 +115,7 @@ public class DeprecatedKeys {
     
     _(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT);
   }
-  
+
   private static void _(String oldKey, String newKey) {
     Configuration.addDeprecation(oldKey, newKey);
   }

Modified: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java?rev=1470654&r1=1470653&r2=1470654&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java (original)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java Mon Apr 22 18:29:54 2013
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.TreeMap;
 import java.util.Vector;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -59,9 +60,9 @@ import org.apache.tez.dag.api.EdgeProper
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.zookeeper.Environment.Entry;
 import org.apache.hadoop.mapreduce.QueueAclsInfo;
 import org.apache.hadoop.mapreduce.QueueInfo;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -681,6 +682,17 @@ public class YARNRunner implements Clien
     }
   }
 
+  private void setDAGParamsFromMRConf(DAGConfiguration dagConf) {
+    Configuration mrConf = this.conf;
+    Map<String, String> mrParamToDAGParamMap = DeprecatedKeys.getMRToDAGParamMap();
+    for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
+      if (mrConf.get(entry.getKey()) != null) {
+        LOG.info("DEBUG: MR->DAG Setting new key: " + entry.getValue());
+        dagConf.set(entry.getValue(), mrConf.get(entry.getKey()));
+      }
+    }
+  }
+
   private ApplicationSubmissionContext createApplicationSubmissionContext(
       FileSystem fs, DAG dag,
       Configuration jobConf, String jobSubmitDir, Credentials ts,
@@ -766,7 +778,8 @@ public class YARNRunner implements Clien
 
     // FIXME add serialized dag conf
     DAGConfiguration dagConf = dag.serializeDag();
-
+    setDAGParamsFromMRConf(dagConf);
+    
     Path dagConfFilePath = new Path(jobSubmitDir,
         TezConfiguration.DAG_AM_PLAN_CONFIG_XML);
     FSDataOutputStream dagConfOut =