Posted to commits@tez.apache.org by hi...@apache.org on 2013/07/10 23:47:06 UTC

git commit: TEZ-291. Change MRRSleepJob to use tez apis. (hitesh)

Updated Branches:
  refs/heads/master 999d16ae1 -> b239ebd78


TEZ-291. Change MRRSleepJob to use tez apis. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b239ebd7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b239ebd7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b239ebd7

Branch: refs/heads/master
Commit: b239ebd7869f563f65b4c9a581c2b7667a3cf767
Parents: 999d16a
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Jul 10 14:46:34 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Jul 10 14:46:34 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/client/TezClient.java   |  53 ++--
 .../apache/tez/dag/api/client/DAGStatus.java    |  24 +-
 .../tez/mapreduce/examples/MRRSleepJob.java     | 285 ++++++++++++++++++-
 .../tez/mapreduce/ResourceMgrDelegate.java      |   8 +-
 4 files changed, 328 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b239ebd7/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
index 0147cbb..0569afa 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/client/TezClient.java
@@ -79,14 +79,14 @@ public class TezClient {
 
   final public static FsPermission TEZ_AM_DIR_PERMISSION =
       FsPermission.createImmutable((short) 0700); // rwx--------
-  final public static FsPermission TEZ_AM_FILE_PERMISSION = 
+  final public static FsPermission TEZ_AM_FILE_PERMISSION =
       FsPermission.createImmutable((short) 0644); // rw-r--r--
-  
+
   public static final int UTF8_CHUNK_SIZE = 16 * 1024;
-    
+
   private final TezConfiguration conf;
   private YarnClient yarnClient;
-  
+
   /**
    * <p>
    * Create an instance of the TezClient which will be used to communicate with
@@ -96,7 +96,7 @@ public class TezClient {
    * Separate instances of TezClient should be created to communicate with
    * different instances of YARN
    * </p>
-   * 
+   *
    * @param conf
    *          the configuration which will be used to establish which YARN or
    *          Tez service instance this client is associated with.
@@ -107,12 +107,12 @@ public class TezClient {
     yarnClient.init(new YarnConfiguration(conf));
     yarnClient.start();
   }
-  
+
   /**
    * Submit a Tez DAG to YARN as an application. The job will be submitted to
    * the yarn cluster or tez service which was specified when creating this
    * {@link TezClient} instance.
-   * 
+   *
    * @param dag
    *          <code>DAG</code> to be submitted
    * @param appStagingDir
@@ -149,7 +149,7 @@ public class TezClient {
    * Submit a Tez DAG to YARN with known <code>ApplicationId</code>. This is a
    * private method and is only meant to be used within Tez for MR client
    * backward compatibility.
-   * 
+   *
    * @param appId
    *          - <code>ApplicationId</code> to be used
    * @param dag
@@ -192,7 +192,7 @@ public class TezClient {
 
     return getDAGClient(appId);
   }
-  
+
   /**
    * Create a new YARN application
    * @return <code>ApplicationId</code> for the new YARN application
@@ -209,7 +209,7 @@ public class TezClient {
   }
 
   @Private
-  public DAGClient getDAGClient(ApplicationId appId) 
+  public DAGClient getDAGClient(ApplicationId appId)
       throws IOException, TezException {
       return new DAGClientRPCImpl(appId, getDefaultTezDAGID(appId), conf);
   }
@@ -224,7 +224,7 @@ public class TezClient {
     vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 0);
     vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA");
   }
-  
+
   public FileSystem ensureExists(Path stagingArea)
       throws IOException {
     FileSystem fs = stagingArea.getFileSystem(conf);
@@ -254,7 +254,7 @@ public class TezClient {
     }
     return fs;
   }
-  
+
   private LocalResource createApplicationResource(FileSystem fs, Path p,
       LocalResourceType type) throws IOException {
     LocalResource rsrc = Records.newRecord(LocalResource.class);
@@ -364,12 +364,12 @@ public class TezClient {
       return conf;
     }
   }
-  
+
   private ApplicationSubmissionContext createApplicationSubmissionContext(
       ApplicationId appId, DAG dag, Path appStagingDir, Credentials ts,
       String amQueueName, String amName, List<String> amArgs,
       Map<String, String> amEnv, Map<String, LocalResource> amLocalResources,
-      TezConfiguration amConf) throws IOException, YarnException {    
+      TezConfiguration amConf) throws IOException, YarnException {
 
     if (amConf == null) {
       amConf = new TezConfiguration();
@@ -403,8 +403,10 @@ public class TezClient {
     String amLogLevel = conf.get(TezConfiguration.TEZ_AM_LOG_LEVEL,
                                  TezConfiguration.DEFAULT_TEZ_AM_LOG_LEVEL);
     addLog4jSystemProperties(amLogLevel, vargs);
-    
-    vargs.addAll(amArgs);
+
+    if (amArgs != null) {
+      vargs.addAll(amArgs);
+    }
 
     vargs.add(TezConfiguration.TEZ_APPLICATION_MASTER_CLASS);
     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
@@ -453,14 +455,19 @@ public class TezClient {
         Environment.CLASSPATH.name(),
         Environment.PWD.$() + File.separator + "*");
 
-    for (Map.Entry<String, String> entry : amEnv.entrySet()) {
-      Apps.addToEnvironment(environment, entry.getKey(), entry.getValue());
+    if (amEnv != null) {
+      for (Map.Entry<String, String> entry : amEnv.entrySet()) {
+        Apps.addToEnvironment(environment, entry.getKey(), entry.getValue());
+      }
     }
 
     Map<String, LocalResource> localResources =
         new TreeMap<String, LocalResource>();
 
-    localResources.putAll(amLocalResources);
+    if (amLocalResources != null) {
+      localResources.putAll(amLocalResources);
+    }
+
     Map<String, LocalResource> tezJarResources =
         setupTezJarsLocalResources();
     localResources.putAll(tezJarResources);
@@ -469,7 +476,7 @@ public class TezClient {
     for (Vertex v : dag.getVertices()) {
       v.getTaskLocalResources().putAll(tezJarResources);
     }
-    
+
     // emit protobuf DAG file style
     Path binaryPath =  new Path(appStagingDir,
         TezConfiguration.TEZ_AM_PLAN_PB_BINARY + "." + appId.toString());
@@ -481,7 +488,7 @@ public class TezClient {
     DAGPlan dagPB = dag.createDag(finalAMConf);
 
     FSDataOutputStream dagPBOutBinaryStream = null;
-    
+
     try {
       //binary output
       dagPBOutBinaryStream = FileSystem.create(fs, binaryPath,
@@ -502,7 +509,7 @@ public class TezClient {
       localResources.put(TezConfiguration.TEZ_AM_PLAN_PB_TEXT,
           createApplicationResource(fs, textPath, LocalResourceType.FILE));
     }
-    
+
     Map<ApplicationAccessType, String> acls
         = new HashMap<ApplicationAccessType, String>();
 
@@ -567,7 +574,7 @@ public class TezClient {
     idFormat.setGroupingUsed(false);
     idFormat.setMinimumIntegerDigits(6);
   }
-  
+
   String getDefaultTezDAGID(ApplicationId appId) {
      return (new StringBuilder(DAG)).append(SEPARATOR).
                    append(appId.getClusterTimestamp()).

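For context on the null checks added above: the @Private submitDAGApplication(appId, ...) path can now be driven with all of the optional AM settings left null, which is exactly how MRRSleepJob uses it later in this commit. A minimal sketch under that assumption follows; the class name, method name and staging root are illustrative, and the argument order simply mirrors MRRSleepJob's call.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.client.DAGClient;

public class SubmitWithDefaults {

  /**
   * Submits a pre-built DAG while leaving every optional AM setting null,
   * which the null checks added in this commit make safe.
   */
  public static DAGClient submit(DAG dag, TezConfiguration tezConf,
      String stagingRoot) throws Exception {
    TezClient tezClient = new TezClient(tezConf);
    ApplicationId appId = tezClient.createApplication();

    FileSystem fs = FileSystem.get(tezConf);
    Path stagingDir = fs.makeQualified(new Path(stagingRoot, appId.toString()));
    tezClient.ensureExists(stagingDir);

    // The five nulls are the optional AM parameters; the call mirrors
    // MRRSleepJob.run() as changed in this commit.
    return tezClient.submitDAGApplication(appId, dag, stagingDir,
        null, null, null, null, null, tezConf);
  }
}
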
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b239ebd7/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
index c28f2bf..1dbfd96 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java
@@ -37,15 +37,15 @@ public class DAGStatus {
     FAILED,
     ERROR,
   };
-  
+
   DAGStatusProtoOrBuilder proxy = null;
   Progress progress = null;
   Map<String, Progress> vertexProgress = null;
-  
+
   public DAGStatus(DAGStatusProtoOrBuilder proxy) {
     this.proxy = proxy;
   }
-  
+
   public State getState() {
     switch(proxy.getState()) {
     case DAG_SUBMITTED:
@@ -64,11 +64,11 @@ public class DAGStatus {
     case DAG_ERROR:
       return DAGStatus.State.ERROR;
     default:
-      throw new TezUncheckedException("Unsupported value for DAGStatus.State : " + 
+      throw new TezUncheckedException("Unsupported value for DAGStatus.State : " +
                               proxy.getState());
     }
   }
-  
+
   public boolean isCompleted() {
     State state = getState();
     return (state == State.SUCCEEDED ||
@@ -83,7 +83,7 @@ public class DAGStatus {
 
   /**
    * Gets overall progress value of the DAG.
-   * 
+   *
    * @return Progress of the DAG. Maybe null when the DAG is not running. Maybe
    *         null when the DAG is running and the application master cannot be
    *         reached - e.g. when the execution platform has restarted the
@@ -99,7 +99,7 @@ public class DAGStatus {
 
   /**
    * Get the progress of a vertex in the DAG
-   * 
+   *
    * @return Progress of the vertex. May be null when the DAG is not running.
    *         Maybe null when the DAG is running and the application master
    *         cannot be reached - e.g. when the execution platform has restarted
@@ -110,7 +110,7 @@ public class DAGStatus {
     if(vertexProgress == null) {
       if(proxy.getVertexProgressList() != null) {
         List<StringProgressPairProto> kvList = proxy.getVertexProgressList();
-        vertexProgress = new HashMap<String, Progress>(kvList.size());        
+        vertexProgress = new HashMap<String, Progress>(kvList.size());
         for(StringProgressPairProto kv : kvList){
           vertexProgress.put(kv.getKey(), new Progress(kv.getProgress()));
         }
@@ -119,4 +119,12 @@ public class DAGStatus {
     return vertexProgress;
   }
 
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("status=" + getState()
+        + ", progress=" + getDAGProgress());
+    return sb.toString();
+  }
+
 }

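The new toString() makes a DAGStatus directly loggable while polling. A minimal sketch of the monitoring loop this enables; the Tez calls mirror MRRSleepJob.run() in this commit, while the class name, method name and one-second interval are illustrative.

import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;

public class DAGMonitor {

  /**
   * Polls the DAG until it reaches a terminal state, logging
   * "status=<State>, progress=<Progress>" via the new toString().
   */
  public static DAGStatus waitForCompletion(DAGClient dagClient)
      throws Exception {
    while (true) {
      DAGStatus status = dagClient.getDAGStatus();
      System.out.println("DAG Status: " + status);
      if (status.isCompleted()) {
        return status;
      }
      Thread.sleep(1000);
    }
  }
}
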
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b239ebd7/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 627d583..1cb4cd1 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -22,16 +22,21 @@ import java.io.IOException;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -43,11 +48,42 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.ClassUtil;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
+import org.apache.tez.dag.api.EdgeProperty.SourceType;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Dummy class for testing MR framefork. Sleeps for a defined period
@@ -359,11 +395,207 @@ public class MRRSleepJob extends Configured implements Tool {
     System.exit(res);
   }
 
+  public DAG createDAG(FileSystem remoteFs, Configuration conf,
+      ApplicationId appId, Path remoteStagingDir,
+      int numMapper, int numReducer, int iReduceStagesCount,
+      int numIReducer, long mapSleepTime, int mapSleepCount,
+      long reduceSleepTime, int reduceSleepCount,
+      long iReduceSleepTime, int iReduceSleepCount)
+      throws IOException, YarnException {
+
+
+    Configuration mapStageConf = new JobConf(conf);
+    mapStageConf.setInt(MRJobConfig.NUM_MAPS, numMapper);
+    mapStageConf.setLong(MAP_SLEEP_TIME, mapSleepTime);
+    mapStageConf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
+    mapStageConf.setLong(IREDUCE_SLEEP_TIME, iReduceSleepTime);
+    mapStageConf.setInt(MAP_SLEEP_COUNT, mapSleepCount);
+    mapStageConf.setInt(REDUCE_SLEEP_COUNT, reduceSleepCount);
+    mapStageConf.setInt(IREDUCE_SLEEP_COUNT, iReduceSleepCount);
+    mapStageConf.setInt(IREDUCE_STAGES_COUNT, iReduceStagesCount);
+    mapStageConf.setInt(IREDUCE_TASKS_COUNT, numIReducer);
+    mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR, SleepMapper.class.getName());
+    mapStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        IntWritable.class.getName());
+    mapStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+        SleepInputFormat.class.getName());
+    mapStageConf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
+        MRRSleepJobPartitioner.class.getName());
+
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(mapStageConf,
+        null);
+
+    Configuration[] intermediateReduceStageConfs = null;
+    if (iReduceStagesCount > 0
+        && numIReducer > 0) {
+      intermediateReduceStageConfs = new JobConf[iReduceStagesCount];
+      for (int i = 1; i <= iReduceStagesCount; ++i) {
+        JobConf iReduceStageConf = new JobConf(conf);
+        iReduceStageConf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, iReduceSleepTime);
+        iReduceStageConf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, iReduceSleepCount);
+        iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, numIReducer);
+        iReduceStageConf
+            .set(MRJobConfig.REDUCE_CLASS_ATTR, ISleepReducer.class.getName());
+        iReduceStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+            IntWritable.class.getName());
+        iReduceStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+            IntWritable.class.getName());
+        iReduceStageConf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
+            MRRSleepJobPartitioner.class.getName());
+
+        if (i == 1) {
+          MultiStageMRConfToTezTranslator.translateVertexConfToTez(
+              iReduceStageConf, mapStageConf);
+        }
+        else {
+          MultiStageMRConfToTezTranslator.translateVertexConfToTez(
+              iReduceStageConf, intermediateReduceStageConfs[i-2]);
+        }
+        intermediateReduceStageConfs[i-1] = iReduceStageConf;
+      }
+    }
+
+    Configuration finalReduceConf = null;
+    if (numReducer > 0) {
+      finalReduceConf = new JobConf(conf);
+      finalReduceConf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, reduceSleepTime);
+      finalReduceConf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, reduceSleepCount);
+      finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, numReducer);
+      finalReduceConf.set(MRJobConfig.REDUCE_CLASS_ATTR, SleepReducer.class.getName());
+      finalReduceConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+          IntWritable.class.getName());
+      finalReduceConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+          IntWritable.class.getName());
+      finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
+          NullOutputFormat.class.getName());
+
+      if (iReduceStagesCount != 0) {
+        MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf,
+            intermediateReduceStageConfs[iReduceStagesCount-1]);
+      } else {
+        MultiStageMRConfToTezTranslator.translateVertexConfToTez(finalReduceConf,
+            mapStageConf);
+      }
+    }
+
+    MRHelpers.doJobClientMagic(mapStageConf);
+    if (iReduceStagesCount > 0
+        && numIReducer > 0) {
+      for (int i = 0; i < iReduceStagesCount; ++i) {
+        MRHelpers.doJobClientMagic(intermediateReduceStageConfs[i]);
+      }
+    }
+    if (numReducer > 0) {
+      MRHelpers.doJobClientMagic(finalReduceConf);
+    }
+
+    InputSplitInfo inputSplitInfo;
+    try {
+      inputSplitInfo = MRHelpers.generateInputSplits(mapStageConf,
+          remoteStagingDir);
+    } catch (InterruptedException e) {
+      // TODO Auto-generated catch block
+      throw new TezUncheckedException("Could not generate input splits", e);
+    } catch (ClassNotFoundException e) {
+      throw new TezUncheckedException("Failed to generate input splits", e);
+    }
+
+    DAG dag = new DAG("MRRSleepJob");
+    String jarPath = ClassUtil.findContainingJar(getClass());
+    Path remoteJarPath = remoteFs.makeQualified(
+        new Path(remoteStagingDir, "dag_job.jar"));
+    remoteFs.copyFromLocalFile(new Path(jarPath), remoteJarPath);
+    FileStatus jarFileStatus = remoteFs.getFileStatus(remoteJarPath);
+
+    Map<String, LocalResource> commonLocalResources =
+        new HashMap<String, LocalResource>();
+    LocalResource dagJarLocalRsrc = LocalResource.newInstance(
+        ConverterUtils.getYarnUrlFromPath(remoteJarPath),
+        LocalResourceType.FILE,
+        LocalResourceVisibility.APPLICATION,
+        jarFileStatus.getLen(),
+        jarFileStatus.getModificationTime());
+    commonLocalResources.put("dag_job.jar", dagJarLocalRsrc);
+
+    List<Vertex> vertices = new ArrayList<Vertex>();
+
+    Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
+        MapProcessor.class.getName(),
+        MRHelpers.createByteBufferFromConf(mapStageConf)),
+        numMapper,
+        MRHelpers.getMapResource(mapStageConf));
+    mapVertex.setJavaOpts(MRHelpers.getMapJavaOpts(mapStageConf));
+    mapVertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+    Map<String, LocalResource> mapLocalResources =
+        new HashMap<String, LocalResource>();
+    mapLocalResources.putAll(commonLocalResources);
+    MRHelpers.updateLocalResourcesForInputSplits(remoteFs, inputSplitInfo,
+        mapLocalResources);
+    mapVertex.setTaskLocalResources(mapLocalResources);
+    Map<String, String> mapEnv = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(mapStageConf, mapEnv, true);
+    mapVertex.setTaskEnvironment(mapEnv);
+    vertices.add(mapVertex);
+
+    if (iReduceStagesCount > 0
+        && numIReducer > 0) {
+      for (int i = 0; i < iReduceStagesCount; ++i) {
+        Configuration iconf =
+            intermediateReduceStageConfs[i];
+        Vertex ivertex = new Vertex("ireduce" + (i+1),
+            new ProcessorDescriptor(ReduceProcessor.class.getName(),
+                MRHelpers.createByteBufferFromConf(iconf)),
+                numIReducer,
+                MRHelpers.getReduceResource(iconf));
+        ivertex.setJavaOpts(MRHelpers.getReduceJavaOpts(iconf));
+        ivertex.setTaskLocalResources(commonLocalResources);
+        Map<String, String> reduceEnv = new HashMap<String, String>();
+        MRHelpers.updateEnvironmentForMRTasks(iconf, reduceEnv, false);
+        ivertex.setTaskEnvironment(reduceEnv);
+        vertices.add(ivertex);
+      }
+    }
+
+    Vertex finalReduceVertex = null;
+    if (numReducer > 0) {
+      finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
+          ReduceProcessor.class.getName(),
+          MRHelpers.createByteBufferFromConf(finalReduceConf)),
+          numReducer,
+          MRHelpers.getReduceResource(finalReduceConf));
+      finalReduceVertex.setJavaOpts(
+          MRHelpers.getReduceJavaOpts(finalReduceConf));
+      finalReduceVertex.setTaskLocalResources(commonLocalResources);
+      Map<String, String> reduceEnv = new HashMap<String, String>();
+      MRHelpers.updateEnvironmentForMRTasks(finalReduceConf, reduceEnv, false);
+      finalReduceVertex.setTaskEnvironment(reduceEnv);
+      vertices.add(finalReduceVertex);
+    }
+
+    for (int i = 0; i < vertices.size(); ++i) {
+      dag.addVertex(vertices.get(i));
+      if (i != 0) {
+        dag.addEdge(new Edge(vertices.get(i-1),
+            vertices.get(i), new EdgeProperty(
+                ConnectionPattern.BIPARTITE, SourceType.STABLE,
+                new OutputDescriptor(
+                    OnFileSortedOutput.class.getName(), null),
+                new InputDescriptor(
+                    ShuffledMergedInput.class.getName(), null))));
+      }
+    }
+
+    return dag;
+  }
+
+  @VisibleForTesting
   public Job createJob(int numMapper, int numReducer, int iReduceStagesCount,
       int numIReducer, long mapSleepTime, int mapSleepCount,
       long reduceSleepTime, int reduceSleepCount,
       long iReduceSleepTime, int iReduceSleepCount)
-      throws IOException {
+          throws IOException {
     Configuration conf = getConf();
     conf.setLong(MAP_SLEEP_TIME, mapSleepTime);
     conf.setLong(REDUCE_SLEEP_TIME, reduceSleepTime);
@@ -385,20 +617,19 @@ public class MRRSleepJob extends Configured implements Tool {
       // Set reducer class for intermediate reduce
       conf.setClass(
           MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
-          "mapreduce.job.reduce.class"), ISleepReducer.class, Reducer.class);
+              "mapreduce.job.reduce.class"), ISleepReducer.class, Reducer.class);
       // Set reducer output key class
       conf.setClass(
           MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
-          "mapreduce.map.output.key.class"), IntWritable.class, Object.class);
+              "mapreduce.map.output.key.class"), IntWritable.class, Object.class);
       // Set reducer output value class
       conf.setClass(
           MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
-          "mapreduce.map.output.value.class"), IntWritable.class, Object.class);
+              "mapreduce.map.output.value.class"), IntWritable.class, Object.class);
       conf.setInt(
           MultiStageMRConfigUtil.getPropertyNameForIntermediateStage(i,
-          "mapreduce.job.reduces"), numIReducer);
+              "mapreduce.job.reduces"), numIReducer);
     }
-
     Job job = Job.getInstance(conf, "sleep");
     job.setNumReduceTasks(numReducer);
     job.setJarByClass(MRRSleepJob.class);
@@ -413,7 +644,6 @@ public class MRRSleepJob extends Configured implements Tool {
     job.setSpeculativeExecution(false);
     job.setJobName("Sleep job");
 
-
     FileInputFormat.addInputPath(job, new Path("ignored"));
     return job;
   }
@@ -468,10 +698,45 @@ public class MRRSleepJob extends Configured implements Tool {
     mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
     reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
     iReduceSleepCount = (int)Math.ceil(iReduceSleepTime / ((double)recSleepTime));
-    Job job = createJob(numMapper, numReducer, iReduceStagesCount, numIReducer,
+
+    TezConfiguration conf = new TezConfiguration();
+    FileSystem remoteFs = FileSystem.get(conf);
+
+    TezClient tezClient = new TezClient(conf);
+    ApplicationId appId =
+        tezClient.createApplication();
+
+    Path remoteStagingDir =
+        remoteFs.makeQualified(new Path(conf.get(
+            TezConfiguration.TEZ_AM_STAGING_DIR,
+            TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT),
+            appId.toString()));
+    tezClient.ensureExists(remoteStagingDir);
+
+    DAG dag = createDAG(remoteFs, conf, appId, remoteStagingDir,
+        numMapper, numReducer, iReduceStagesCount, numIReducer,
         mapSleepTime, mapSleepCount, reduceSleepTime, reduceSleepCount,
         iReduceSleepTime, iReduceSleepCount);
-    return job.waitForCompletion(true) ? 0 : 1;
+
+    DAGClient dagClient =
+        tezClient.submitDAGApplication(appId, dag, remoteStagingDir,
+            null, null, null, null, null, conf);
+
+    while (true) {
+      DAGStatus status = dagClient.getDAGStatus();
+      LOG.info("DAG Status: " + status);
+      if (status.isCompleted()) {
+        break;
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // do nothing
+      }
+    }
+
+    return dagClient.getApplicationReport().getFinalApplicationStatus() ==
+        FinalApplicationStatus.SUCCEEDED ? 0 : 1;
   }
 
 }

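The heart of the new createDAG() above is the loop that chains the map, intermediate-reduce and final-reduce vertices with sorted-shuffle edges. Pulled out as a minimal standalone sketch (the helper class and method names are illustrative; the constructors and descriptors are the ones used in the diff):

import java.util.List;

import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
import org.apache.tez.dag.api.EdgeProperty.SourceType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.engine.lib.input.ShuffledMergedInput;
import org.apache.tez.engine.lib.output.OnFileSortedOutput;

public class ShuffleChainBuilder {

  /**
   * Adds the given vertices to the DAG and links each consecutive pair with
   * a sorted-shuffle edge (BIPARTITE connection, STABLE source,
   * OnFileSortedOutput feeding ShuffledMergedInput), as createDAG() does.
   */
  public static DAG chainWithShuffleEdges(DAG dag, List<Vertex> vertices) {
    for (int i = 0; i < vertices.size(); ++i) {
      dag.addVertex(vertices.get(i));
      if (i != 0) {
        dag.addEdge(new Edge(vertices.get(i - 1), vertices.get(i),
            new EdgeProperty(
                ConnectionPattern.BIPARTITE, SourceType.STABLE,
                new OutputDescriptor(OnFileSortedOutput.class.getName(), null),
                new InputDescriptor(ShuffledMergedInput.class.getName(), null))));
      }
    }
    return dag;
  }
}
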
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b239ebd7/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
index 3bf9958..0e767b4 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
@@ -20,6 +20,8 @@ package org.apache.tez.mapreduce;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +46,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
 public class ResourceMgrDelegate {
@@ -81,7 +84,10 @@ public class ResourceMgrDelegate {
 
   public JobStatus[] getAllJobs() throws IOException, InterruptedException {
     try {
-      return TypeConverter.fromYarnApps(client.getApplicationList(), this.conf);
+      Set<String> appTypes = new HashSet<String>(1);
+      appTypes.add(TezConfiguration.TEZ_APPLICATION_TYPE);
+      return TypeConverter.fromYarnApps(client.getApplications(appTypes),
+          this.conf);
     } catch (YarnException e) {
       throw new IOException(e);
     }
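
getAllJobs() now asks the ResourceManager only for applications of the Tez type instead of listing every application on the cluster. A minimal standalone sketch of the same filter; YarnClient.createYarnClient() is assumed to be the usual way to obtain a client here, and everything else mirrors the diff.

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.dag.api.TezConfiguration;

public class ListTezApplications {

  public static void main(String[] args) throws IOException, YarnException {
    // Assumed factory method for obtaining a YarnClient.
    YarnClient client = YarnClient.createYarnClient();
    client.init(new YarnConfiguration());
    client.start();
    try {
      Set<String> appTypes = new HashSet<String>(1);
      appTypes.add(TezConfiguration.TEZ_APPLICATION_TYPE);
      // Ask the ResourceManager only for Tez applications.
      List<ApplicationReport> reports = client.getApplications(appTypes);
      for (ApplicationReport report : reports) {
        System.out.println(report.getApplicationId() + " "
            + report.getYarnApplicationState());
      }
    } finally {
      client.stop();
    }
  }
}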