Posted to commits@tez.apache.org by bi...@apache.org on 2013/10/26 02:14:05 UTC

[2/3] TEZ-581. Rename MiniMRRTezCluster to MiniTezCluster. Move YARNRunner to tez-mapreduce project (bikas)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
new file mode 100644
index 0000000..d3b2c08
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
@@ -0,0 +1,232 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class ResourceMgrDelegate {
+  private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
+      
+  private YarnConfiguration conf;
+  private GetNewApplicationResponse application;
+  private ApplicationId applicationId;
+  private YarnClient client;
+  private InetSocketAddress rmAddress;
+
+  /**
+   * Creates a delegate responsible for communicating with the ResourceManager
+   * over {@link ApplicationClientProtocol}.
+   * @param conf the configuration object.
+   */
+  public ResourceMgrDelegate(YarnConfiguration conf) {
+    super();
+    this.conf = conf;
+    client = YarnClient.createYarnClient();
+    client.init(conf);
+    this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_PORT);
+    client.start();
+  }
+
+  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+      InterruptedException {
+    try {
+      return TypeConverter.fromYarnNodes(client.getNodeReports());
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+    try {
+      Set<String> appTypes = new HashSet<String>(1);
+      appTypes.add(TezConfiguration.TEZ_APPLICATION_TYPE);
+      return TypeConverter.fromYarnApps(client.getApplications(appTypes),
+          this.conf);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+      InterruptedException {
+    // TODO: Implement getBlacklistedTrackers
+    LOG.warn("getBlacklistedTrackers - Not implemented yet");
+    return new TaskTrackerInfo[0];
+  }
+
+  public ClusterMetrics getClusterMetrics() throws IOException,
+      InterruptedException {
+    YarnClusterMetrics metrics;
+    try {
+      metrics = client.getYarnClusterMetrics();
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
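+    // Map the YARN cluster metrics onto the legacy MR ClusterMetrics shape;
+    // fields with no YARN equivalent are filled with placeholder values.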
+    ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1, 
+        metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
+        metrics.getNumNodeManagers(), 0, 0);
+    return oldMetrics;
+  }
+
+  @SuppressWarnings("rawtypes")
+  public Token getDelegationToken(Text renewer) throws IOException,
+      InterruptedException {
+    try {
+      // Remove rmAddress after YARN-868 is addressed
+      return ConverterUtils.convertFromYarn(
+        client.getRMDelegationToken(renewer), rmAddress);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public String getFilesystemName() throws IOException, InterruptedException {
+    return FileSystem.get(conf).getUri().toString();
+  }
+
+  public JobID getNewJobID() throws IOException, InterruptedException {
+    try {
+      this.application = 
+          client.createApplication().getNewApplicationResponse();
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+    this.applicationId = this.application.getApplicationId();
+    return TypeConverter.fromYarn(applicationId);
+  }
+
+  public QueueInfo getQueue(String queueName) throws IOException,
+      InterruptedException {
+    try {
+      return TypeConverter.fromYarn(
+          client.getQueueInfo(queueName), this.conf);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+      InterruptedException {
+    try {
+      return TypeConverter.fromYarnQueueUserAclsInfo(
+          client.getQueueAclsInfo());
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public QueueInfo[] getQueues() throws IOException, InterruptedException {
+    try {
+      return TypeConverter.fromYarnQueueInfo(client.getAllQueues(), this.conf);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+    try {
+      return TypeConverter.fromYarnQueueInfo(client.getRootQueueInfos(),
+          this.conf);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public QueueInfo[] getChildQueues(String parent) throws IOException,
+      InterruptedException {
+    try {
+      return TypeConverter.fromYarnQueueInfo(client.getChildQueueInfos(parent),
+        this.conf);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public String getStagingAreaDir() throws IOException, InterruptedException {
+//    Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR);
+    String user = 
+      UserGroupInformation.getCurrentUser().getShortUserName();
+    Path path = MRApps.getStagingAreaDir(conf, user);
+    LOG.debug("getStagingAreaDir: dir=" + path);
+    return path.toString();
+  }
+
+
+  public String getSystemDir() throws IOException, InterruptedException {
+    Path sysDir = new Path(MRJobConfig.JOB_SUBMIT_DIR);
+    //FileContext.getFileContext(conf).delete(sysDir, true);
+    return sysDir.toString();
+  }
+  
+
+  public long getTaskTrackerExpiryInterval() throws IOException,
+      InterruptedException {
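+    // There is no TaskTracker equivalent on YARN; return 0 as a placeholder.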
+    return 0;
+  }
+  
+  public void setJobPriority(JobID arg0, String arg1) throws IOException,
+      InterruptedException {
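+    // Setting job priority is not supported here; intentionally a no-op.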
+    return;
+  }
+
+
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return 0;
+  }
+
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+
+  public void killApplication(ApplicationId appId)
+      throws YarnException, IOException {
+    client.killApplication(appId);
+  }
+
+}
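
For context, a minimal sketch of driving the delegate above directly (illustrative
only, not part of this patch; it uses only the constructor and methods defined in
this file):

    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.tez.mapreduce.client.ResourceMgrDelegate;

    public class ResourceMgrDelegateSketch {
      public static void main(String[] args) throws Exception {
        ResourceMgrDelegate delegate =
            new ResourceMgrDelegate(new YarnConfiguration());
        JobID jobId = delegate.getNewJobID();              // RM allocates an ApplicationId
        ApplicationId appId = delegate.getApplicationId(); // cached by getNewJobID()
        System.out.println("MR " + jobId + " -> YARN " + appId);
        delegate.killApplication(appId);                   // forwards to YarnClient
      }
    }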

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
new file mode 100644
index 0000000..6b6dd35
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -0,0 +1,722 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Enables the current MapReduce JobClient (the Hadoop 0.22-era ClientProtocol) to
+ * run jobs on YARN via Tez.
+ */
+@SuppressWarnings({ "unchecked" })
+public class YARNRunner implements ClientProtocol {
+
+  private static final Log LOG = LogFactory.getLog(YARNRunner.class);
+
+  private ResourceMgrDelegate resMgrDelegate;
+  private ClientCache clientCache;
+  private Configuration conf;
+  private final FileContext defaultFileContext;
+
+  final public static FsPermission DAG_FILE_PERMISSION =
+      FsPermission.createImmutable((short) 0644);
+  final public static int UTF8_CHUNK_SIZE = 16 * 1024;
+
+  private final TezConfiguration tezConf;
+  private final TezClient tezClient;
+  private DAGClient dagClient;
+
+  /**
+   * YARNRunner encapsulates the YARN client interface.
+   * @param conf the configuration object for the client
+   */
+  public YARNRunner(Configuration conf) {
+   this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
+  }
+
+  /**
+   * Similar to {@link #YARNRunner(Configuration)} but allows injecting a
+   * {@link ResourceMgrDelegate}, which enables mocking and testing.
+   * @param conf the configuration object for the client
+   * @param resMgrDelegate the resourcemanager client handle.
+   */
+  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
+   this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
+  }
+
+  /**
+   * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
+   * but allows injecting a {@link ClientCache}, which enables mocking and testing.
+   * @param conf the configuration object
+   * @param resMgrDelegate the resource manager delegate
+   * @param clientCache the client cache object.
+   */
+  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
+      ClientCache clientCache) {
+    this.conf = conf;
+    this.tezConf = new TezConfiguration(conf);
+    try {
+      this.resMgrDelegate = resMgrDelegate;
+      this.tezClient = new TezClient(tezConf);
+      this.clientCache = clientCache;
+      this.defaultFileContext = FileContext.getFileContext(this.conf);
+
+    } catch (UnsupportedFileSystemException ufe) {
+      throw new RuntimeException("Error in getting the default FileContext", ufe);
+    }
+  }
+
+  /**
+   * Used mostly for testing.
+   * @param resMgrDelegate the resource manager delegate to use.
+   */
+  @VisibleForTesting
+  @Private
+  public void setResourceMgrDelegate(ResourceMgrDelegate resMgrDelegate) {
+    this.resMgrDelegate = resMgrDelegate;
+  }
+
+  @Override
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Use Token.cancel instead");
+  }
+
+  @Override
+  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getActiveTrackers();
+  }
+
+  @Override
+  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+    return resMgrDelegate.getAllJobs();
+  }
+
+  @Override
+  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getBlacklistedTrackers();
+  }
+
+  @Override
+  public ClusterMetrics getClusterMetrics() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getClusterMetrics();
+  }
+
+  @Override
+  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+      throws IOException, InterruptedException {
+    // The token is only used for serialization. So the type information
+    // mismatch should be fine.
+    return resMgrDelegate.getDelegationToken(renewer);
+  }
+
+  @Override
+  public String getFilesystemName() throws IOException, InterruptedException {
+    return resMgrDelegate.getFilesystemName();
+  }
+
+  @Override
+  public JobID getNewJobID() throws IOException, InterruptedException {
+    return resMgrDelegate.getNewJobID();
+  }
+
+  @Override
+  public QueueInfo getQueue(String queueName) throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getQueue(queueName);
+  }
+
+  @Override
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getQueueAclsForCurrentUser();
+  }
+
+  @Override
+  public QueueInfo[] getQueues() throws IOException, InterruptedException {
+    return resMgrDelegate.getQueues();
+  }
+
+  @Override
+  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+    return resMgrDelegate.getRootQueues();
+  }
+
+  @Override
+  public QueueInfo[] getChildQueues(String parent) throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getChildQueues(parent);
+  }
+
+  @Override
+  public String getStagingAreaDir() throws IOException, InterruptedException {
+    return resMgrDelegate.getStagingAreaDir();
+  }
+
+  @Override
+  public String getSystemDir() throws IOException, InterruptedException {
+    return resMgrDelegate.getSystemDir();
+  }
+
+  @Override
+  public long getTaskTrackerExpiryInterval() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getTaskTrackerExpiryInterval();
+  }
+
+  private Map<String, LocalResource> createJobLocalResources(
+      Configuration jobConf, String jobSubmitDir)
+      throws IOException {
+
+    // Setup LocalResources
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+
+    Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
+
+    URL yarnUrlForJobSubmitDir = ConverterUtils
+        .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
+            .resolvePath(
+                defaultFileContext.makeQualified(new Path(jobSubmitDir))));
+    LOG.debug("Creating setup context, jobSubmitDir url is "
+        + yarnUrlForJobSubmitDir);
+
+    localResources.put(MRJobConfig.JOB_CONF_FILE,
+        createApplicationResource(defaultFileContext,
+            jobConfPath, LocalResourceType.FILE));
+    if (jobConf.get(MRJobConfig.JAR) != null) {
+      Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
+      LocalResource rc = createApplicationResource(defaultFileContext,
+          jobJarPath,
+          LocalResourceType.FILE);
+      // FIXME fix pattern support
+      // String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
+      // JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
+      // rc.setPattern(pattern);
+      localResources.put(MRJobConfig.JOB_JAR, rc);
+    } else {
+      // Job jar may be null. For example, for pipes, the job jar is the hadoop
+      // mapreduce jar itself, which is already on the classpath.
+      LOG.info("Job jar is not present. "
+          + "Not adding any jar to the list of resources.");
+    }
+
+    // TODO gross hack
+    for (String s : new String[] {
+        MRJobConfig.JOB_SPLIT,
+        MRJobConfig.JOB_SPLIT_METAINFO}) {
+      localResources.put(s,
+          createApplicationResource(defaultFileContext,
+              new Path(jobSubmitDir, s), LocalResourceType.FILE));
+    }
+
+    MRApps.setupDistributedCache(jobConf, localResources);
+
+    return localResources;
+  }
+
+  // FIXME isn't this a nice mess of a client?
+  // read input, write splits, read splits again
+  private List<TaskLocationHint> getMapLocationHintsFromInputSplits(JobID jobId,
+      FileSystem fs, Configuration conf,
+      String jobSubmitDir) throws IOException {
+    TaskSplitMetaInfo[] splitsInfo =
+        SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf,
+            new Path(jobSubmitDir));
+    int splitsCount = splitsInfo.length;
+    List<TaskLocationHint> locationHints =
+        new ArrayList<TaskLocationHint>(splitsCount);
+    for (int i = 0; i < splitsCount; ++i) {
+      TaskLocationHint locationHint =
+          new TaskLocationHint(
+              new HashSet<String>(
+                  Arrays.asList(splitsInfo[i].getLocations())), null);
+      locationHints.add(locationHint);
+    }
+    return locationHints;
+  }
+
+  private void setupMapReduceEnv(Configuration jobConf,
+      Map<String, String> environment, boolean isMap) throws IOException {
+
+    if (isMap) {
+      warnForJavaLibPath(
+          jobConf.get(MRJobConfig.MAP_JAVA_OPTS,""),
+          "map",
+          MRJobConfig.MAP_JAVA_OPTS,
+          MRJobConfig.MAP_ENV);
+      warnForJavaLibPath(
+          jobConf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""),
+          "map",
+          MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
+          MRJobConfig.MAPRED_ADMIN_USER_ENV);
+    } else {
+      warnForJavaLibPath(
+          jobConf.get(MRJobConfig.REDUCE_JAVA_OPTS,""),
+          "reduce",
+          MRJobConfig.REDUCE_JAVA_OPTS,
+          MRJobConfig.REDUCE_ENV);
+      warnForJavaLibPath(
+          jobConf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""),
+          "reduce",
+          MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
+          MRJobConfig.MAPRED_ADMIN_USER_ENV);
+    }
+
+    MRHelpers.updateEnvironmentForMRTasks(jobConf, environment, isMap);
+  }
+
+  private Vertex createVertexForStage(Configuration stageConf,
+      Map<String, LocalResource> jobLocalResources,
+      List<TaskLocationHint> locations, int stageNum, int totalStages)
+      throws IOException {
+    // stageNum runs from 0 to totalStages - 1; stage 0 is the map stage.
+    boolean isMap = (stageNum == 0);
+
+    int numTasks = isMap ? stageConf.getInt(MRJobConfig.NUM_MAPS, 0)
+        : stageConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    String processorName = isMap ? MapProcessor.class.getName()
+        : ReduceProcessor.class.getName();
+    String vertexName = null;
+    if (isMap) {
+      vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+    } else {
+      if (stageNum == totalStages - 1) {
+        vertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
+      } else {
+        vertexName = MultiStageMRConfigUtil
+            .getIntermediateStageVertexName(stageNum);
+      }
+    }
+
+    Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf)
+        : MRHelpers.getReduceResource(stageConf);
+    byte[] vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
+    Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName).
+        setUserPayload(vertexUserPayload),
+        numTasks, taskResource);
+    if (isMap) {
+      byte[] mapInputPayload = MRHelpers.createMRInputPayload(vertexUserPayload, null);
+      MRHelpers.addMRInput(vertex, mapInputPayload, null);
+    }
+    // The final stage writes the MR output (this also covers map-only jobs).
+    if (stageNum == totalStages - 1) {
+      MRHelpers.addMROutput(vertex, vertexUserPayload);
+    }
+
+    Map<String, String> taskEnv = new HashMap<String, String>();
+    setupMapReduceEnv(stageConf, taskEnv, isMap);
+
+    Map<String, LocalResource> taskLocalResources =
+        new TreeMap<String, LocalResource>();
+    // PRECOMMIT Remove split localization for reduce tasks if it's being set
+    // here
+    taskLocalResources.putAll(jobLocalResources);
+
+    String taskJavaOpts = isMap ? MRHelpers.getMapJavaOpts(stageConf)
+        : MRHelpers.getReduceJavaOpts(stageConf);
+
+    vertex.setTaskEnvironment(taskEnv)
+        .setTaskLocalResources(taskLocalResources)
+        .setTaskLocationsHint(locations)
+        .setJavaOpts(taskJavaOpts);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding vertex to DAG" + ", vertexName="
+          + vertex.getVertexName() + ", processor="
+          + vertex.getProcessorDescriptor().getClassName() + ", parallelism="
+          + vertex.getParallelism() + ", javaOpts=" + vertex.getJavaOpts()
+          + ", resources=" + vertex.getTaskResource()
+      // TODO Add localResources and Environment
+      );
+    }
+
+    return vertex;
+  }
+
+  private DAG createDAG(FileSystem fs, JobID jobId, Configuration[] stageConfs,
+      String jobSubmitDir, Credentials ts,
+      Map<String, LocalResource> jobLocalResources) throws IOException {
+
+    String jobName = stageConfs[0].get(MRJobConfig.JOB_NAME,
+        YarnConfiguration.DEFAULT_APPLICATION_NAME);
+    DAG dag = new DAG(jobName);
+
+    LOG.info("Number of stages: " + stageConfs.length);
+
+    List<TaskLocationHint> mapInputLocations =
+        getMapLocationHintsFromInputSplits(
+            jobId, fs, stageConfs[0], jobSubmitDir);
+    List<TaskLocationHint> reduceInputLocations = null;
+
+    Vertex[] vertices = new Vertex[stageConfs.length];
+    for (int i = 0; i < stageConfs.length; i++) {
+      vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
+          i == 0 ? mapInputLocations : reduceInputLocations, i,
+          stageConfs.length);
+    }
+
+    for (int i = 0; i < vertices.length; i++) {
+      dag.addVertex(vertices[i]);
+      if (i > 0) {
+        EdgeProperty edgeProperty = new EdgeProperty(
+            DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(OnFileSortedOutput.class.getName()),
+            new InputDescriptor(ShuffledMergedInputLegacy.class.getName()));
+
+        Edge edge = new Edge(vertices[i - 1], vertices[i], edgeProperty);
+        dag.addEdge(edge);
+      }
+
+    }
+    return dag;
+  }
+
+  private TezConfiguration getDAGAMConfFromMRConf() {
+    TezConfiguration finalConf = new TezConfiguration(this.tezConf);
+    Map<String, String> mrParamToDAGParamMap = DeprecatedKeys
+        .getMRToDAGParamMap();
+
+    for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
+      if (finalConf.get(entry.getKey()) != null) {
+        finalConf.set(entry.getValue(), finalConf.get(entry.getKey()));
+        finalConf.unset(entry.getKey());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("MR->DAG Translating MR key: " + entry.getKey()
+              + " to Tez key: " + entry.getValue() + " with value "
+              + finalConf.get(entry.getValue()));
+        }
+      }
+    }
+    return finalConf;
+  }
+
+  @Override
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
+      throws IOException, InterruptedException {
+
+    ApplicationId appId = resMgrDelegate.getApplicationId();
+
+    FileSystem fs = FileSystem.get(conf);
+    // Loads the job.xml written by the user.
+    JobConf jobConf = new JobConf(new TezConfiguration(conf));
+
+    // Extract individual raw MR configs.
+    Configuration[] stageConfs = MultiStageMRConfToTezTranslator
+        .getStageConfs(jobConf);
+
+    // Transform all confs to use Tez keys
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[0],
+        null);
+    for (int i = 1; i < stageConfs.length; i++) {
+      MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[i],
+          stageConfs[i - 1]);
+    }
+
+    // create inputs to tezClient.submit()
+
+    // FIXME set up job resources
+    Map<String, LocalResource> jobLocalResources =
+        createJobLocalResources(stageConfs[0], jobSubmitDir);
+
+    // FIXME createDAG should take the tezConf as a parameter, instead of using
+    // MR keys.
+    DAG dag = createDAG(fs, jobId, stageConfs, jobSubmitDir, ts,
+        jobLocalResources);
+
+    List<String> vargs = new LinkedList<String>();
+    // admin command opts and user command opts
+    String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
+        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
+    warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
+        MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
+    vargs.add(mrAppMasterAdminOptions);
+
+    // Add AM user command opts
+    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
+        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
+    warnForJavaLibPath(mrAppMasterUserOptions, "app master",
+        MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
+    vargs.add(mrAppMasterUserOptions);
+
+    StringBuilder javaOpts = new StringBuilder();
+    for (String varg : vargs) {
+      javaOpts.append(varg).append(" ");
+    }
+
+    // Setup the CLASSPATH in environment
+    // i.e. add { Hadoop jars, job jar, CWD } to classpath.
+    Map<String, String> environment = new HashMap<String, String>();
+
+    // Setup the environment variables for AM
+    MRHelpers.updateEnvironmentForMRAM(conf, environment);
+
+    TezConfiguration dagAMConf = getDAGAMConfFromMRConf();
+    dagAMConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, javaOpts.toString());
+
+    // Submit to ResourceManager
+    try {
+      dagAMConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+          jobSubmitDir);
+      AMConfiguration amConfig = new AMConfiguration(
+          jobConf.get(JobContext.QUEUE_NAME,
+              YarnConfiguration.DEFAULT_QUEUE_NAME),
+          environment,
+          jobLocalResources, dagAMConf, ts);
+      tezClient.submitDAGApplication(appId, dag, amConfig);
+    } catch (TezException e) {
+      throw new IOException(e);
+    }
+
+    return getJobStatus(jobId);
+  }
+
+  private LocalResource createApplicationResource(FileContext fs, Path p,
+      LocalResourceType type) throws IOException {
+    LocalResource rsrc = Records.newRecord(LocalResource.class);
+    FileStatus rsrcStat = fs.getFileStatus(p);
+    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
+        .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
+    rsrc.setSize(rsrcStat.getLen());
+    rsrc.setTimestamp(rsrcStat.getModificationTime());
+    rsrc.setType(type);
+    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    return rsrc;
+  }
+
+  @Override
+  public void setJobPriority(JobID arg0, String arg1) throws IOException,
+      InterruptedException {
+    resMgrDelegate.setJobPriority(arg0, arg1);
+  }
+
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return resMgrDelegate.getProtocolVersion(arg0, arg1);
+  }
+
+  @Override
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Use Token.renew instead");
+  }
+
+
+  @Override
+  public Counters getJobCounters(JobID arg0) throws IOException,
+      InterruptedException {
+    return clientCache.getClient(arg0).getJobCounters(arg0);
+  }
+
+  @Override
+  public String getJobHistoryDir() throws IOException, InterruptedException {
+    return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+  }
+
+  @Override
+  public JobStatus getJobStatus(JobID jobID) throws IOException,
+      InterruptedException {
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    String jobFile = MRApps.getJobFile(conf, user, jobID);
+    DAGStatus dagStatus;
+    try {
+      if(dagClient == null) {
+        dagClient = tezClient.getDAGClient(TypeConverter.toYarn(jobID).getAppId());
+      }
+      dagStatus = dagClient.getDAGStatus();
+      return new DAGJobStatus(dagClient.getApplicationReport(), dagStatus, jobFile);
+    } catch (TezException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
+      int arg2) throws IOException, InterruptedException {
+    return clientCache.getClient(arg0).getTaskCompletionEvents(arg0, arg1, arg2);
+  }
+
+  @Override
+  public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException,
+      InterruptedException {
+    return clientCache.getClient(arg0.getJobID()).getTaskDiagnostics(arg0);
+  }
+
+  @Override
+  public TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
+      throws IOException, InterruptedException {
+    return clientCache.getClient(jobID)
+        .getTaskReports(jobID, taskType);
+  }
+
+  @Override
+  public void killJob(JobID arg0) throws IOException, InterruptedException {
+    /* Only send a kill to the RM while the job is still in PREP or RUNNING state. */
+    JobStatus status = getJobStatus(arg0);
+    if (status.getState() == JobStatus.State.RUNNING ||
+        status.getState() == JobStatus.State.PREP) {
+      try {
+        resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+      } catch (YarnException e) {
+        throw new IOException(e);
+      }
+      return;
+    }
+  }
+
+  @Override
+  public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,
+      InterruptedException {
+    return clientCache.getClient(arg0.getJobID()).killTask(arg0, arg1);
+  }
+
+  @Override
+  public AccessControlList getQueueAdmins(String arg0) throws IOException {
+    return new AccessControlList("*");
+  }
+
+  @Override
+  public JobTrackerStatus getJobTrackerStatus() throws IOException,
+      InterruptedException {
+    return JobTrackerStatus.RUNNING;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion,
+        clientMethodsHash);
+  }
+
+  @Override
+  public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
+      throws IOException {
+    try {
+      return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private static void warnForJavaLibPath(String opts, String component,
+      String javaConf, String envConf) {
+    if (opts != null && opts.contains("-Djava.library.path")) {
+      LOG.warn("Usage of -Djava.library.path in " + javaConf + " can cause " +
+               "programs to no longer function if hadoop native libraries " +
+               "are used. These values should be set as part of the " +
+               "LD_LIBRARY_PATH in the " + component + " JVM env using " +
+               envConf + " config settings.");
+    }
+  }
+
+}
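
createDAG() above flattens every multi-stage MR job into a straight chain of
vertices connected by scatter-gather edges. Stripped of the MR config translation,
the Tez API usage reduces to roughly the following sketch (names, parallelism and
resource sizing are illustrative; the constructors match the ones used in this
file):

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.tez.dag.api.*;
    import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
    import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
    import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
    import org.apache.tez.mapreduce.processor.map.MapProcessor;
    import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
    import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
    import org.apache.tez.runtime.library.output.OnFileSortedOutput;

    public class LinearDagSketch {
      public static DAG buildDag() {
        Resource taskResource = Resource.newInstance(1024, 1); // 1 GB, 1 vcore
        Vertex map = new Vertex("map",
            new ProcessorDescriptor(MapProcessor.class.getName()), 4, taskResource);
        Vertex reduce = new Vertex("reduce",
            new ProcessorDescriptor(ReduceProcessor.class.getName()), 1, taskResource);
        DAG dag = new DAG("mrr-sketch");
        dag.addVertex(map);
        dag.addVertex(reduce);
        // The upstream vertex persists sorted output; the downstream vertex
        // shuffle-merges it, exactly as the edges built in createDAG() do.
        dag.addEdge(new Edge(map, reduce,
            new EdgeProperty(DataMovementType.SCATTER_GATHER,
                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
                new OutputDescriptor(OnFileSortedOutput.class.getName()),
                new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
        return dag;
      }
    }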

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java
new file mode 100644
index 0000000..c36dc9d
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+
+public class YarnTezClientProtocolProvider extends ClientProtocolProvider {
+
+  @Override
+  public ClientProtocol create(Configuration conf) throws IOException {
+    if (MRConfig.YARN_TEZ_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
+      return new YARNRunner(conf);
+    }
+    return null;
+  }
+
+  @Override
+  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
+      throws IOException {
+    return create(conf);
+  }
+
+  @Override
+  public void close(ClientProtocol clientProtocol) throws IOException {
+    // nothing to do
+  }
+
+}
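
The provider is discovered through Hadoop's standard ClientProtocolProvider
service-loader mechanism, so an unmodified MR job opts in purely via
configuration. A hedged sketch (assuming MRConfig.YARN_TEZ_FRAMEWORK_NAME resolves
to "yarn-tez", per the check in create() above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.tez.mapreduce.hadoop.MRConfig;

    public class YarnTezSubmitSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hadoop's Cluster walks ServiceLoader<ClientProtocolProvider> and keeps
        // the first provider whose create(conf) returns non-null; with this
        // framework name that is YarnTezClientProtocolProvider -> YARNRunner.
        conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
        Job job = Job.getInstance(conf);
        // ... configure the job as usual; job.submit() now runs through Tez.
      }
    }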

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
new file mode 100644
index 0000000..88816ca
--- /dev/null
+++ b/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -0,0 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.tez.mapreduce.client.YarnTezClientProtocolProvider

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tests/pom.xml b/tez-tests/pom.xml
new file mode 100644
index 0000000..b3c247a
--- /dev/null
+++ b/tez-tests/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-tests</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce-examples</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
new file mode 100644
index 0000000..f98f392
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
@@ -0,0 +1,359 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.RandomTextWriterJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.mapreduce.examples.MRRSleepJob;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.test.MiniTezCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMRRJobs {
+
+  private static final Log LOG = LogFactory.getLog(TestMRRJobs.class);
+
+  protected static MiniTezCluster mrrTezCluster;
+  protected static MiniDFSCluster dfsCluster;
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+
+  private static String TEST_ROOT_DIR = "target"
+      + Path.SEPARATOR + TestMRRJobs.class.getName() + "-tmpDir";
+
+  private static final String OUTPUT_ROOT_DIR = "/tmp" + Path.SEPARATOR +
+      TestMRRJobs.class.getSimpleName();
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    if (mrrTezCluster == null) {
+      mrrTezCluster = new MiniTezCluster(TestMRRJobs.class.getName(), 1,
+          1, 1);
+      Configuration conf = new Configuration();
+      conf.set("fs.defaultFS", remoteFs.getUri().toString());   // use HDFS
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
+      conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0L);
+      mrrTezCluster.init(conf);
+      mrrTezCluster.start();
+    }
+
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+
+  @Test (timeout = 60000)
+  public void testMRRSleepJob() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    LOG.info("\n\n\nStarting testMRRSleepJob().");
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+                      " but didn't Match Job ID " + jobId ,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO use dag client to test counters and task progress?
+    // what about completed jobs?
+  }
+
+  @Test (timeout = 60000)
+  public void testRandomWriter() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testRandomWriter().");
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
+    mrrTezCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
+    mrrTezCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
+    Job job = randomWriterJob.createJob(mrrTezCluster.getConfig());
+    Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
+    FileOutputFormat.setOutputPath(job, outputDir);
+    job.setSpeculativeExecution(false);
+    job.setJarByClass(RandomTextWriterJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+                      " but didn't Match Job ID " + jobId ,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+
+    // Make sure there are three files in the output-dir
+
+    RemoteIterator<FileStatus> iterator =
+        FileContext.getFileContext(mrrTezCluster.getConfig()).listStatus(
+            outputDir);
+    int count = 0;
+    while (iterator.hasNext()) {
+      FileStatus file = iterator.next();
+      if (!file.getPath().getName()
+          .equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
+        count++;
+      }
+    }
+    Assert.assertEquals("Number of part files is wrong!", 3, count);
+
+  }
+
+
+  @Test (timeout = 60000)
+  public void testFailingJob() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testFailingJob().");
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.getConfiguration().setBoolean(MRRSleepJob.MAP_FATAL_ERROR, true);
+    job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "*");
+
+    job.submit();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertFalse(succeeded);
+    Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO verify failed task diagnostics
+  }
+
+  @Test (timeout = 60000)
+  public void testFailingAttempt() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testFailingAttempt().");
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(3); // allow a retry after the injected attempt failure
+    job.getConfiguration().setBoolean(MRRSleepJob.MAP_THROW_ERROR, true);
+    job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "0");
+
+    job.submit();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO verify failed task diagnostics
+  }
+
+  @Test (timeout = 60000)
+  public void testMRRSleepJobWithCompression() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    LOG.info("\n\n\nStarting testMRRSleepJobWithCompression().");
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 2, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+
+    // enable compression
+    job.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+    job.getConfiguration().set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
+        DefaultCodec.class.getName());
+
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+                      " but didn't Match Job ID " + jobId ,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO use dag client to test counters and task progress?
+    // what about completed jobs?
+
+  }
+
+
+  /*
+  //@Test (timeout = 60000)
+  public void testMRRSleepJobWithSecurityOn() throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testMRRSleepJobWithSecurityOn().");
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      return;
+    }
+
+    mrrTezCluster.getConfig().set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    mrrTezCluster.getConfig().set(YarnConfiguration.RM_KEYTAB, "/etc/krb5.keytab");
+    mrrTezCluster.getConfig().set(YarnConfiguration.NM_KEYTAB, "/etc/krb5.keytab");
+    mrrTezCluster.getConfig().set(YarnConfiguration.RM_PRINCIPAL,
+        "rm/sightbusy-lx@LOCALHOST");
+    mrrTezCluster.getConfig().set(YarnConfiguration.NM_PRINCIPAL,
+        "nm/sightbusy-lx@LOCALHOST");
+
+    UserGroupInformation.setConfiguration(mrrTezCluster.getConfig());
+
+    // Keep it in here instead of after RM/NM as multiple user logins happen in
+    // the same JVM.
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
+    LOG.info("User name is " + user.getUserName());
+    for (Token<? extends TokenIdentifier> str : user.getTokens()) {
+      LOG.info("Token is " + str.encodeToUrlString());
+    }
+    user.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        MRRSleepJob sleepJob = new MRRSleepJob();
+        sleepJob.setConf(mrrTezCluster.getConfig());
+        Job job = sleepJob.createJob(3, 0, 10000, 1, 0, 0);
+        // //Job with reduces
+        // Job job = sleepJob.createJob(3, 2, 10000, 1, 10000, 1);
+        job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+        job.submit();
+        String trackingUrl = job.getTrackingURL();
+        String jobId = job.getJobID().toString();
+        job.waitForCompletion(true);
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+        Assert.assertTrue("Tracking URL was " + trackingUrl +
+                          " but didn't Match Job ID " + jobId ,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+        return null;
+      }
+    });
+
+    // TODO later:  add explicit "isUber()" checks of some sort
+  }
+  */
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
new file mode 100644
index 0000000..1bc7b4d
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -0,0 +1,533 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.DAGStatus.State;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.examples.MRRSleepJob;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepInputFormat;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepMapper;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepReducer;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.test.MiniTezCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMRRJobsDAGApi {
+
+  private static final Log LOG = LogFactory.getLog(TestMRRJobsDAGApi.class);
+
+  protected static MiniTezCluster mrrTezCluster;
+  protected static MiniDFSCluster dfsCluster;
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestMRRJobsDAGApi.class.getName() + "-tmpDir";
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+          .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+    
+    if (mrrTezCluster == null) {
+      mrrTezCluster = new MiniTezCluster(TestMRRJobsDAGApi.class.getName(),
+          1, 1, 1);
+      Configuration conf = new Configuration();
+      conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      mrrTezCluster.init(conf);
+      mrrTezCluster.start();
+    }
+
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+    // TODO Add cleanup code.
+  }
+
+  // Submits a simple three-stage (map, intermediate reduce, reduce) sleep job
+  // using the DAG submit API instead of the job client.
+  @Test(timeout = 60000)
+  public void testMRRSleepJobDagSubmit() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    State finalState = testMRRSleepJobDagSubmitCore(false, false, false, false);
+
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    // TODO Add additional checks for tracking URL etc. - once it's exposed by
+    // the DAG API.
+  }
+
+  // Submits a simple three-stage sleep job using the DAG submit API, then kills it.
+  @Test(timeout = 60000)
+  public void testMRRSleepJobDagSubmitAndKill() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    State finalState = testMRRSleepJobDagSubmitCore(false, true, false, false);
+
+    Assert.assertEquals(DAGStatus.State.KILLED, finalState);
+    // TODO Add additional checks for tracking URL etc. - once it's exposed by
+    // the DAG API.
+  }
+
+  // Submits a DAG to AM via RPC after AM has started
+  @Test(timeout = 60000)
+  public void testMRRSleepJobViaSession() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    State finalState = testMRRSleepJobDagSubmitCore(true, false, false, false);
+
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+  }
+
+  // Submits multiple DAGs to the AM via RPC within a single Tez session.
+  @Test(timeout = 120000)
+  public void testMultipleMRRSleepJobViaSession() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    Map<String, String> commonEnv = createCommonEnv();
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
+        .valueOf(new Random().nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    TezConfiguration tezConf = new TezConfiguration(
+        mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+
+    Map<String, LocalResource> amLocalResources =
+        new HashMap<String, LocalResource>();
+
+    AMConfiguration amConfig = new AMConfiguration(
+        "default", commonEnv, amLocalResources,
+        tezConf, null);
+    TezSessionConfiguration tezSessionConfig =
+        new TezSessionConfiguration(amConfig, tezConf);
+    TezSession tezSession = new TezSession("testsession", tezSessionConfig);
+    tezSession.start();
+    Assert.assertEquals(TezSessionStatus.INITIALIZING,
+        tezSession.getSessionStatus());
+
+    State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+        tezSession, false);
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    Assert.assertEquals(TezSessionStatus.READY,
+        tezSession.getSessionStatus());
+    finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+        tezSession, false);
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    Assert.assertEquals(TezSessionStatus.READY,
+        tezSession.getSessionStatus());
+
+    ApplicationId appId = tezSession.getApplicationId();
+    tezSession.stop();
+    Assert.assertEquals(TezSessionStatus.SHUTDOWN,
+        tezSession.getSessionStatus());
+
+    YarnClient yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(mrrTezCluster.getConfig());
+    yarnClient.start();
+
+    while (true) {
+      Thread.sleep(500l);
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (appReport.getYarnApplicationState().equals(
+          YarnApplicationState.FINISHED)
+          || appReport.getYarnApplicationState().equals(
+              YarnApplicationState.FAILED)
+          || appReport.getYarnApplicationState().equals(
+              YarnApplicationState.KILLED)) {
+        break;
+      }
+    }
+
+    ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+    Assert.assertEquals(YarnApplicationState.FINISHED,
+        appReport.getYarnApplicationState());
+    Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+        appReport.getFinalApplicationStatus());
+    yarnClient.stop();
+  }
+
+  // Submits a simple three-stage sleep job via a Tez session, then kills it.
+  @Test(timeout = 60000)
+  public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    State finalState = testMRRSleepJobDagSubmitCore(true, true, false, false);
+
+    Assert.assertEquals(DAGStatus.State.KILLED, finalState);
+    // TODO Add additional checks for tracking URL etc. - once it's exposed by
+    // the DAG API.
+  }
+
+  // Creates and closes a Tez session without submitting a DAG.
+  @Test(timeout = 60000)
+  public void testTezSessionShutdown() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    testMRRSleepJobDagSubmitCore(true, false, true, false);
+  }
+
+  @Test(timeout = 60000)
+  public void testAMSplitGeneration() throws IOException, InterruptedException,
+      TezException, ClassNotFoundException, YarnException {
+    testMRRSleepJobDagSubmitCore(true, false, false, true);
+  }
+
+  public State testMRRSleepJobDagSubmitCore(
+      boolean dagViaRPC,
+      boolean killDagWhileRunning,
+      boolean closeSessionBeforeSubmit,
+      boolean genSplitsInAM) throws IOException,
+      InterruptedException, TezException, ClassNotFoundException,
+      YarnException {
+    return testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning,
+        closeSessionBeforeSubmit, null, genSplitsInAM);
+  }
+
+  private Map<String, String> createCommonEnv() {
+    Map<String, String> commonEnv = new HashMap<String, String>();
+    return commonEnv;
+  }
+
+  public State testMRRSleepJobDagSubmitCore(
+      boolean dagViaRPC,
+      boolean killDagWhileRunning,
+      boolean closeSessionBeforeSubmit,
+      TezSession reUseTezSession,
+      boolean genSplitsInAM) throws IOException,
+      InterruptedException, TezException, ClassNotFoundException,
+      YarnException {
+    LOG.info("\n\n\nStarting testMRRSleepJobDagSubmit().");
+
+    JobConf stage1Conf = new JobConf(mrrTezCluster.getConfig());
+    JobConf stage2Conf = new JobConf(mrrTezCluster.getConfig());
+    JobConf stage3Conf = new JobConf(mrrTezCluster.getConfig());
+
+    stage1Conf.setLong(MRRSleepJob.MAP_SLEEP_TIME, 1);
+    stage1Conf.setInt(MRRSleepJob.MAP_SLEEP_COUNT, 1);
+    stage1Conf.setInt(MRJobConfig.NUM_MAPS, 1);
+    stage1Conf.set(MRJobConfig.MAP_CLASS_ATTR, SleepMapper.class.getName());
+    stage1Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        IntWritable.class.getName());
+    stage1Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    stage1Conf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+        SleepInputFormat.class.getName());
+    stage1Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
+        MRRSleepJobPartitioner.class.getName());
+
+    stage2Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
+    stage2Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
+    stage2Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    stage2Conf
+        .set(MRJobConfig.REDUCE_CLASS_ATTR, ISleepReducer.class.getName());
+    stage2Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        IntWritable.class.getName());
+    stage2Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    stage2Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
+        MRRSleepJobPartitioner.class.getName());
+
+    JobConf stage22Conf = new JobConf(stage2Conf);
+    stage22Conf.setInt(MRJobConfig.NUM_REDUCES, 2);
+
+    stage3Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
+    stage3Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
+    stage3Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    stage3Conf.set(MRJobConfig.REDUCE_CLASS_ATTR, SleepReducer.class.getName());
+    stage3Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        IntWritable.class.getName());
+    stage3Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    stage3Conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
+        NullOutputFormat.class.getName());
+
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage2Conf,
+        stage1Conf);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage22Conf,
+        stage1Conf);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage3Conf,
+        stage2Conf); // this also works for stage22 since it sets up the same keys etc.
+
+    MRHelpers.doJobClientMagic(stage1Conf);
+    MRHelpers.doJobClientMagic(stage2Conf);
+    MRHelpers.doJobClientMagic(stage22Conf);
+    MRHelpers.doJobClientMagic(stage3Conf);
+
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
+        .valueOf(new Random().nextInt(100000))));
+    TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
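+    // Generate input splits on the client unless the AM-side split generator is used.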
+    InputSplitInfo inputSplitInfo = null;
+    if (!genSplitsInAM) {
+      inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf,
+        remoteStagingDir);
+    }
+
+    byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+    byte[] stage1InputPayload = MRHelpers.createMRInputPayload(stage1Payload, null);
+    byte[] stage3Payload = MRHelpers.createUserPayloadFromConf(stage3Conf);
+    
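+    // Assemble a three-vertex DAG: map -> intermediate reduce -> final reduce.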
+    DAG dag = new DAG("testMRRSleepJobDagSubmit");
+    int stage1NumTasks = genSplitsInAM ? -1 : inputSplitInfo.getNumTasks();
+    Class<? extends TezRootInputInitializer> inputInitializerClazz = genSplitsInAM ? MRInputAMSplitGenerator.class
+        : null;
+    Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
+        MapProcessor.class.getName()).setUserPayload(stage1Payload),
+        stage1NumTasks, Resource.newInstance(256, 1));
+    MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, inputInitializerClazz);
+    Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
+        ReduceProcessor.class.getName()).setUserPayload(
+        MRHelpers.createUserPayloadFromConf(stage2Conf)),
+        1, Resource.newInstance(256, 1));
+    Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
+        ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
+        1, Resource.newInstance(256, 1));
+    MRHelpers.addMROutput(stage3Vertex, stage3Payload);
+
+    Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
+    Map<String, String> commonEnv = createCommonEnv();
+
+    if (!genSplitsInAM) {
+      // TODO Use utility method post TEZ-205.
+      Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
+      stage1LocalResources.put(
+          inputSplitInfo.getSplitsFile().getName(),
+          createLocalResource(remoteFs, inputSplitInfo.getSplitsFile(),
+              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+      stage1LocalResources.put(
+          inputSplitInfo.getSplitsMetaInfoFile().getName(),
+          createLocalResource(remoteFs, inputSplitInfo.getSplitsMetaInfoFile(),
+              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+      stage1LocalResources.putAll(commonLocalResources);
+
+      stage1Vertex.setTaskLocalResources(stage1LocalResources);
+      stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+    } else {
+      stage1Vertex.setTaskLocalResources(commonLocalResources);
+    }
+
+    stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf));
+    stage1Vertex.setTaskEnvironment(commonEnv);
+
+    // TODO env, resources
+
+    stage2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage2Conf));
+    stage2Vertex.setTaskLocalResources(commonLocalResources);
+    stage2Vertex.setTaskEnvironment(commonEnv);
+
+    stage3Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage3Conf));
+    stage3Vertex.setTaskLocalResources(commonLocalResources);
+    stage3Vertex.setTaskEnvironment(commonEnv);
+
+    dag.addVertex(stage1Vertex);
+    dag.addVertex(stage2Vertex);
+    dag.addVertex(stage3Vertex);
+
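+    // Wire the vertices together with scatter-gather (shuffle) edges.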
+    Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
+        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL, new OutputDescriptor(
+        OnFileSortedOutput.class.getName()), new InputDescriptor(
+                ShuffledMergedInputLegacy.class.getName())));
+    Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
+        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL, new OutputDescriptor(
+        OnFileSortedOutput.class.getName()), new InputDescriptor(
+                ShuffledMergedInputLegacy.class.getName())));
+
+    dag.addEdge(edge1);
+    dag.addEdge(edge2);
+
+    Map<String, LocalResource> amLocalResources =
+        new HashMap<String, LocalResource>();
+    amLocalResources.putAll(commonLocalResources);
+
+    TezConfiguration tezConf = new TezConfiguration(
+            mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+
+    TezClient tezClient = new TezClient(tezConf);
+    DAGClient dagClient = null;
+    TezSession tezSession = null;
+    boolean reuseSession = reUseTezSession != null;
+    TezSessionConfiguration tezSessionConfig;
+    AMConfiguration amConfig = new AMConfiguration(
+        "default", commonEnv, amLocalResources,
+        tezConf, null);
+    if(!dagViaRPC) {
+      // TODO Use utility method post TEZ-205 to figure out AM arguments etc.
+      dagClient = tezClient.submitDAGApplication(dag, amConfig);
+    } else {
+      if (reuseSession) {
+        tezSession = reUseTezSession;
+      } else {
+        tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
+        tezSession = new TezSession("testsession", tezSessionConfig);
+        tezSession.start();
+      }
+    }
+
+    if (dagViaRPC && closeSessionBeforeSubmit) {
+      YarnClient yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(mrrTezCluster.getConfig());
+      yarnClient.start();
+      boolean sentKillSession = false;
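+      // Poll the RM: once the AM is RUNNING, stop the session, then verify
+      // that the application finishes in the FINISHED/SUCCEEDED state.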
+      while(true) {
+        Thread.sleep(500l);
+        ApplicationReport appReport =
+            yarnClient.getApplicationReport(tezSession.getApplicationId());
+        if (appReport == null) {
+          continue;
+        }
+        YarnApplicationState appState = appReport.getYarnApplicationState();
+        if (!sentKillSession) {
+          if (appState == YarnApplicationState.RUNNING) {
+            tezSession.stop();
+            sentKillSession = true;
+          }
+        } else {
+          if (appState == YarnApplicationState.FINISHED
+              || appState == YarnApplicationState.KILLED
+              || appState == YarnApplicationState.FAILED) {
+            LOG.info("Application completed after sending session shutdown"
+                + ", yarnApplicationState=" + appState
+                + ", finalAppStatus=" + appReport.getFinalApplicationStatus());
+            Assert.assertEquals(YarnApplicationState.FINISHED,
+                appState);
+            Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+                appReport.getFinalApplicationStatus());
+            break;
+          }
+        }
+      }
+      yarnClient.stop();
+      return null;
+    }
+
+    if(dagViaRPC) {
+      LOG.info("Submitting dag to tez session with appId="
+          + tezSession.getApplicationId());
+      dagClient = tezSession.submitDAG(dag);
+      Assert.assertEquals(TezSessionStatus.RUNNING,
+          tezSession.getSessionStatus());
+    }
+    DAGStatus dagStatus = dagClient.getDAGStatus();
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms."
+          + " Current state: " + dagStatus.getState());
+      Thread.sleep(500l);
+      if(killDagWhileRunning
+          && dagStatus.getState() == DAGStatus.State.RUNNING) {
+        LOG.info("Killing running dag/session");
+        if (dagViaRPC) {
+          tezSession.stop();
+        } else {
+          dagClient.tryKillDAG();
+        }
+      }
+      dagStatus = dagClient.getDAGStatus();
+    }
+    if (dagViaRPC && !reuseSession) {
+      tezSession.stop();
+    }
+    return dagStatus.getState();
+  }
+
+  private static LocalResource createLocalResource(FileSystem fc, Path file,
+      LocalResourceType type, LocalResourceVisibility visibility)
+      throws IOException {
+    FileStatus fstat = fc.getFileStatus(file);
+    URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
+        .getPath()));
+    long resourceSize = fstat.getLen();
+    long resourceModificationTime = fstat.getModificationTime();
+
+    return LocalResource.newInstance(resourceURL, type, visibility,
+        resourceSize, resourceModificationTime);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
new file mode 100644
index 0000000..8926ed8
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
@@ -0,0 +1,172 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Configures and starts the Tez-specific components in the YARN cluster.
+ *
+ * When using this mini cluster, the user is expected to configure jobs and
+ * clients with the cluster's configuration, available via {@link #getConfig()}
+ * once the cluster has been started.
+ *
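+ * A minimal usage sketch (illustrative only; the test name below is an
+ * assumption, not part of this patch):
+ * <pre>
+ *   MiniTezCluster cluster = new MiniTezCluster("MyTest", 1, 1, 1);
+ *   Configuration conf = new Configuration();
+ *   cluster.init(conf);   // serviceInit sets up staging dirs, shuffle, etc.
+ *   cluster.start();      // serviceStart writes out the cluster config
+ *   Configuration clusterConf = cluster.getConfig();
+ *   // ... submit Tez DAGs / MR jobs against clusterConf ...
+ *   cluster.stop();
+ * </pre>
+ */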
+public class MiniTezCluster extends MiniYARNCluster {
+
+  public static final String APPJAR = JarFinder.getJar(DAGAppMaster.class);
+
+  private static final Log LOG = LogFactory.getLog(MiniTezCluster.class);
+
+  private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml";
+
+  private Path confFilePath;
+
+  public MiniTezCluster(String testName) {
+    this(testName, 1);
+  }
+
+  public MiniTezCluster(String testName, int noOfNMs) {
+    super(testName, noOfNMs, 4, 4);
+  }
+
+  public MiniTezCluster(String testName, int noOfNMs,
+      int numLocalDirs, int numLogDirs)  {
+    super(testName, noOfNMs, numLocalDirs, numLogDirs);
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
+    if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+          "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());
+    }
+    
+    if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) {
+      // nothing defined. set quick delete value
+      conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
+    }
+
+    File appJarFile = new File(MiniTezCluster.APPJAR);
+
+    if (!appJarFile.exists()) {
+      String message = "TezAppJar " + MiniTezCluster.APPJAR
+          + " not found. Exiting.";
+      LOG.info(message);
+      throw new TezUncheckedException(message);
+    } else {
+      conf.set(TezConfiguration.TEZ_LIB_URIS, "file://" + appJarFile.getAbsolutePath());
+      LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS));
+    }
+
+    // Disable both PMEM and VMEM monitoring for the mini cluster.
+    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
+
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");
+
+    try {
+      Path stagingPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
+      FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
+      if (fc.util().exists(stagingPath)) {
+        LOG.info(stagingPath + " exists! deleting...");
+        fc.delete(stagingPath, true);
+      }
+      LOG.info("mkdir: " + stagingPath);
+      fc.mkdir(stagingPath, null, true);
+
+      //mkdir done directory as well
+      String doneDir =
+          JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+      Path doneDirPath = fc.makeQualified(new Path(doneDir));
+      fc.mkdir(doneDirPath, null, true);
+    } catch (IOException e) {
+      throw new TezUncheckedException("Could not create staging directory. ", e);
+    }
+    conf.set(MRConfig.MASTER_ADDRESS, "test");
+
+    //configure the shuffle service in NM
+    conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+        new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
+    conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+        ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
+        Service.class);
+
+    // Use an ephemeral shuffle port; 0 lets the OS pick a free port.
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+
+    conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
+        DefaultContainerExecutor.class, ContainerExecutor.class);
+
+    // Uber mode is a MapReduce AM optimization that does not apply to Tez,
+    // so disable it explicitly.
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    super.serviceStart();
+    File workDir = super.getTestWorkDir();
+    Configuration conf = super.getConfig();
+
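+    // Persist the effective cluster configuration so client processes can load it.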
+    confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG);
+    File confFile = new File(confFilePath.toString());
+    try {
+      confFile.createNewFile();
+      conf.writeXml(new FileOutputStream(confFile));
+      confFile.deleteOnExit();
+    } catch (IOException e) {
+      throw new RuntimeException("Could not write " + YARN_CLUSTER_CONFIG, e);
+    }
+    confFilePath = new Path(confFile.getAbsolutePath());
+    conf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        workDir.getAbsolutePath(), System.getProperty("java.class.path"));
+    LOG.info("Setting yarn-site.xml via YARN-APP-CP at: "
+        + conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH));
+  }
+
+  public Path getConfigFilePath() {
+    return confFilePath;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-yarn-client/findbugs-exclude.xml b/tez-yarn-client/findbugs-exclude.xml
deleted file mode 100644
index 5b11308..0000000
--- a/tez-yarn-client/findbugs-exclude.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-<FindBugsFilter>
-
-</FindBugsFilter>