Posted to commits@tez.apache.org by bi...@apache.org on 2013/10/26 02:14:04 UTC

[1/3] TEZ-581. Rename MiniMRRTezCluster to MiniTezCluster. Move YARNRunner to tez-mapreduce project (bikas)

Updated Branches:
  refs/heads/master 1c44f634c -> 360d30a9f


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/pom.xml
----------------------------------------------------------------------
diff --git a/tez-yarn-client/pom.xml b/tez-yarn-client/pom.xml
deleted file mode 100644
index 5b9278b..0000000
--- a/tez-yarn-client/pom.xml
+++ /dev/null
@@ -1,72 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.tez</groupId>
-    <artifactId>tez</artifactId>
-    <version>0.2.0-SNAPSHOT</version>
-  </parent>
-  <artifactId>tez-yarn-client</artifactId>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-client</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.tez</groupId>
-      <artifactId>tez-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.tez</groupId>
-      <artifactId>tez-mapreduce</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.rat</groupId>
-        <artifactId>apache-rat-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
deleted file mode 100644
index 142fa5d..0000000
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.JobID;
-
-public class ClientCache {
-
-  private final Configuration conf;
-  private final ResourceMgrDelegate rm;
-
-  private Map<JobID, ClientServiceDelegate> cache = 
-      new HashMap<JobID, ClientServiceDelegate>();
-
-  public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
-    this.conf = conf;
-    this.rm = rm;
-  }
-
-  //TODO: evict from the cache on some threshold
-  public synchronized ClientServiceDelegate getClient(JobID jobId) {
-    ClientServiceDelegate client = cache.get(jobId);
-    if (client == null) {
-      client = new ClientServiceDelegate(conf, rm, jobId);
-      cache.put(jobId, client);
-    }
-    return client;
-  }
-
-}
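
For reference, the ClientCache removed above memoizes one ClientServiceDelegate per JobID and creates delegates lazily. A minimal usage sketch, assuming it sits in the same org.apache.tez.mapreduce package so the classes above resolve directly; the job id value is hypothetical and the ResourceMgrDelegate needs a reachable RM configuration to do anything useful:

    package org.apache.tez.mapreduce;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class ClientCacheSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        ResourceMgrDelegate rm = new ResourceMgrDelegate(new YarnConfiguration(conf));
        ClientCache cache = new ClientCache(conf, rm);
        JobID jobId = JobID.forName("job_1382724000000_0001"); // hypothetical id
        // getClient is synchronized and caching: both calls return the same delegate.
        ClientServiceDelegate first = cache.getClient(jobId);
        ClientServiceDelegate again = cache.getClient(jobId);
        System.out.println(first == again); // true
      }
    }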

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
deleted file mode 100644
index 44b2734..0000000
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.v2.LogParams;
-import org.apache.hadoop.mapreduce.TaskCompletionEvent;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.tez.dag.api.TezConfiguration;
-
-import java.io.IOException;
-
-public class ClientServiceDelegate {
-
-  private final TezConfiguration conf;
-
-  // FIXME
-  // how to handle completed jobs that the RM does not know about?
-
-  public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
-      JobID jobId) {
-    this.conf = new TezConfiguration(conf); // Cloning for modifying.
-    // For faster redirects from AM to HS.
-    this.conf.setInt(
-        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
-        this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
-            MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
-  }
-
-  public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID jobId)
-      throws IOException, InterruptedException {
-    // FIXME needs counters support from DAG
-    // with a translation layer on client side
-    org.apache.hadoop.mapreduce.Counters empty =
-        new org.apache.hadoop.mapreduce.Counters();
-    return empty;
-  }
-
-  public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobId,
-      int fromEventId, int maxEvents)
-      throws IOException, InterruptedException {
-    // FIXME seems like there is support in client to query task failure
-    // related information
-    // However, api does not make sense for DAG
-    return new TaskCompletionEvent[0];
-  }
-
-  public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID
-      taId)
-      throws IOException, InterruptedException {
-    // FIXME need support to query task diagnostics?
-    return new String[0];
-  }
-  
-  public JobStatus getJobStatus(JobID oldJobID) throws IOException {
-    // handled in YARNRunner
-    throw new UnsupportedOperationException();
-  }
-
-  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
-      JobID oldJobID, TaskType taskType)
-       throws IOException{
-    // TEZ-146: need to return real task reports
-    return new org.apache.hadoop.mapreduce.TaskReport[0];
-  }
-
-  public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
-       throws IOException {
-    // FIXME need support to kill a task attempt?
-    throw new UnsupportedOperationException();
-  }
-
-  public boolean killJob(JobID oldJobID)
-       throws IOException {
-    // FIXME need support to kill a dag?
-    // Should this be just an RM killApplication?
-    // For one dag per AM, RM kill should suffice
-    throw new UnsupportedOperationException();
-  }
-
-  public LogParams getLogFilePath(JobID oldJobID,
-      TaskAttemptID oldTaskAttemptID)
-      throws YarnException, IOException {
-    // FIXME logs for an attempt?
-    throw new UnsupportedOperationException();
-  }  
-}
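
Note the constructor pattern above: the incoming Configuration is cloned into a TezConfiguration before the IPC retry count is overridden, so the override never leaks back to the caller. A compressed, standalone illustration of that clone-then-modify idiom using only Configuration (the key string is the real IPC_CLIENT_CONNECT_MAX_RETRIES_KEY value; the retry numbers are made up):

    import org.apache.hadoop.conf.Configuration;

    public class CloneConfSketch {
      public static void main(String[] args) {
        Configuration original = new Configuration();
        original.setInt("ipc.client.connect.max.retries", 10);
        // Copy-construct before modifying, so the caller's conf stays untouched.
        Configuration cloned = new Configuration(original);
        cloned.setInt("ipc.client.connect.max.retries", 3);
        System.out.println(original.getInt("ipc.client.connect.max.retries", -1)); // 10
        System.out.println(cloned.getInt("ipc.client.connect.max.retries", -1));   // 3
      }
    }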

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
deleted file mode 100644
index 0b768c0..0000000
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.mapreduce.JobACL;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobPriority;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.Progress;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-
-import com.google.common.base.Joiner;
-
-public class DAGJobStatus extends JobStatus {
-
-  private final String jobFile;
-  private final DAGStatus dagStatus;
-  private final ApplicationReport report;
-
-  public DAGJobStatus(ApplicationReport report, DAGStatus dagStatus, String jobFile) {
-    super();
-    this.dagStatus = dagStatus;
-    this.jobFile = jobFile;
-    this.report = report;
-  }
-
-  @Override
-  protected synchronized void setMapProgress(float p) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setCleanupProgress(float p) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setSetupProgress(float p) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setReduceProgress(float p) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setPriority(JobPriority jp) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setFinishTime(long finishTime) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setHistoryFile(String historyFile) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setTrackingUrl(String trackingUrl) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setRetired() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setState(State state) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setStartTime(long startTime) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setUsername(String userName) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setSchedulingInfo(String schedulingInfo) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setJobACLs(Map<JobACL, AccessControlList> acls) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setQueue(String queue) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected synchronized void setFailureInfo(String failureInfo) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public synchronized String getQueue() {
-    return report.getQueue();
-  }
-
-  @Override
-  public synchronized float getMapProgress() {
-    if(dagStatus.getVertexProgress() != null) {
-      return getProgress(MultiStageMRConfigUtil.getInitialMapVertexName());
-    }
-    if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
-      return 1.0f;
-    }
-    return 0.0f;
-  }
-
-  @Override
-  public synchronized float getCleanupProgress() {
-    if (dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
-        dagStatus.getState() == DAGStatus.State.FAILED ||
-        dagStatus.getState() == DAGStatus.State.KILLED ||
-        dagStatus.getState() == DAGStatus.State.ERROR) {
-      return 1.0f;
-    }
-    return 0.0f;
-  }
-
-  @Override
-  public synchronized float getSetupProgress() {
-    if (dagStatus.getState() == DAGStatus.State.RUNNING) {
-      return 1.0f;
-    }
-    return 0.0f;
-  }
-
-  @Override
-  public synchronized float getReduceProgress() {
-    if(dagStatus.getVertexProgress() != null) {
-      return getProgress(MultiStageMRConfigUtil.getFinalReduceVertexName());
-    }
-    if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
-      return 1.0f;
-    }
-    return 0.0f;
-  }
-
-  @Override
-  public synchronized State getState() {
-    switch (dagStatus.getState()) {
-    case SUBMITTED:
-    case INITING:
-      return State.PREP;
-    case RUNNING:
-      return State.RUNNING;
-    case SUCCEEDED:
-      return State.SUCCEEDED;
-    case KILLED:
-      return State.KILLED;
-    case FAILED:
-    case ERROR:
-      return State.FAILED;
-    default:
-      throw new TezUncheckedException("Unknown value of DAGStatus.State: "
-          + dagStatus.getState());
-    }
-  }
-
-  @Override
-  public synchronized long getStartTime() {
-    return report.getStartTime();
-  }
-
-  @Override
-  public JobID getJobID() {
-    return TypeConverter.fromYarn(report.getApplicationId());
-  }
-
-  @Override
-  public synchronized String getUsername() {
-    return report.getUser();
-  }
-
-  @Override
-  public synchronized String getSchedulingInfo() {
-    return report.getTrackingUrl();
-  }
-
-  @Override
-  public synchronized Map<JobACL, AccessControlList> getJobACLs() {
-    // TODO Auto-generated method stub
-    return super.getJobACLs();
-  }
-
-  @Override
-  public synchronized JobPriority getPriority() {
-    // TEZ-147: return real priority
-    return JobPriority.NORMAL;
-  }
-
-  @Override
-  public synchronized String getFailureInfo() {
-    return Joiner.on(". ").join(dagStatus.getDiagnostics());
-  }
-
-  @Override
-  public synchronized boolean isJobComplete() {
-    return (dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
-        dagStatus.getState() == DAGStatus.State.FAILED ||
-        dagStatus.getState() == DAGStatus.State.KILLED ||
-        dagStatus.getState() == DAGStatus.State.ERROR);
-  }
-
-  @Override
-  public synchronized void write(DataOutput out) throws IOException {
-    // FIXME
-  }
-
-  @Override
-  public synchronized void readFields(DataInput in) throws IOException {
-    // FIXME
-  }
-
-  @Override
-  public String getJobName() {
-    return report.getName();
-  }
-
-  @Override
-  public String getJobFile() {
-    return jobFile;
-  }
-
-  @Override
-  public synchronized String getTrackingUrl() {
-    return report.getTrackingUrl();
-  }
-
-  @Override
-  public synchronized long getFinishTime() {
-    return report.getFinishTime();
-  }
-
-  @Override
-  public synchronized boolean isRetired() {
-    // FIXME handle retired jobs?
-    return false;
-  }
-
-  @Override
-  public synchronized String getHistoryFile() {
-    // FIXME handle history in status
-    return null;
-  }
-
-  @Override
-  public int getNumUsedSlots() {
-    return report.getApplicationResourceUsageReport().getNumUsedContainers();
-  }
-
-  @Override
-  public void setNumUsedSlots(int n) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int getNumReservedSlots() {
-    return report.getApplicationResourceUsageReport().
-        getNumReservedContainers();
-  }
-
-  @Override
-  public void setNumReservedSlots(int n) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int getUsedMem() {
-    return report.getApplicationResourceUsageReport().
-        getUsedResources().getMemory();
-  }
-
-  @Override
-  public void setUsedMem(int m) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int getReservedMem() {
-    return report.getApplicationResourceUsageReport().
-        getReservedResources().getMemory();
-  }
-
-  @Override
-  public void setReservedMem(int r) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int getNeededMem() {
-    return report.getApplicationResourceUsageReport().
-        getNeededResources().getMemory();
-  }
-
-  @Override
-  public void setNeededMem(int n) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public synchronized boolean isUber() {
-    return false;
-  }
-
-  @Override
-  public synchronized void setUber(boolean isUber) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String toString() {
-    StringBuffer buffer = new StringBuffer();
-    buffer.append("job-id : " + getJobID());
-    buffer.append("uber-mode : " + isUber());
-    buffer.append("map-progress : " + getMapProgress());
-    buffer.append("reduce-progress : " + getReduceProgress());
-    buffer.append("cleanup-progress : " + getCleanupProgress());
-    buffer.append("setup-progress : " + getSetupProgress());
-    buffer.append("runstate : " + getState());
-    buffer.append("start-time : " + getStartTime());
-    buffer.append("user-name : " + getUsername());
-    buffer.append("priority : " + getPriority());
-    buffer.append("scheduling-info : " + getSchedulingInfo());
-    buffer.append("num-used-slots" + getNumUsedSlots());
-    buffer.append("num-reserved-slots" + getNumReservedSlots());
-    buffer.append("used-mem" + getUsedMem());
-    buffer.append("reserved-mem" + getReservedMem());
-    buffer.append("needed-mem" + getNeededMem());
-    return buffer.toString();
-  }
-
-  private float getProgress(String vertexName) {
-    Progress progress = dagStatus.getVertexProgress().get(vertexName);
-    if(progress == null) {
-      // no such stage. return 0 like MR app currently does.
-      return 0;
-    }
-    float totalTasks = (float) progress.getTotalTaskCount();
-    if(totalTasks != 0) {
-      return progress.getSucceededTaskCount()/totalTasks;
-    }
-    return 0;
-  }
-
-}
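
The progress accounting above bottoms out in getProgress: succeeded tasks over total tasks for the named vertex, with 0 when the vertex is absent or empty. A self-contained sketch of that arithmetic (vertex names and counts invented for illustration):

    import java.util.HashMap;
    import java.util.Map;

    public class VertexProgressSketch {
      // Mirrors DAGJobStatus.getProgress: succeeded / total, and 0 when the
      // vertex is missing or has no tasks.
      static float progress(Map<String, int[]> vertices, String name) {
        int[] counts = vertices.get(name); // {succeededTasks, totalTasks}
        if (counts == null || counts[1] == 0) {
          return 0f;
        }
        return counts[0] / (float) counts[1];
      }

      public static void main(String[] args) {
        Map<String, int[]> vertices = new HashMap<String, int[]>();
        vertices.put("initialmap", new int[] { 3, 4 });  // hypothetical counts
        vertices.put("finalreduce", new int[] { 0, 2 });
        System.out.println(progress(vertices, "initialmap"));  // 0.75
        System.out.println(progress(vertices, "finalreduce")); // 0.0
        System.out.println(progress(vertices, "missing"));     // 0.0
      }
    }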

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java
deleted file mode 100644
index e649990..0000000
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
-import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
-import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
-import org.apache.hadoop.mapreduce.v2.api.records.Counters;
-import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.records.JobState;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-
-public class NotRunningJob implements MRClientProtocol {
-
-  private RecordFactory recordFactory =
-    RecordFactoryProvider.getRecordFactory(null);
-
-  private final JobState jobState;
-  private final ApplicationReport applicationReport;
-
-
-  private ApplicationReport getUnknownApplicationReport() {
-    ApplicationId unknownAppId = recordFactory
-        .newRecordInstance(ApplicationId.class);
-    ApplicationAttemptId unknownAttemptId = recordFactory
-        .newRecordInstance(ApplicationAttemptId.class);
-
-    // Setting AppState to NEW and finalStatus to UNDEFINED as they are never
-    // used for a non running job
-    return ApplicationReport.newInstance(unknownAppId, unknownAttemptId, "N/A",
-        "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A", "N/A",
-        0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, "TEZ_MRR", null);
-  }
-
-  NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
-    this.applicationReport =
-        (applicationReport ==  null) ?
-            getUnknownApplicationReport() : applicationReport;
-    this.jobState = jobState;
-  }
-
-  @Override
-  public FailTaskAttemptResponse failTaskAttempt(
-      FailTaskAttemptRequest request) throws IOException {
-    FailTaskAttemptResponse resp =
-      recordFactory.newRecordInstance(FailTaskAttemptResponse.class);
-    return resp;
-  }
-
-  @Override
-  public GetCountersResponse getCounters(GetCountersRequest request)
-      throws IOException {
-    GetCountersResponse resp =
-      recordFactory.newRecordInstance(GetCountersResponse.class);
-    Counters counters = recordFactory.newRecordInstance(Counters.class);
-    counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
-    resp.setCounters(counters);
-    return resp;
-  }
-
-  @Override
-  public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
-      throws IOException {
-    GetDiagnosticsResponse resp =
-      recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
-    resp.addDiagnostics("");
-    return resp;
-  }
-
-  @Override
-  public GetJobReportResponse getJobReport(GetJobReportRequest request)
-      throws IOException {
-    JobReport jobReport =
-      recordFactory.newRecordInstance(JobReport.class);
-    jobReport.setJobId(request.getJobId());
-    jobReport.setJobState(jobState);
-    jobReport.setUser(applicationReport.getUser());
-    jobReport.setStartTime(applicationReport.getStartTime());
-    jobReport.setDiagnostics(applicationReport.getDiagnostics());
-    jobReport.setJobName(applicationReport.getName());
-    jobReport.setTrackingUrl(applicationReport.getTrackingUrl());
-    jobReport.setFinishTime(applicationReport.getFinishTime());
-
-    GetJobReportResponse resp =
-        recordFactory.newRecordInstance(GetJobReportResponse.class);
-    resp.setJobReport(jobReport);
-    return resp;
-  }
-
-  @Override
-  public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
-      GetTaskAttemptCompletionEventsRequest request)
-      throws IOException {
-    GetTaskAttemptCompletionEventsResponse resp =
-      recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
-    resp.addAllCompletionEvents(new ArrayList<TaskAttemptCompletionEvent>());
-    return resp;
-  }
-
-  @Override
-  public GetTaskAttemptReportResponse getTaskAttemptReport(
-      GetTaskAttemptReportRequest request) throws IOException {
-    //not invoked by anybody
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
-      throws IOException {
-    GetTaskReportResponse resp =
-      recordFactory.newRecordInstance(GetTaskReportResponse.class);
-    TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
-    report.setTaskId(request.getTaskId());
-    report.setTaskState(TaskState.NEW);
-    Counters counters = recordFactory.newRecordInstance(Counters.class);
-    counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
-    report.setCounters(counters);
-    report.addAllRunningAttempts(new ArrayList<TaskAttemptId>());
-    resp.setTaskReport(report); // attach the stub report to the response
-    return resp;
-  }
-
-  @Override
-  public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
-      throws IOException {
-    GetTaskReportsResponse resp =
-      recordFactory.newRecordInstance(GetTaskReportsResponse.class);
-    resp.addAllTaskReports(new ArrayList<TaskReport>());
-    return resp;
-  }
-
-  @Override
-  public KillJobResponse killJob(KillJobRequest request)
-      throws IOException {
-    KillJobResponse resp =
-      recordFactory.newRecordInstance(KillJobResponse.class);
-    return resp;
-  }
-
-  @Override
-  public KillTaskResponse killTask(KillTaskRequest request)
-      throws IOException {
-    KillTaskResponse resp =
-      recordFactory.newRecordInstance(KillTaskResponse.class);
-    return resp;
-  }
-
-  @Override
-  public KillTaskAttemptResponse killTaskAttempt(
-      KillTaskAttemptRequest request) throws IOException {
-    KillTaskAttemptResponse resp =
-      recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
-    return resp;
-  }
-
-  @Override
-  public GetDelegationTokenResponse getDelegationToken(
-      GetDelegationTokenRequest request) throws IOException {
-    /* Should not be invoked by anyone. */
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public RenewDelegationTokenResponse renewDelegationToken(
-      RenewDelegationTokenRequest request) throws IOException {
-    /* Should not be invoked by anyone. */
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public CancelDelegationTokenResponse cancelDelegationToken(
-      CancelDelegationTokenRequest request) throws IOException {
-    /* Should not be invoked by anyone. */
-    throw new NotImplementedException();
-  }
-
-  @Override
-  public InetSocketAddress getConnectAddress() {
-    /* Should not be invoked by anyone.  Normally used to set token service */
-    throw new NotImplementedException();
-  }
-}
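
NotRunningJob is a null object: when the RM no longer knows an application (or never did), the client substitutes this stub so callers get canned, empty answers instead of nulls or exceptions. The same idea in miniature (all names invented):

    import java.util.HashMap;
    import java.util.Map;

    public class NotRunningSketch {
      // Stand-in for the RM's view of applications; a missing key means the
      // RM has no report for that application.
      static final Map<String, String> RM_USERS = new HashMap<String, String>();

      // Like getUnknownApplicationReport: substitute a placeholder rather
      // than propagating null to the caller.
      static String userFor(String appId) {
        String user = RM_USERS.get(appId);
        return (user == null) ? "N/A" : user;
      }

      public static void main(String[] args) {
        RM_USERS.put("application_1382724000000_0001", "someuser");
        System.out.println(userFor("application_1382724000000_0001")); // someuser
        System.out.println(userFor("application_1382724000000_0002")); // N/A
      }
    }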

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
deleted file mode 100644
index 0e767b4..0000000
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.QueueAclsInfo;
-import org.apache.hadoop.mapreduce.QueueInfo;
-import org.apache.hadoop.mapreduce.TaskTrackerInfo;
-import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-
-public class ResourceMgrDelegate {
-  private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
-      
-  private YarnConfiguration conf;
-  private GetNewApplicationResponse application;
-  private ApplicationId applicationId;
-  private YarnClient client;
-  private InetSocketAddress rmAddress;
-
-  /**
-   * Delegate responsible for communicating with the Resource Manager's {@link ApplicationClientProtocol}.
-   * @param conf the configuration object.
-   */
-  public ResourceMgrDelegate(YarnConfiguration conf) {
-    super();
-    this.conf = conf;
-    client = YarnClient.createYarnClient();
-    client.init(conf);
-    this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_PORT);
-    client.start();
-  }
-
-  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
-      InterruptedException {
-    try {
-      return TypeConverter.fromYarnNodes(client.getNodeReports());
-    } catch (YarnException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
-    try {
-      Set<String> appTypes = new HashSet<String>(1);
-      appTypes.add(TezConfiguration.TEZ_APPLICATION_TYPE);
-      return TypeConverter.fromYarnApps(client.getApplications(appTypes),
-          this.conf);
-    } catch (YarnException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
-      InterruptedException {
-    // TODO: Implement getBlacklistedTrackers
-    LOG.warn("getBlacklistedTrackers - Not implemented yet");
-    return new TaskTrackerInfo[0];
-  }
-
-  public ClusterMetrics getClusterMetrics() throws IOException,
-      InterruptedException {
-    YarnClusterMetrics metrics;
-    try {
-      metrics = client.getYarnClusterMetrics();
-    } catch (YarnException e) {
-      throw new IOException(e);
-    }
-    ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1, 
-        metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
-        metrics.getNumNodeManagers(), 0, 0);
-    return oldMetrics;
-  }
-
-  @SuppressWarnings("rawtypes")
-  public Token getDelegationToken(Text renewer) throws IOException,
-      InterruptedException {
-    try {
-      // Remove rmAddress after YARN-868 is addressed
-      return ConverterUtils.convertFromYarn(
-        client.getRMDelegationToken(renewer), rmAddress);
-    } catch (YarnException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public String getFilesystemName() throws IOException, InterruptedException {
-    return FileSystem.get(conf).getUri().toString();
-  }
-
-  public JobID getNewJobID() throws IOException, InterruptedException {
-    try {
-      this.application = 
-          client.createApplication().getNewApplicationResponse();
-    } catch (YarnException e) {
-      throw new IOException(e);
-    }
-    this.applicationId = this.application.getApplicationId();
-    return TypeConverter.fromYarn(applicationId);
-  }
-
-  public QueueInfo getQueue(String queueName) throws IOException,
-  InterruptedException {
-    try {
-      return TypeConverter.fromYarn(
-          client.getQueueInfo(queueName), this.conf);
-    } catch (YarnException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
-      InterruptedException {
-    try {
-      return TypeConverter.fromYarnQueueUserAclsInfo(
-          client.getQueueAclsInfo());
-    } catch (YarnException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public QueueInfo[] getQueues() throws IOException, InterruptedException {
-    try {
-      return TypeConverter.fromYarnQueueInfo(client.getAllQueues(), this.conf);
-    } catch (YarnException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
-    try {
-      return TypeConverter.fromYarnQueueInfo(client.getRootQueueInfos(),
-          this.conf);
-    } catch (YarnException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public QueueInfo[] getChildQueues(String parent) throws IOException,
-      InterruptedException {
-    try {
-      return TypeConverter.fromYarnQueueInfo(client.getChildQueueInfos(parent),
-        this.conf);
-    } catch (YarnException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public String getStagingAreaDir() throws IOException, InterruptedException {
-//    Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR);
-    String user = 
-      UserGroupInformation.getCurrentUser().getShortUserName();
-    Path path = MRApps.getStagingAreaDir(conf, user);
-    LOG.debug("getStagingAreaDir: dir=" + path);
-    return path.toString();
-  }
-
-
-  public String getSystemDir() throws IOException, InterruptedException {
-    Path sysDir = new Path(MRJobConfig.JOB_SUBMIT_DIR);
-    //FileContext.getFileContext(conf).delete(sysDir, true);
-    return sysDir.toString();
-  }
-  
-
-  public long getTaskTrackerExpiryInterval() throws IOException,
-      InterruptedException {
-    return 0;
-  }
-  
-  public void setJobPriority(JobID arg0, String arg1) throws IOException,
-      InterruptedException {
-    return;
-  }
-
-
-  public long getProtocolVersion(String arg0, long arg1) throws IOException {
-    return 0;
-  }
-
-  public ApplicationId getApplicationId() {
-    return applicationId;
-  }
-
-  public void killApplication(ApplicationId appId)
-      throws YarnException, IOException {
-    client.killApplication(appId);
-  }
-
-}
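
Nearly every method in ResourceMgrDelegate follows one translation idiom: call YarnClient, and rethrow the checked YarnException as IOException so MR-facing callers only see the exception type the old ClientProtocol contract allows. The idiom in isolation (FakeYarnException stands in for YarnException so the sketch runs without a cluster):

    import java.io.IOException;

    public class ExceptionTranslationSketch {
      static class FakeYarnException extends Exception {
        FakeYarnException(String msg) { super(msg); }
      }

      static String queryCluster() throws IOException {
        try {
          throw new FakeYarnException("RM unavailable"); // simulated YARN failure
        } catch (FakeYarnException e) {
          // Same wrap-and-rethrow used throughout ResourceMgrDelegate.
          throw new IOException(e);
        }
      }

      public static void main(String[] args) {
        try {
          queryCluster();
        } catch (IOException e) {
          System.out.println("wrapped cause: " + e.getCause().getMessage());
        }
      }
    }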

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
deleted file mode 100644
index 96f6936..0000000
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ /dev/null
@@ -1,722 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
-import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.QueueAclsInfo;
-import org.apache.hadoop.mapreduce.QueueInfo;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskCompletionEvent;
-import org.apache.hadoop.mapreduce.TaskReport;
-import org.apache.hadoop.mapreduce.TaskTrackerInfo;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.TypeConverter;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
-import org.apache.hadoop.mapreduce.v2.LogParams;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.client.AMConfiguration;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
-import org.apache.tez.mapreduce.processor.map.MapProcessor;
-import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * This class enables the current JobClient (0.22 hadoop) to run on YARN-TEZ.
- */
-@SuppressWarnings({ "unchecked" })
-public class YARNRunner implements ClientProtocol {
-
-  private static final Log LOG = LogFactory.getLog(YARNRunner.class);
-
-  private ResourceMgrDelegate resMgrDelegate;
-  private ClientCache clientCache;
-  private Configuration conf;
-  private final FileContext defaultFileContext;
-
-  final public static FsPermission DAG_FILE_PERMISSION =
-      FsPermission.createImmutable((short) 0644);
-  final public static int UTF8_CHUNK_SIZE = 16 * 1024;
-
-  private final TezConfiguration tezConf;
-  private final TezClient tezClient;
-  private DAGClient dagClient;
-
-  /**
-   * YARNRunner encapsulates the client interface of
-   * YARN.
-   * @param conf the configuration object for the client
-   */
-  public YARNRunner(Configuration conf) {
-   this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
-  }
-
-  /**
-   * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
-   * {@link ResourceMgrDelegate}. Enables mocking and testing.
-   * @param conf the configuration object for the client
-   * @param resMgrDelegate the resourcemanager client handle.
-   */
-  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
-   this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
-  }
-
-  /**
-   * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
-   * but allowing injecting {@link ClientCache}. Enables mocking and testing.
-   * @param conf the configuration object
-   * @param resMgrDelegate the resource manager delegate
-   * @param clientCache the client cache object.
-   */
-  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
-      ClientCache clientCache) {
-    this.conf = conf;
-    this.tezConf = new TezConfiguration(conf);
-    try {
-      this.resMgrDelegate = resMgrDelegate;
-      this.tezClient = new TezClient(tezConf);
-      this.clientCache = clientCache;
-      this.defaultFileContext = FileContext.getFileContext(this.conf);
-
-    } catch (UnsupportedFileSystemException ufe) {
-      throw new RuntimeException("Error in instantiating YarnClient", ufe);
-    }
-  }
-
-  @VisibleForTesting
-  @Private
-  /**
-   * Used for testing mostly.
-   * @param resMgrDelegate the resource manager delegate to set to.
-   */
-  public void setResourceMgrDelegate(ResourceMgrDelegate resMgrDelegate) {
-    this.resMgrDelegate = resMgrDelegate;
-  }
-
-  @Override
-  public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
-      throws IOException, InterruptedException {
-    throw new UnsupportedOperationException("Use Token.renew instead");
-  }
-
-  @Override
-  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
-      InterruptedException {
-    return resMgrDelegate.getActiveTrackers();
-  }
-
-  @Override
-  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
-    return resMgrDelegate.getAllJobs();
-  }
-
-  @Override
-  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
-      InterruptedException {
-    return resMgrDelegate.getBlacklistedTrackers();
-  }
-
-  @Override
-  public ClusterMetrics getClusterMetrics() throws IOException,
-      InterruptedException {
-    return resMgrDelegate.getClusterMetrics();
-  }
-
-  @Override
-  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
-      throws IOException, InterruptedException {
-    // The token is only used for serialization. So the type information
-    // mismatch should be fine.
-    return resMgrDelegate.getDelegationToken(renewer);
-  }
-
-  @Override
-  public String getFilesystemName() throws IOException, InterruptedException {
-    return resMgrDelegate.getFilesystemName();
-  }
-
-  @Override
-  public JobID getNewJobID() throws IOException, InterruptedException {
-    return resMgrDelegate.getNewJobID();
-  }
-
-  @Override
-  public QueueInfo getQueue(String queueName) throws IOException,
-      InterruptedException {
-    return resMgrDelegate.getQueue(queueName);
-  }
-
-  @Override
-  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
-      InterruptedException {
-    return resMgrDelegate.getQueueAclsForCurrentUser();
-  }
-
-  @Override
-  public QueueInfo[] getQueues() throws IOException, InterruptedException {
-    return resMgrDelegate.getQueues();
-  }
-
-  @Override
-  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
-    return resMgrDelegate.getRootQueues();
-  }
-
-  @Override
-  public QueueInfo[] getChildQueues(String parent) throws IOException,
-      InterruptedException {
-    return resMgrDelegate.getChildQueues(parent);
-  }
-
-  @Override
-  public String getStagingAreaDir() throws IOException, InterruptedException {
-    return resMgrDelegate.getStagingAreaDir();
-  }
-
-  @Override
-  public String getSystemDir() throws IOException, InterruptedException {
-    return resMgrDelegate.getSystemDir();
-  }
-
-  @Override
-  public long getTaskTrackerExpiryInterval() throws IOException,
-      InterruptedException {
-    return resMgrDelegate.getTaskTrackerExpiryInterval();
-  }
-
-  private Map<String, LocalResource> createJobLocalResources(
-      Configuration jobConf, String jobSubmitDir)
-      throws IOException {
-
-    // Setup LocalResources
-    Map<String, LocalResource> localResources =
-        new HashMap<String, LocalResource>();
-
-    Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
-
-    URL yarnUrlForJobSubmitDir = ConverterUtils
-        .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
-            .resolvePath(
-                defaultFileContext.makeQualified(new Path(jobSubmitDir))));
-    LOG.debug("Creating setup context, jobSubmitDir url is "
-        + yarnUrlForJobSubmitDir);
-
-    localResources.put(MRJobConfig.JOB_CONF_FILE,
-        createApplicationResource(defaultFileContext,
-            jobConfPath, LocalResourceType.FILE));
-    if (jobConf.get(MRJobConfig.JAR) != null) {
-      Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
-      LocalResource rc = createApplicationResource(defaultFileContext,
-          jobJarPath,
-          LocalResourceType.FILE);
-      // FIXME fix pattern support
-      // String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
-      // JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
-      // rc.setPattern(pattern);
-      localResources.put(MRJobConfig.JOB_JAR, rc);
-    } else {
-      // Job jar may be null; e.g., for pipes the job jar is the hadoop
-      // mapreduce jar itself which is already on the classpath.
-      LOG.info("Job jar is not present. "
-          + "Not adding any jar to the list of resources.");
-    }
-
-    // TODO gross hack
-    for (String s : new String[] {
-        MRJobConfig.JOB_SPLIT,
-        MRJobConfig.JOB_SPLIT_METAINFO}) {
-      localResources.put(s,
-          createApplicationResource(defaultFileContext,
-              new Path(jobSubmitDir, s), LocalResourceType.FILE));
-    }
-
-    MRApps.setupDistributedCache(jobConf, localResources);
-
-    return localResources;
-  }
-
-  // FIXME isn't this a nice mess of a client?
-  // read input, write splits, read splits again
-  private List<TaskLocationHint> getMapLocationHintsFromInputSplits(JobID jobId,
-      FileSystem fs, Configuration conf,
-      String jobSubmitDir) throws IOException {
-    TaskSplitMetaInfo[] splitsInfo =
-        SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf,
-            new Path(jobSubmitDir));
-    int splitsCount = splitsInfo.length;
-    List<TaskLocationHint> locationHints =
-        new ArrayList<TaskLocationHint>(splitsCount);
-    for (int i = 0; i < splitsCount; ++i) {
-      TaskLocationHint locationHint =
-          new TaskLocationHint(
-              new HashSet<String>(
-                  Arrays.asList(splitsInfo[i].getLocations())), null);
-      locationHints.add(locationHint);
-    }
-    return locationHints;
-  }
-
-  private void setupMapReduceEnv(Configuration jobConf,
-      Map<String, String> environment, boolean isMap) throws IOException {
-
-    if (isMap) {
-      warnForJavaLibPath(
-          jobConf.get(MRJobConfig.MAP_JAVA_OPTS,""),
-          "map",
-          MRJobConfig.MAP_JAVA_OPTS,
-          MRJobConfig.MAP_ENV);
-      warnForJavaLibPath(
-          jobConf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""),
-          "map",
-          MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
-          MRJobConfig.MAPRED_ADMIN_USER_ENV);
-    } else {
-      warnForJavaLibPath(
-          jobConf.get(MRJobConfig.REDUCE_JAVA_OPTS,""),
-          "reduce",
-          MRJobConfig.REDUCE_JAVA_OPTS,
-          MRJobConfig.REDUCE_ENV);
-      warnForJavaLibPath(
-          jobConf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""),
-          "reduce",
-          MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
-          MRJobConfig.MAPRED_ADMIN_USER_ENV);
-    }
-
-    MRHelpers.updateEnvironmentForMRTasks(jobConf, environment, isMap);
-  }
-
-  private Vertex createVertexForStage(Configuration stageConf,
-      Map<String, LocalResource> jobLocalResources,
-      List<TaskLocationHint> locations, int stageNum, int totalStages)
-      throws IOException {
-    // stageNum starts from 0, goes till numStages - 1
-    boolean isMap = false;
-    if (stageNum == 0) {
-      isMap = true;
-    }
-
-    int numTasks = isMap ? stageConf.getInt(MRJobConfig.NUM_MAPS, 0)
-        : stageConf.getInt(MRJobConfig.NUM_REDUCES, 0);
-    String processorName = isMap ? MapProcessor.class.getName()
-        : ReduceProcessor.class.getName();
-    String vertexName = null;
-    if (isMap) {
-      vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
-    } else {
-      if (stageNum == totalStages - 1) {
-        vertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
-      } else {
-        vertexName = MultiStageMRConfigUtil
-            .getIntermediateStageVertexName(stageNum);
-      }
-    }
-
-    Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf)
-        : MRHelpers.getReduceResource(stageConf);
-    byte[] vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
-    Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName).
-        setUserPayload(vertexUserPayload),
-        numTasks, taskResource);
-    if (isMap) {
-      byte[] mapInputPayload = MRHelpers.createMRInputPayload(vertexUserPayload, null);
-      MRHelpers.addMRInput(vertex, mapInputPayload, null);
-    }
-    // Map only jobs.
-    if (stageNum == totalStages -1) {
-      MRHelpers.addMROutput(vertex, vertexUserPayload);
-    }
-
-    Map<String, String> taskEnv = new HashMap<String, String>();
-    setupMapReduceEnv(stageConf, taskEnv, isMap);
-
-    Map<String, LocalResource> taskLocalResources =
-        new TreeMap<String, LocalResource>();
-    // PRECOMMIT Remove split localization for reduce tasks if it's being set
-    // here
-    taskLocalResources.putAll(jobLocalResources);
-
-    String taskJavaOpts = isMap ? MRHelpers.getMapJavaOpts(stageConf)
-        : MRHelpers.getReduceJavaOpts(stageConf);
-
-    vertex.setTaskEnvironment(taskEnv)
-        .setTaskLocalResources(taskLocalResources)
-        .setTaskLocationsHint(locations)
-        .setJavaOpts(taskJavaOpts);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Adding vertex to DAG" + ", vertexName="
-          + vertex.getVertexName() + ", processor="
-          + vertex.getProcessorDescriptor().getClassName() + ", parallelism="
-          + vertex.getParallelism() + ", javaOpts=" + vertex.getJavaOpts()
-          + ", resources=" + vertex.getTaskResource()
-      // TODO Add localResources and Environment
-      );
-    }
-
-    return vertex;
-  }
-
-  private DAG createDAG(FileSystem fs, JobID jobId, Configuration[] stageConfs,
-      String jobSubmitDir, Credentials ts,
-      Map<String, LocalResource> jobLocalResources) throws IOException {
-
-    String jobName = stageConfs[0].get(MRJobConfig.JOB_NAME,
-        YarnConfiguration.DEFAULT_APPLICATION_NAME);
-    DAG dag = new DAG(jobName);
-
-    LOG.info("Number of stages: " + stageConfs.length);
-
-    List<TaskLocationHint> mapInputLocations =
-        getMapLocationHintsFromInputSplits(
-            jobId, fs, stageConfs[0], jobSubmitDir);
-    List<TaskLocationHint> reduceInputLocations = null;
-
-    Vertex[] vertices = new Vertex[stageConfs.length];
-    for (int i = 0; i < stageConfs.length; i++) {
-      vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
-          i == 0 ? mapInputLocations : reduceInputLocations, i,
-          stageConfs.length);
-    }
-
-    for (int i = 0; i < vertices.length; i++) {
-      dag.addVertex(vertices[i]);
-      if (i > 0) {
-        EdgeProperty edgeProperty = new EdgeProperty(
-            DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
-            SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor(OnFileSortedOutput.class.getName()),
-            new InputDescriptor(ShuffledMergedInputLegacy.class.getName()));
-
-        Edge edge = new Edge(vertices[i - 1], vertices[i], edgeProperty);
-        dag.addEdge(edge);
-      }
-
-    }
-    return dag;
-  }
-
-  private TezConfiguration getDAGAMConfFromMRConf() {
-    TezConfiguration finalConf = new TezConfiguration(this.tezConf);
-    Map<String, String> mrParamToDAGParamMap = DeprecatedKeys
-        .getMRToDAGParamMap();
-
-    for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
-      if (finalConf.get(entry.getKey()) != null) {
-        finalConf.set(entry.getValue(), finalConf.get(entry.getKey()));
-        finalConf.unset(entry.getKey());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("MR->DAG Translating MR key: " + entry.getKey()
-              + " to Tez key: " + entry.getValue() + " with value "
-              + finalConf.get(entry.getValue()));
-        }
-      }
-    }
-    return finalConf;
-  }
-
-  @Override
-  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
-  throws IOException, InterruptedException {
-
-    ApplicationId appId = resMgrDelegate.getApplicationId();
-
-    FileSystem fs = FileSystem.get(conf);
-    // Loads the job.xml written by the user.
-    JobConf jobConf = new JobConf(new TezConfiguration(conf));
-
-    // Extract individual raw MR configs.
-    Configuration[] stageConfs = MultiStageMRConfToTezTranslator
-        .getStageConfs(jobConf);
-
-    // Transform all confs to use Tez keys
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[0],
-        null);
-    for (int i = 1; i < stageConfs.length; i++) {
-      MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[i],
-          stageConfs[i - 1]);
-    }
-
-    // create inputs to tezClient.submit()
-
-    // FIXME set up job resources
-    Map<String, LocalResource> jobLocalResources =
-        createJobLocalResources(stageConfs[0], jobSubmitDir);
-
-    // FIXME createDAG should take the tezConf as a parameter, instead of using
-    // MR keys.
-    DAG dag = createDAG(fs, jobId, stageConfs, jobSubmitDir, ts,
-        jobLocalResources);
-
-    List<String> vargs = new LinkedList<String>();
-    // admin command opts and user command opts
-    String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
-        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
-    warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
-        MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
-    vargs.add(mrAppMasterAdminOptions);
-
-    // Add AM user command opts
-    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
-        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
-    warnForJavaLibPath(mrAppMasterUserOptions, "app master",
-        MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
-    vargs.add(mrAppMasterUserOptions);
-
-    StringBuilder javaOpts = new StringBuilder();
-    for (String varg : vargs) {
-      javaOpts.append(varg).append(" ");
-    }
-
-    // Setup the CLASSPATH in environment
-    // i.e. add { Hadoop jars, job jar, CWD } to classpath.
-    Map<String, String> environment = new HashMap<String, String>();
-
-    // Setup the environment variables for AM
-    MRHelpers.updateEnvironmentForMRAM(conf, environment);
-
-    TezConfiguration dagAMConf = getDAGAMConfFromMRConf();
-    dagAMConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, javaOpts.toString());
-
-    // Submit to ResourceManager
-    try {
-      dagAMConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
-          jobSubmitDir);
-      AMConfiguration amConfig = new AMConfiguration(
-          jobConf.get(JobContext.QUEUE_NAME,
-              YarnConfiguration.DEFAULT_QUEUE_NAME),
-          environment,
-          jobLocalResources, dagAMConf, ts);
-      tezClient.submitDAGApplication(appId, dag, amConfig);
-    } catch (TezException e) {
-      throw new IOException(e);
-    }
-
-    return getJobStatus(jobId);
-  }
-
-  private LocalResource createApplicationResource(FileContext fs, Path p,
-      LocalResourceType type) throws IOException {
-    LocalResource rsrc = Records.newRecord(LocalResource.class);
-    FileStatus rsrcStat = fs.getFileStatus(p);
-    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
-        .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
-    rsrc.setSize(rsrcStat.getLen());
-    rsrc.setTimestamp(rsrcStat.getModificationTime());
-    rsrc.setType(type);
-    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-    return rsrc;
-  }
-
-  @Override
-  public void setJobPriority(JobID arg0, String arg1) throws IOException,
-      InterruptedException {
-    resMgrDelegate.setJobPriority(arg0, arg1);
-  }
-
-  @Override
-  public long getProtocolVersion(String arg0, long arg1) throws IOException {
-    return resMgrDelegate.getProtocolVersion(arg0, arg1);
-  }
-
-  @Override
-  public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
-      throws IOException, InterruptedException {
-    throw new UnsupportedOperationException("Use Token.renew instead");
-  }
-
-
-  @Override
-  public Counters getJobCounters(JobID arg0) throws IOException,
-      InterruptedException {
-    return clientCache.getClient(arg0).getJobCounters(arg0);
-  }
-
-  @Override
-  public String getJobHistoryDir() throws IOException, InterruptedException {
-    return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
-  }
-
-  @Override
-  public JobStatus getJobStatus(JobID jobID) throws IOException,
-      InterruptedException {
-    String user = UserGroupInformation.getCurrentUser().getShortUserName();
-    String jobFile = MRApps.getJobFile(conf, user, jobID);
-    DAGStatus dagStatus;
-    try {
-      if(dagClient == null) {
-        dagClient = tezClient.getDAGClient(TypeConverter.toYarn(jobID).getAppId());
-      }
-      dagStatus = dagClient.getDAGStatus();
-      return new DAGJobStatus(dagClient.getApplicationReport(), dagStatus, jobFile);
-    } catch (TezException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
-      int arg2) throws IOException, InterruptedException {
-    return clientCache.getClient(arg0).getTaskCompletionEvents(arg0, arg1, arg2);
-  }
-
-  @Override
-  public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException,
-      InterruptedException {
-    return clientCache.getClient(arg0.getJobID()).getTaskDiagnostics(arg0);
-  }
-
-  @Override
-  public TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
-  throws IOException, InterruptedException {
-    return clientCache.getClient(jobID)
-        .getTaskReports(jobID, taskType);
-  }
-
-  @Override
-  public void killJob(JobID arg0) throws IOException, InterruptedException {
-    /* if the job is still in PREP or RUNNING state, ask the RM to kill the application */
-    JobStatus status = getJobStatus(arg0);
-    if (status.getState() == JobStatus.State.RUNNING ||
-        status.getState() == JobStatus.State.PREP) {
-      try {
-        resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
-      } catch (YarnException e) {
-        throw new IOException(e);
-      }
-    }
-  }
-
-  @Override
-  public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,
-      InterruptedException {
-    return clientCache.getClient(arg0.getJobID()).killTask(arg0, arg1);
-  }
-
-  @Override
-  public AccessControlList getQueueAdmins(String arg0) throws IOException {
-    return new AccessControlList("*");
-  }
-
-  @Override
-  public JobTrackerStatus getJobTrackerStatus() throws IOException,
-      InterruptedException {
-    return JobTrackerStatus.RUNNING;
-  }
-
-  @Override
-  public ProtocolSignature getProtocolSignature(String protocol,
-      long clientVersion, int clientMethodsHash) throws IOException {
-    return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion,
-        clientMethodsHash);
-  }
-
-  @Override
-  public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
-      throws IOException {
-    try {
-      return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
-    } catch (YarnException e) {
-      throw new IOException(e);
-    }
-  }
-
-  private static void warnForJavaLibPath(String opts, String component,
-      String javaConf, String envConf) {
-    if (opts != null && opts.contains("-Djava.library.path")) {
-      LOG.warn("Usage of -Djava.library.path in " + javaConf + " can cause " +
-               "programs to no longer function if hadoop native libraries " +
-               "are used. These values should be set as part of the " +
-               "LD_LIBRARY_PATH in the " + component + " JVM env using " +
-               envConf + " config settings.");
-    }
-  }
-
-}

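As context for the deleted createDAG() above: each MR stage becomes one Vertex, and consecutive stages are joined by a scatter-gather edge. A minimal sketch of that pattern against the same 0.2.0-era API as the code above (the vertex names, parallelism, and resource size are illustrative assumptions, not values the runner actually computes):

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.tez.dag.api.DAG;
    import org.apache.tez.dag.api.Edge;
    import org.apache.tez.dag.api.EdgeProperty;
    import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
    import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
    import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
    import org.apache.tez.dag.api.InputDescriptor;
    import org.apache.tez.dag.api.OutputDescriptor;
    import org.apache.tez.dag.api.ProcessorDescriptor;
    import org.apache.tez.dag.api.Vertex;
    import org.apache.tez.mapreduce.processor.map.MapProcessor;
    import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
    import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
    import org.apache.tez.runtime.library.output.OnFileSortedOutput;

    public class TwoStageDagSketch {
      static DAG build() {
        // Illustrative size; YARNRunner derives this from the stage confs.
        Resource taskResource = Resource.newInstance(1024, 1);
        Vertex map = new Vertex("map",
            new ProcessorDescriptor(MapProcessor.class.getName()), 2, taskResource);
        Vertex reduce = new Vertex("reduce",
            new ProcessorDescriptor(ReduceProcessor.class.getName()), 1, taskResource);
        DAG dag = new DAG("sketch");
        dag.addVertex(map);
        dag.addVertex(reduce);
        // Same edge shape createDAG() builds between consecutive stages.
        dag.addEdge(new Edge(map, reduce, new EdgeProperty(
            DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
            SchedulingType.SEQUENTIAL,
            new OutputDescriptor(OnFileSortedOutput.class.getName()),
            new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
        return dag;
      }
    }
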
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java
deleted file mode 100644
index 680939b..0000000
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
-import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
-import org.apache.tez.mapreduce.hadoop.MRConfig;
-
-public class YarnTezClientProtocolProvider extends ClientProtocolProvider {
-
-  @Override
-  public ClientProtocol create(Configuration conf) throws IOException {
-    if (MRConfig.YARN_TEZ_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
-      return new YARNRunner(conf);
-    }
-    return null;
-  }
-
-  @Override
-  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
-      throws IOException {
-    return create(conf);
-  }
-
-  @Override
-  public void close(ClientProtocol clientProtocol) throws IOException {
-    // nothing to do
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
deleted file mode 100644
index 88816ca..0000000
--- a/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
+++ /dev/null
@@ -1,14 +0,0 @@
-#
-#   Licensed under the Apache License, Version 2.0 (the "License");
-#   you may not use this file except in compliance with the License.
-#   You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#   Unless required by applicable law or agreed to in writing, software
-#   distributed under the License is distributed on an "AS IS" BASIS,
-#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#   See the License for the specific language governing permissions and
-#   limitations under the License.
-#
-org.apache.tez.mapreduce.YarnTezClientProtocolProvider

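For context: the services file above is how Hadoop's job client discovers YarnTezClientProtocolProvider through java.util.ServiceLoader, and the provider only returns a YARNRunner when the MR framework name is set to yarn-tez. A minimal client-side sketch (the literal strings are assumed to match MRConfig.FRAMEWORK_NAME and MRConfig.YARN_TEZ_FRAMEWORK_NAME checked in the provider above):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class TezFrameworkSelection {
      static Job newTezBackedJob() throws IOException {
        Configuration conf = new Configuration();
        // Matches the check in YarnTezClientProtocolProvider.create(conf).
        conf.set("mapreduce.framework.name", "yarn-tez");
        // Job submission will iterate the registered ClientProtocolProviders
        // and end up with a YARNRunner for this framework name.
        return Job.getInstance(conf);
      }
    }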

[3/3] git commit: TEZ-581. Rename MiniMRRTezCluster to MiniTezCluster. Move YARNRunner to tez-mapreduce project (bikas)

Posted by bi...@apache.org.
TEZ-581. Rename MiniMRRTezCluster to MiniTezCluster. Move YARNRunner to tez-mapreduce project (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/360d30a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/360d30a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/360d30a9

Branch: refs/heads/master
Commit: 360d30a9fcad3a412901b48d85596b863b73f0d7
Parents: 1c44f63
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Oct 25 17:10:19 2013 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Oct 25 17:10:19 2013 -0700

----------------------------------------------------------------------
 pom.xml                                         |  10 +-
 tez-dist/pom.xml                                |   8 +-
 tez-mapreduce-tests/pom.xml                     |  98 ---
 .../apache/tez/mapreduce/MiniMRRTezCluster.java | 172 -----
 .../org/apache/tez/mapreduce/TestMRRJobs.java   | 358 ---------
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 532 --------------
 .../tez/mapreduce/client/ClientCache.java       |  50 ++
 .../mapreduce/client/ClientServiceDelegate.java | 105 +++
 .../tez/mapreduce/client/DAGJobStatus.java      | 386 ++++++++++
 .../tez/mapreduce/client/NotRunningJob.java     | 240 ++++++
 .../mapreduce/client/ResourceMgrDelegate.java   | 232 ++++++
 .../apache/tez/mapreduce/client/YARNRunner.java | 722 +++++++++++++++++++
 .../client/YarnTezClientProtocolProvider.java   |  50 ++
 ...op.mapreduce.protocol.ClientProtocolProvider |  14 +
 tez-tests/pom.xml                               |  93 +++
 .../org/apache/tez/mapreduce/TestMRRJobs.java   | 359 +++++++++
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 533 ++++++++++++++
 .../org/apache/tez/test/MiniTezCluster.java     | 172 +++++
 tez-yarn-client/findbugs-exclude.xml            |  16 -
 tez-yarn-client/pom.xml                         |  72 --
 .../org/apache/tez/mapreduce/ClientCache.java   |  50 --
 .../tez/mapreduce/ClientServiceDelegate.java    | 105 ---
 .../org/apache/tez/mapreduce/DAGJobStatus.java  | 386 ----------
 .../org/apache/tez/mapreduce/NotRunningJob.java | 240 ------
 .../tez/mapreduce/ResourceMgrDelegate.java      | 232 ------
 .../org/apache/tez/mapreduce/YARNRunner.java    | 722 -------------------
 .../YarnTezClientProtocolProvider.java          |  50 --
 ...op.mapreduce.protocol.ClientProtocolProvider |  14 -
 28 files changed, 2959 insertions(+), 3062 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 3addf16..34cefb2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,7 +120,7 @@
       </dependency>
       <dependency>
         <groupId>org.apache.tez</groupId>
-        <artifactId>tez-mapreduce-tests</artifactId>
+        <artifactId>tez-tests</artifactId>
         <version>${project.version}</version>
       </dependency>
       <dependency>
@@ -129,11 +129,6 @@
         <version>${project.version}</version>
       </dependency>
       <dependency>
-        <groupId>org.apache.tez</groupId>
-        <artifactId>tez-yarn-client</artifactId>
-        <version>${project.version}</version>
-      </dependency>
-      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>
         <version>${hadoop.version}</version>
@@ -259,10 +254,9 @@
     <module>tez-common</module>
     <module>tez-runtime-library</module>
     <module>tez-runtime-internals</module>
-    <module>tez-yarn-client</module>
     <module>tez-mapreduce</module>
     <module>tez-mapreduce-examples</module>
-    <module>tez-mapreduce-tests</module>
+    <module>tez-tests</module>
     <module>tez-dag</module>
     <module>tez-dist</module>
     <module>docs</module>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dist/pom.xml b/tez-dist/pom.xml
index e946f78..dc72598 100644
--- a/tez-dist/pom.xml
+++ b/tez-dist/pom.xml
@@ -30,12 +30,6 @@
   <packaging>pom</packaging>
 
   <dependencies>
-    <!--tez-yarn-client should require all other modules to be built before it, so this becomes the last -->
-    <dependency>
-      <groupId>org.apache.tez</groupId>
-      <artifactId>tez-yarn-client</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <dependency>
       <groupId>org.apache.tez</groupId>
       <artifactId>tez-dag</artifactId>
@@ -48,7 +42,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tez</groupId>
-      <artifactId>tez-mapreduce-tests</artifactId>
+      <artifactId>tez-tests</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>
     </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/pom.xml b/tez-mapreduce-tests/pom.xml
deleted file mode 100644
index 4ffed02..0000000
--- a/tez-mapreduce-tests/pom.xml
+++ /dev/null
@@ -1,98 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.tez</groupId>
-    <artifactId>tez</artifactId>
-    <version>0.2.0-SNAPSHOT</version>
-  </parent>
-  <artifactId>tez-mapreduce-tests</artifactId>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.tez</groupId>
-      <artifactId>tez-dag</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.tez</groupId>
-      <artifactId>tez-mapreduce</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.tez</groupId>
-      <artifactId>tez-yarn-client</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.tez</groupId>
-      <artifactId>tez-mapreduce-examples</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-tests</artifactId>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <scope>test</scope>
-      <type>test-jar</type>
-    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.rat</groupId>
-        <artifactId>apache-rat-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/MiniMRRTezCluster.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/MiniMRRTezCluster.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/MiniMRRTezCluster.java
deleted file mode 100644
index 278e124..0000000
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/MiniMRRTezCluster.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.ShuffleHandler;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.util.JarFinder;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.app.DAGAppMaster;
-import org.apache.tez.mapreduce.hadoop.MRConfig;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-
-/**
- * Configures and starts the Tez-specific components in the YARN cluster.
- *
- * When using this mini cluster, the user is expected to submit jobs against the configuration written out during serviceStart() (see getConfigFilePath()).
- */
-public class MiniMRRTezCluster extends MiniYARNCluster {
-
-  public static final String APPJAR = JarFinder.getJar(DAGAppMaster.class);
-
-  private static final Log LOG = LogFactory.getLog(MiniMRRTezCluster.class);
-
-  private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml";
-
-  private Path confFilePath;
-
-  public MiniMRRTezCluster(String testName) {
-    this(testName, 1);
-  }
-
-  public MiniMRRTezCluster(String testName, int noOfNMs) {
-    super(testName, noOfNMs, 4, 4);
-  }
-
-  public MiniMRRTezCluster(String testName, int noOfNMs,
-      int numLocalDirs, int numLogDirs)  {
-    super(testName, noOfNMs, numLocalDirs, numLogDirs);
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
-    if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
-      conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
-          "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());
-    }
-    
-    if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) {
-      // nothing defined. set quick delete value
-      conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
-    }
-
-    File appJarFile = new File(MiniMRRTezCluster.APPJAR);
-
-    if (!appJarFile.exists()) {
-      String message = "TezAppJar " + MiniMRRTezCluster.APPJAR
-          + " not found. Exiting.";
-      LOG.info(message);
-      throw new TezUncheckedException(message);
-    } else {
-      conf.set(TezConfiguration.TEZ_LIB_URIS, "file://" + appJarFile.getAbsolutePath());
-      LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS));
-    }
-
-    // Disable both VMEM and PMEM monitoring for the mini cluster.
-    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
-    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
-
-    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");
-
-    try {
-      Path stagingPath = FileContext.getFileContext(conf).makeQualified(
-          new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
-      FileContext fc = FileContext.getFileContext(stagingPath.toUri(), conf);
-      if (fc.util().exists(stagingPath)) {
-        LOG.info(stagingPath + " exists! deleting...");
-        fc.delete(stagingPath, true);
-      }
-      LOG.info("mkdir: " + stagingPath);
-      fc.mkdir(stagingPath, null, true);
-
-      //mkdir done directory as well
-      String doneDir =
-          JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
-      Path doneDirPath = fc.makeQualified(new Path(doneDir));
-      fc.mkdir(doneDirPath, null, true);
-    } catch (IOException e) {
-      throw new TezUncheckedException("Could not create staging directory. ", e);
-    }
-    conf.set(MRConfig.MASTER_ADDRESS, "test");
-
-    //configure the shuffle service in NM
-    conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
-        new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
-    conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
-        ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
-        Service.class);
-
-    // Use an ephemeral shuffle port (0 lets the handler pick a free port)
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
-
-    conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
-        DefaultContainerExecutor.class, ContainerExecutor.class);
-
-    // Uber mode does not apply to the Tez mini cluster; disable it
-    // explicitly.
-    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
-    super.serviceInit(conf);
-  }
-
-  @Override
-  public void serviceStart() throws Exception {
-    super.serviceStart();
-    File workDir = super.getTestWorkDir();
-    Configuration conf = super.getConfig();
-
-    confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG);
-    File confFile = new File(confFilePath.toString());
-    try {
-      confFile.createNewFile();
-      conf.writeXml(new FileOutputStream(confFile));
-      confFile.deleteOnExit();
-    } catch (IOException e) {
-      // Could not persist the cluster config for clients; fail the start.
-      e.printStackTrace();
-      throw new RuntimeException(e);
-    }
-    confFilePath = new Path(confFile.getAbsolutePath());
-    conf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
-        workDir.getAbsolutePath(), System.getProperty("java.class.path"));
-    LOG.info("Setting yarn-site.xml via YARN-APP-CP at: "
-        + conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH));
-  }
-
-  public Path getConfigFilePath() {
-    return confFilePath;
-  }
-
-}

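Per the commit message, this class moves to org.apache.tez.test.MiniTezCluster; a minimal test-setup sketch in the style of TestMRRJobs below (the cluster name and NM/dir counts are illustrative, and the renamed class is assumed to keep the same constructor signature):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.tez.test.MiniTezCluster;

    public class MiniClusterSketch {
      public static void main(String[] args) throws Exception {
        // One NM, one local dir, one log dir -- enough for a smoke test.
        MiniTezCluster cluster = new MiniTezCluster("sketch", 1, 1, 1);
        cluster.init(new Configuration()); // serviceInit(): staging dir, shuffle handler
        cluster.start();                   // serviceStart(): writes out yarn-site.xml
        try {
          // ... submit jobs against cluster.getConfig() ...
        } finally {
          cluster.stop();
        }
      }
    }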
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
deleted file mode 100644
index b85ddb6..0000000
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.RandomTextWriterJob;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.mapreduce.examples.MRRSleepJob;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestMRRJobs {
-
-  private static final Log LOG = LogFactory.getLog(TestMRRJobs.class);
-
-  protected static MiniMRRTezCluster mrrTezCluster;
-  protected static MiniDFSCluster dfsCluster;
-
-  private static Configuration conf = new Configuration();
-  private static FileSystem remoteFs;
-
-  private static String TEST_ROOT_DIR = "target"
-      + Path.SEPARATOR + TestMRRJobs.class.getName() + "-tmpDir";
-
-  private static final String OUTPUT_ROOT_DIR = "/tmp" + Path.SEPARATOR +
-      TestMRRJobs.class.getSimpleName();
-
-  @BeforeClass
-  public static void setup() throws IOException {
-    try {
-      conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
-      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
-      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
-        .format(true).racks(null).build();
-      remoteFs = dfsCluster.getFileSystem();
-    } catch (IOException io) {
-      throw new RuntimeException("problem starting mini dfs cluster", io);
-    }
-
-    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
-      LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
-               + " not found. Not running test.");
-      return;
-    }
-
-    if (mrrTezCluster == null) {
-      mrrTezCluster = new MiniMRRTezCluster(TestMRRJobs.class.getName(), 1,
-          1, 1);
-      Configuration conf = new Configuration();
-      conf.set("fs.defaultFS", remoteFs.getUri().toString());   // use HDFS
-      conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
-      conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
-      mrrTezCluster.init(conf);
-      mrrTezCluster.start();
-    }
-
-  }
-
-  @AfterClass
-  public static void tearDown() {
-    if (mrrTezCluster != null) {
-      mrrTezCluster.stop();
-      mrrTezCluster = null;
-    }
-    if (dfsCluster != null) {
-      dfsCluster.shutdown();
-      dfsCluster = null;
-    }
-  }
-
-  @Test (timeout = 60000)
-  public void testMRRSleepJob() throws IOException, InterruptedException,
-      ClassNotFoundException {
-    LOG.info("\n\n\nStarting testMRRSleepJob().");
-
-    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
-      LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
-               + " not found. Not running test.");
-      return;
-    }
-
-    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
-
-    MRRSleepJob sleepJob = new MRRSleepJob();
-    sleepJob.setConf(sleepConf);
-
-    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
-        1, 1, 1, 1, 1);
-
-    job.setJarByClass(MRRSleepJob.class);
-    job.setMaxMapAttempts(1); // speed up failures
-    job.submit();
-    String trackingUrl = job.getTrackingURL();
-    String jobId = job.getJobID().toString();
-    boolean succeeded = job.waitForCompletion(true);
-    Assert.assertTrue(succeeded);
-    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
-    Assert.assertTrue("Tracking URL was " + trackingUrl +
-                      " but didn't Match Job ID " + jobId ,
-          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
-
-    // FIXME once counters and task progress can be obtained properly
-    // TODO use dag client to test counters and task progress?
-    // what about completed jobs?
-  }
-
-  @Test (timeout = 60000)
-  public void testRandomWriter() throws IOException, InterruptedException,
-      ClassNotFoundException {
-
-    LOG.info("\n\n\nStarting testRandomWriter().");
-    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
-      LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
-               + " not found. Not running test.");
-      return;
-    }
-
-    RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
-    mrrTezCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
-    mrrTezCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
-    Job job = randomWriterJob.createJob(mrrTezCluster.getConfig());
-    Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
-    FileOutputFormat.setOutputPath(job, outputDir);
-    job.setSpeculativeExecution(false);
-    job.setJarByClass(RandomTextWriterJob.class);
-    job.setMaxMapAttempts(1); // speed up failures
-    job.submit();
-    String trackingUrl = job.getTrackingURL();
-    String jobId = job.getJobID().toString();
-    boolean succeeded = job.waitForCompletion(true);
-    Assert.assertTrue(succeeded);
-    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
-    Assert.assertTrue("Tracking URL was " + trackingUrl +
-                      " but didn't Match Job ID " + jobId ,
-          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
-
-    // Make sure there are three files in the output-dir
-
-    RemoteIterator<FileStatus> iterator =
-        FileContext.getFileContext(mrrTezCluster.getConfig()).listStatus(
-            outputDir);
-    int count = 0;
-    while (iterator.hasNext()) {
-      FileStatus file = iterator.next();
-      if (!file.getPath().getName()
-          .equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
-        count++;
-      }
-    }
-    Assert.assertEquals("Number of part files is wrong!", 3, count);
-
-  }
-
-
-  @Test (timeout = 60000)
-  public void testFailingJob() throws IOException, InterruptedException,
-      ClassNotFoundException {
-
-    LOG.info("\n\n\nStarting testFailingJob().");
-
-    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
-      LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
-               + " not found. Not running test.");
-      return;
-    }
-
-    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
-
-    MRRSleepJob sleepJob = new MRRSleepJob();
-    sleepJob.setConf(sleepConf);
-
-    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
-        1, 1, 1, 1, 1);
-
-    job.setJarByClass(MRRSleepJob.class);
-    job.setMaxMapAttempts(1); // speed up failures
-    job.getConfiguration().setBoolean(MRRSleepJob.MAP_FATAL_ERROR, true);
-    job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "*");
-
-    job.submit();
-    boolean succeeded = job.waitForCompletion(true);
-    Assert.assertFalse(succeeded);
-    Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
-
-    // FIXME once counters and task progress can be obtained properly
-    // TODO verify failed task diagnostics
-  }
-
-  @Test (timeout = 60000)
-  public void testFailingAttempt() throws IOException, InterruptedException,
-      ClassNotFoundException {
-
-    LOG.info("\n\n\nStarting testFailingAttempt().");
-
-    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
-      LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
-               + " not found. Not running test.");
-      return;
-    }
-
-    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
-
-    MRRSleepJob sleepJob = new MRRSleepJob();
-    sleepJob.setConf(sleepConf);
-
-    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
-        1, 1, 1, 1, 1);
-
-    job.setJarByClass(MRRSleepJob.class);
-    job.setMaxMapAttempts(3); // speed up failures
-    job.getConfiguration().setBoolean(MRRSleepJob.MAP_THROW_ERROR, true);
-    job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "0");
-
-    job.submit();
-    boolean succeeded = job.waitForCompletion(true);
-    Assert.assertTrue(succeeded);
-    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
-
-    // FIXME once counters and task progress can be obtained properly
-    // TODO verify failed task diagnostics
-  }
-
-  @Test (timeout = 60000)
-  public void testMRRSleepJobWithCompression() throws IOException,
-      InterruptedException, ClassNotFoundException {
-    LOG.info("\n\n\nStarting testMRRSleepJobWithCompression().");
-
-    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
-      LOG.info("MRAppJar " + MiniMRRTezCluster.APPJAR
-               + " not found. Not running test.");
-      return;
-    }
-
-    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
-
-    MRRSleepJob sleepJob = new MRRSleepJob();
-    sleepJob.setConf(sleepConf);
-
-    Job job = sleepJob.createJob(1, 1, 2, 1, 1,
-        1, 1, 1, 1, 1);
-
-    job.setJarByClass(MRRSleepJob.class);
-    job.setMaxMapAttempts(1); // speed up failures
-
-    // enable compression
-    job.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
-    job.getConfiguration().set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
-        DefaultCodec.class.getName());
-
-    job.submit();
-    String trackingUrl = job.getTrackingURL();
-    String jobId = job.getJobID().toString();
-    boolean succeeded = job.waitForCompletion(true);
-    Assert.assertTrue(succeeded);
-    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
-    Assert.assertTrue("Tracking URL was " + trackingUrl +
-                      " but didn't Match Job ID " + jobId ,
-          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
-
-    // FIXME once counters and task progress can be obtained properly
-    // TODO use dag client to test counters and task progress?
-    // what about completed jobs?
-
-  }
-
-
-  /*
-  //@Test (timeout = 60000)
-  public void testMRRSleepJobWithSecurityOn() throws IOException,
-      InterruptedException, ClassNotFoundException {
-
-    LOG.info("\n\n\nStarting testMRRSleepJobWithSecurityOn().");
-
-    if (!(new File(MiniMRRTezCluster.APPJAR)).exists()) {
-      return;
-    }
-
-    mrrTezCluster.getConfig().set(
-        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
-        "kerberos");
-    mrrTezCluster.getConfig().set(YarnConfiguration.RM_KEYTAB, "/etc/krb5.keytab");
-    mrrTezCluster.getConfig().set(YarnConfiguration.NM_KEYTAB, "/etc/krb5.keytab");
-    mrrTezCluster.getConfig().set(YarnConfiguration.RM_PRINCIPAL,
-        "rm/sightbusy-lx@LOCALHOST");
-    mrrTezCluster.getConfig().set(YarnConfiguration.NM_PRINCIPAL,
-        "nm/sightbusy-lx@LOCALHOST");
-
-    UserGroupInformation.setConfiguration(mrrTezCluster.getConfig());
-
-    // Keep it in here instead of after RM/NM as multiple user logins happen in
-    // the same JVM.
-    UserGroupInformation user = UserGroupInformation.getCurrentUser();
-
-    LOG.info("User name is " + user.getUserName());
-    for (Token<? extends TokenIdentifier> str : user.getTokens()) {
-      LOG.info("Token is " + str.encodeToUrlString());
-    }
-    user.doAs(new PrivilegedExceptionAction<Void>() {
-      @Override
-      public Void run() throws Exception {
-        MRRSleepJob sleepJob = new MRRSleepJob();
-        sleepJob.setConf(mrrTezCluster.getConfig());
-        Job job = sleepJob.createJob(3, 0, 10000, 1, 0, 0);
-        // //Job with reduces
-        // Job job = sleepJob.createJob(3, 2, 10000, 1, 10000, 1);
-        job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
-        job.submit();
-        String trackingUrl = job.getTrackingURL();
-        String jobId = job.getJobID().toString();
-        job.waitForCompletion(true);
-        Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
-        Assert.assertTrue("Tracking URL was " + trackingUrl +
-                          " but didn't Match Job ID " + jobId ,
-          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
-        return null;
-      }
-    });
-
-    // TODO later:  add explicit "isUber()" checks of some sort
-  }
-  */
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
deleted file mode 100644
index a6dbe26..0000000
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ /dev/null
@@ -1,532 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.client.AMConfiguration;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.client.TezSession;
-import org.apache.tez.client.TezSessionConfiguration;
-import org.apache.tez.client.TezSessionStatus;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.DAGStatus.State;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
-import org.apache.tez.mapreduce.examples.MRRSleepJob;
-import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
-import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
-import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepInputFormat;
-import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepMapper;
-import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepReducer;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
-import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
-import org.apache.tez.mapreduce.processor.map.MapProcessor;
-import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
-import org.apache.tez.runtime.api.TezRootInputInitializer;
-import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestMRRJobsDAGApi {
-
-  private static final Log LOG = LogFactory.getLog(TestMRRJobsDAGApi.class);
-
-  protected static MiniMRRTezCluster mrrTezCluster;
-  protected static MiniDFSCluster dfsCluster;
-
-  private static Configuration conf = new Configuration();
-  private static FileSystem remoteFs;
-
-  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
-      + TestMRRJobsDAGApi.class.getName() + "-tmpDir";
-
-  @BeforeClass
-  public static void setup() throws IOException {
-    try {
-      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
-      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
-          .format(true).racks(null).build();
-      remoteFs = dfsCluster.getFileSystem();
-    } catch (IOException io) {
-      throw new RuntimeException("problem starting mini dfs cluster", io);
-    }
-    
-    if (mrrTezCluster == null) {
-      mrrTezCluster = new MiniMRRTezCluster(TestMRRJobsDAGApi.class.getName(),
-          1, 1, 1);
-      Configuration conf = new Configuration();
-      conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
-      mrrTezCluster.init(conf);
-      mrrTezCluster.start();
-    }
-
-  }
-
-  @AfterClass
-  public static void tearDown() {
-    if (mrrTezCluster != null) {
-      mrrTezCluster.stop();
-      mrrTezCluster = null;
-    }
-    if (dfsCluster != null) {
-      dfsCluster.shutdown();
-      dfsCluster = null;
-    }
-    // TODO Add cleanup code.
-  }
-
-  // Submits a simple 5 stage sleep job using the DAG submit API instead of job
-  // client.
-  @Test(timeout = 60000)
-  public void testMRRSleepJobDagSubmit() throws IOException,
-  InterruptedException, TezException, ClassNotFoundException, YarnException {
-    State finalState = testMRRSleepJobDagSubmitCore(false, false, false, false);
-
-    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
-    // TODO Add additional checks for tracking URL etc. - once it's exposed by
-    // the DAG API.
-  }
-
-  // Submits a simple 5 stage sleep job using the DAG submit API. Then kills it.
-  @Test(timeout = 60000)
-  public void testMRRSleepJobDagSubmitAndKill() throws IOException,
-  InterruptedException, TezException, ClassNotFoundException, YarnException {
-    State finalState = testMRRSleepJobDagSubmitCore(false, true, false, false);
-
-    Assert.assertEquals(DAGStatus.State.KILLED, finalState);
-    // TODO Add additional checks for tracking URL etc. - once it's exposed by
-    // the DAG API.
-  }
-
-  // Submits a DAG to AM via RPC after AM has started
-  @Test(timeout = 60000)
-  public void testMRRSleepJobViaSession() throws IOException,
-  InterruptedException, TezException, ClassNotFoundException, YarnException {
-    State finalState = testMRRSleepJobDagSubmitCore(true, false, false, false);
-
-    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
-  }
-
-  // Submits a DAG to AM via RPC after AM has started
-  @Test(timeout = 120000)
-  public void testMultipleMRRSleepJobViaSession() throws IOException,
-  InterruptedException, TezException, ClassNotFoundException, YarnException {
-    Map<String, String> commonEnv = createCommonEnv();
-    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
-        .valueOf(new Random().nextInt(100000))));
-    remoteFs.mkdirs(remoteStagingDir);
-    TezConfiguration tezConf = new TezConfiguration(
-        mrrTezCluster.getConfig());
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
-        remoteStagingDir.toString());
-
-    Map<String, LocalResource> amLocalResources =
-        new HashMap<String, LocalResource>();
-
-    AMConfiguration amConfig = new AMConfiguration(
-        "default", commonEnv, amLocalResources,
-        tezConf, null);
-    TezSessionConfiguration tezSessionConfig =
-        new TezSessionConfiguration(amConfig, tezConf);
-    TezSession tezSession = new TezSession("testsession", tezSessionConfig);
-    tezSession.start();
-    Assert.assertEquals(TezSessionStatus.INITIALIZING,
-        tezSession.getSessionStatus());
-
-    State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
-        tezSession, false);
-    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
-    Assert.assertEquals(TezSessionStatus.READY,
-        tezSession.getSessionStatus());
-    finalState = testMRRSleepJobDagSubmitCore(true, false, false,
-        tezSession, false);
-    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
-    Assert.assertEquals(TezSessionStatus.READY,
-        tezSession.getSessionStatus());
-
-    ApplicationId appId = tezSession.getApplicationId();
-    tezSession.stop();
-    Assert.assertEquals(TezSessionStatus.SHUTDOWN,
-        tezSession.getSessionStatus());
-
-    YarnClient yarnClient = YarnClient.createYarnClient();
-    yarnClient.init(mrrTezCluster.getConfig());
-    yarnClient.start();
-
-    while (true) {
-      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-      if (appReport.getYarnApplicationState().equals(
-          YarnApplicationState.FINISHED)
-          || appReport.getYarnApplicationState().equals(
-              YarnApplicationState.FAILED)
-          || appReport.getYarnApplicationState().equals(
-              YarnApplicationState.KILLED)) {
-        break;
-      }
-    }
-
-    ApplicationReport appReport = yarnClient.getApplicationReport(appId);
-    Assert.assertEquals(YarnApplicationState.FINISHED,
-        appReport.getYarnApplicationState());
-    Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
-        appReport.getFinalApplicationStatus());
-  }
-
-  // Submits a simple 5 stage sleep job using tez session. Then kills it.
-  @Test(timeout = 60000)
-  public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException,
-  InterruptedException, TezException, ClassNotFoundException, YarnException {
-    State finalState = testMRRSleepJobDagSubmitCore(true, true, false, false);
-
-    Assert.assertEquals(DAGStatus.State.KILLED, finalState);
-    // TODO Add additional checks for tracking URL etc. - once it's exposed by
-    // the DAG API.
-  }
-
-  // Create and close a tez session without submitting a job
-  @Test(timeout = 60000)
-  public void testTezSessionShutdown() throws IOException,
-  InterruptedException, TezException, ClassNotFoundException, YarnException {
-    testMRRSleepJobDagSubmitCore(true, false, true, false);
-  }
-
-  @Test(timeout = 60000)
-  public void testAMSplitGeneration() throws IOException, InterruptedException,
-      TezException, ClassNotFoundException, YarnException {
-    testMRRSleepJobDagSubmitCore(true, false, false, true);
-  }
-
-  public State testMRRSleepJobDagSubmitCore(
-      boolean dagViaRPC,
-      boolean killDagWhileRunning,
-      boolean closeSessionBeforeSubmit,
-      boolean genSplitsInAM) throws IOException,
-      InterruptedException, TezException, ClassNotFoundException,
-      YarnException {
-    return testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning,
-        closeSessionBeforeSubmit, null, genSplitsInAM);
-  }
-
-  private Map<String, String> createCommonEnv() {
-    Map<String, String> commonEnv = new HashMap<String, String>();
-    return commonEnv;
-  }
-
-  public State testMRRSleepJobDagSubmitCore(
-      boolean dagViaRPC,
-      boolean killDagWhileRunning,
-      boolean closeSessionBeforeSubmit,
-      TezSession reUseTezSession,
-      boolean genSplitsInAM) throws IOException,
-      InterruptedException, TezException, ClassNotFoundException,
-      YarnException {
-    LOG.info("\n\n\nStarting testMRRSleepJobDagSubmitCore().");
-
-    JobConf stage1Conf = new JobConf(mrrTezCluster.getConfig());
-    JobConf stage2Conf = new JobConf(mrrTezCluster.getConfig());
-    JobConf stage3Conf = new JobConf(mrrTezCluster.getConfig());
-
-    stage1Conf.setLong(MRRSleepJob.MAP_SLEEP_TIME, 1);
-    stage1Conf.setInt(MRRSleepJob.MAP_SLEEP_COUNT, 1);
-    stage1Conf.setInt(MRJobConfig.NUM_MAPS, 1);
-    stage1Conf.set(MRJobConfig.MAP_CLASS_ATTR, SleepMapper.class.getName());
-    stage1Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
-        IntWritable.class.getName());
-    stage1Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        IntWritable.class.getName());
-    stage1Conf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
-        SleepInputFormat.class.getName());
-    stage1Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
-        MRRSleepJobPartitioner.class.getName());
-
-    stage2Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
-    stage2Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
-    stage2Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
-    stage2Conf
-        .set(MRJobConfig.REDUCE_CLASS_ATTR, ISleepReducer.class.getName());
-    stage2Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
-        IntWritable.class.getName());
-    stage2Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        IntWritable.class.getName());
-    stage2Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
-        MRRSleepJobPartitioner.class.getName());
-
-    JobConf stage22Conf = new JobConf(stage2Conf);
-    stage22Conf.setInt(MRJobConfig.NUM_REDUCES, 2);
-
-    stage3Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
-    stage3Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
-    stage3Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
-    stage3Conf.set(MRJobConfig.REDUCE_CLASS_ATTR, SleepReducer.class.getName());
-    stage3Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
-        IntWritable.class.getName());
-    stage3Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        IntWritable.class.getName());
-    stage3Conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
-        NullOutputFormat.class.getName());
-
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage2Conf,
-        stage1Conf);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage22Conf,
-        stage1Conf);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage3Conf,
-        stage2Conf); // this also works for stage22 since it sets up keys etc.
-
-    MRHelpers.doJobClientMagic(stage1Conf);
-    MRHelpers.doJobClientMagic(stage2Conf);
-    MRHelpers.doJobClientMagic(stage22Conf);
-    MRHelpers.doJobClientMagic(stage3Conf);
-
-    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
-        .valueOf(new Random().nextInt(100000))));
-    TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
-    InputSplitInfo inputSplitInfo = null;
-    if (!genSplitsInAM) {
-      inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf,
-        remoteStagingDir);
-    }
-
-    byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
-    byte[] stage1InputPayload = MRHelpers.createMRInputPayload(stage1Payload, null);
-    byte[] stage3Payload = MRHelpers.createUserPayloadFromConf(stage3Conf);
-    
-    DAG dag = new DAG("testMRRSleepJobDagSubmit");
-    int stage1NumTasks = genSplitsInAM ? -1 : inputSplitInfo.getNumTasks();
-    Class<? extends TezRootInputInitializer> inputInitializerClazz = genSplitsInAM ? MRInputAMSplitGenerator.class
-        : null;
-    Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
-        MapProcessor.class.getName()).setUserPayload(stage1Payload),
-        stage1NumTasks, Resource.newInstance(256, 1));
-    MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, inputInitializerClazz);
-    Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
-        ReduceProcessor.class.getName()).setUserPayload(
-        MRHelpers.createUserPayloadFromConf(stage2Conf)),
-        1, Resource.newInstance(256, 1));
-    Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
-        ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
-        1, Resource.newInstance(256, 1));
-    MRHelpers.addMROutput(stage3Vertex, stage3Payload);
-
-    Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
-    Map<String, String> commonEnv = createCommonEnv();
-
-    if (!genSplitsInAM) {
-      // TODO Use utility method post TEZ-205.
-      Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
-      stage1LocalResources.put(
-          inputSplitInfo.getSplitsFile().getName(),
-          createLocalResource(remoteFs, inputSplitInfo.getSplitsFile(),
-              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
-      stage1LocalResources.put(
-          inputSplitInfo.getSplitsMetaInfoFile().getName(),
-          createLocalResource(remoteFs, inputSplitInfo.getSplitsMetaInfoFile(),
-              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
-      stage1LocalResources.putAll(commonLocalResources);
-
-      stage1Vertex.setTaskLocalResources(stage1LocalResources);
-      stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
-    } else {
-      stage1Vertex.setTaskLocalResources(commonLocalResources);
-    }
-
-    stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf));
-    stage1Vertex.setTaskEnvironment(commonEnv);
-
-    // TODO env, resources
-
-    stage2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage2Conf));
-    stage2Vertex.setTaskLocalResources(commonLocalResources);
-    stage2Vertex.setTaskEnvironment(commonEnv);
-
-    stage3Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage3Conf));
-    stage3Vertex.setTaskLocalResources(commonLocalResources);
-    stage3Vertex.setTaskEnvironment(commonEnv);
-
-    dag.addVertex(stage1Vertex);
-    dag.addVertex(stage2Vertex);
-    dag.addVertex(stage3Vertex);
-
-    Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
-        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, new OutputDescriptor(
-        OnFileSortedOutput.class.getName()), new InputDescriptor(
-                ShuffledMergedInputLegacy.class.getName())));
-    Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
-        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, new OutputDescriptor(
-        OnFileSortedOutput.class.getName()), new InputDescriptor(
-                ShuffledMergedInputLegacy.class.getName())));
-
-    dag.addEdge(edge1);
-    dag.addEdge(edge2);
-
-    Map<String, LocalResource> amLocalResources =
-        new HashMap<String, LocalResource>();
-    amLocalResources.putAll(commonLocalResources);
-
-    TezConfiguration tezConf = new TezConfiguration(
-            mrrTezCluster.getConfig());
-    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
-        remoteStagingDir.toString());
-
-    TezClient tezClient = new TezClient(tezConf);
-    DAGClient dagClient = null;
-    TezSession tezSession = null;
-    boolean reuseSession = reUseTezSession != null;
-    TezSessionConfiguration tezSessionConfig;
-    AMConfiguration amConfig = new AMConfiguration(
-        "default", commonEnv, amLocalResources,
-        tezConf, null);
-    if(!dagViaRPC) {
-      // TODO Use utility method post TEZ-205 to figure out AM arguments etc.
-      dagClient = tezClient.submitDAGApplication(dag, amConfig);
-    } else {
-      if (reuseSession) {
-        tezSession = reUseTezSession;
-      } else {
-        tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
-        tezSession = new TezSession("testsession", tezSessionConfig);
-        tezSession.start();
-      }
-    }
-
-    if (dagViaRPC && closeSessionBeforeSubmit) {
-      YarnClient yarnClient = YarnClient.createYarnClient();
-      yarnClient.init(mrrTezCluster.getConfig());
-      yarnClient.start();
-      boolean sentKillSession = false;
-      while(true) {
-        Thread.sleep(500l);
-        ApplicationReport appReport =
-            yarnClient.getApplicationReport(tezSession.getApplicationId());
-        if (appReport == null) {
-          continue;
-        }
-        YarnApplicationState appState = appReport.getYarnApplicationState();
-        if (!sentKillSession) {
-          if (appState == YarnApplicationState.RUNNING) {
-            tezSession.stop();
-            sentKillSession = true;
-          }
-        } else {
-          if (appState == YarnApplicationState.FINISHED
-              || appState == YarnApplicationState.KILLED
-              || appState == YarnApplicationState.FAILED) {
-            LOG.info("Application completed after sending session shutdown"
-                + ", yarnApplicationState=" + appState
-                + ", finalAppStatus=" + appReport.getFinalApplicationStatus());
-            Assert.assertEquals(YarnApplicationState.FINISHED,
-                appState);
-            Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
-                appReport.getFinalApplicationStatus());
-            break;
-          }
-        }
-      }
-      yarnClient.stop();
-      return null;
-    }
-
-    if(dagViaRPC) {
-      LOG.info("Submitting dag to tez session with appId="
-          + tezSession.getApplicationId());
-      dagClient = tezSession.submitDAG(dag);
-      Assert.assertEquals(TezSessionStatus.RUNNING,
-          tezSession.getSessionStatus());
-    }
-    DAGStatus dagStatus = dagClient.getDAGStatus();
-    while (!dagStatus.isCompleted()) {
-      LOG.info("Waiting for job to complete. Sleeping for 500ms."
-          + " Current state: " + dagStatus.getState());
-      Thread.sleep(500l);
-      if(killDagWhileRunning
-          && dagStatus.getState() == DAGStatus.State.RUNNING) {
-        LOG.info("Killing running dag/session");
-        if (dagViaRPC) {
-          tezSession.stop();
-        } else {
-          dagClient.tryKillDAG();
-        }
-      }
-      dagStatus = dagClient.getDAGStatus();
-    }
-    if (dagViaRPC && !reuseSession) {
-      tezSession.stop();
-    }
-    return dagStatus.getState();
-  }
-
-  private static LocalResource createLocalResource(FileSystem fc, Path file,
-      LocalResourceType type, LocalResourceVisibility visibility)
-      throws IOException {
-    FileStatus fstat = fc.getFileStatus(file);
-    URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
-        .getPath()));
-    long resourceSize = fstat.getLen();
-    long resourceModificationTime = fstat.getModificationTime();
-
-    return LocalResource.newInstance(resourceURL, type, visibility,
-        resourceSize, resourceModificationTime);
-  }
-}
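
For reference, the DAG construction the removed test exercised boils down
to a small pattern: each Vertex names a processor class, a task count and
a container Resource, and each Edge pairs an OutputDescriptor with an
InputDescriptor under data-movement/source/scheduling properties. A
minimal sketch against the 0.2.0-SNAPSHOT API used above (names and sizes
are illustrative, not part of this patch):

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.tez.dag.api.DAG;
    import org.apache.tez.dag.api.Edge;
    import org.apache.tez.dag.api.EdgeProperty;
    import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
    import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
    import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
    import org.apache.tez.dag.api.InputDescriptor;
    import org.apache.tez.dag.api.OutputDescriptor;
    import org.apache.tez.dag.api.ProcessorDescriptor;
    import org.apache.tez.dag.api.Vertex;
    import org.apache.tez.mapreduce.processor.map.MapProcessor;
    import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
    import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
    import org.apache.tez.runtime.library.output.OnFileSortedOutput;

    public class TwoStageDagSketch {
      public static DAG build() {
        DAG dag = new DAG("twoStageSketch");
        // one map task and one reduce task, 256MB / 1 vcore each
        Vertex map = new Vertex("map",
            new ProcessorDescriptor(MapProcessor.class.getName()),
            1, Resource.newInstance(256, 1));
        Vertex reduce = new Vertex("reduce",
            new ProcessorDescriptor(ReduceProcessor.class.getName()),
            1, Resource.newInstance(256, 1));
        dag.addVertex(map);
        dag.addVertex(reduce);
        // shuffle edge: persisted, sorted map output merged on the
        // reduce side, scheduled sequentially
        dag.addEdge(new Edge(map, reduce, new EdgeProperty(
            DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
            SchedulingType.SEQUENTIAL,
            new OutputDescriptor(OnFileSortedOutput.class.getName()),
            new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
        return dag;
      }
    }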

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientCache.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientCache.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientCache.java
new file mode 100644
index 0000000..f7c8e07
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientCache.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.client;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+
+public class ClientCache {
+
+  private final Configuration conf;
+  private final ResourceMgrDelegate rm;
+
+  private Map<JobID, ClientServiceDelegate> cache = 
+      new HashMap<JobID, ClientServiceDelegate>();
+
+  public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
+    this.conf = conf;
+    this.rm = rm;
+  }
+
+  //TODO: evict from the cache on some threshold
+  public synchronized ClientServiceDelegate getClient(JobID jobId) {
+    ClientServiceDelegate client = cache.get(jobId);
+    if (client == null) {
+      client = new ClientServiceDelegate(conf, rm, jobId);
+      cache.put(jobId, client);
+    }
+    return client;
+  }
+
+}
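
ClientCache is a plain get-or-create map keyed by JobID. A hedged usage
sketch (the job id below is made up, and constructing the
ResourceMgrDelegate assumes a reachable ResourceManager):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.tez.mapreduce.client.ClientCache;
    import org.apache.tez.mapreduce.client.ClientServiceDelegate;
    import org.apache.tez.mapreduce.client.ResourceMgrDelegate;

    public class ClientCacheSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        ResourceMgrDelegate rm =
            new ResourceMgrDelegate(new YarnConfiguration(conf));
        ClientCache cache = new ClientCache(conf, rm);
        JobID jobId = JobID.forName("job_1382745600000_0001"); // made up
        // getClient is synchronized and caches per JobID, so repeated
        // lookups return the same delegate until eviction is implemented
        ClientServiceDelegate d1 = cache.getClient(jobId);
        ClientServiceDelegate d2 = cache.getClient(jobId);
        System.out.println(d1 == d2); // true
      }
    }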

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientServiceDelegate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientServiceDelegate.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientServiceDelegate.java
new file mode 100644
index 0000000..e0fab1f
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ClientServiceDelegate.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.TezConfiguration;
+
+import java.io.IOException;
+
+public class ClientServiceDelegate {
+
+  private final TezConfiguration conf;
+
+  // FIXME
+  // how to handle completed jobs that the RM does not know about?
+
+  public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
+      JobID jobId) {
+    this.conf = new TezConfiguration(conf); // clone so we can modify safely
+    // For faster redirects from AM to HS.
+    this.conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+        this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
+            MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
+  }
+
+  public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID jobId)
+      throws IOException, InterruptedException {
+    // FIXME needs counters support from DAG
+    // with a translation layer on client side
+    org.apache.hadoop.mapreduce.Counters empty =
+        new org.apache.hadoop.mapreduce.Counters();
+    return empty;
+  }
+
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobId,
+      int fromEventId, int maxEvents)
+      throws IOException, InterruptedException {
+    // FIXME seems like there is support in client to query task failure
+    // related information
+    // However, api does not make sense for DAG
+    return new TaskCompletionEvent[0];
+  }
+
+  public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID
+      taId)
+      throws IOException, InterruptedException {
+    // FIXME need support to query task diagnostics?
+    return new String[0];
+  }
+  
+  public JobStatus getJobStatus(JobID oldJobID) throws IOException {
+    // handled in YARNRunner
+    throw new UnsupportedOperationException();
+  }
+
+  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
+      JobID oldJobID, TaskType taskType)
+       throws IOException{
+    // TEZ-146: need to return real task reports
+    return new org.apache.hadoop.mapreduce.TaskReport[0];
+  }
+
+  public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
+       throws IOException {
+    // FIXME need support to kill a task attempt?
+    throw new UnsupportedOperationException();
+  }
+
+  public boolean killJob(JobID oldJobID)
+       throws IOException {
+    // FIXME need support to kill a dag?
+    // Should this be just an RM killApplication?
+    // For one dag per AM, RM kill should suffice
+    throw new UnsupportedOperationException();
+  }
+
+  public LogParams getLogFilePath(JobID oldJobID,
+      TaskAttemptID oldTaskAttemptID)
+      throws YarnException, IOException {
+    // FIXME logs for an attempt?
+    throw new UnsupportedOperationException();
+  }  
+}
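
The only real logic in this stub so far is the constructor: it clones the
caller's configuration and caps the generic IPC connect-retry limit at
the MR client-to-AM value, so probes against a finished AM give up (and
can redirect to the history server) quickly. The same pattern in
isolation, as a sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class RetryCapSketch {
      public static Configuration capRetries(Configuration callerConf) {
        // clone first so the caller's Configuration never sees the override
        Configuration clientConf = new Configuration(callerConf);
        clientConf.setInt(
            CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
            clientConf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
                MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
        return clientConf;
      }
    }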

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/DAGJobStatus.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/DAGJobStatus.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/DAGJobStatus.java
new file mode 100644
index 0000000..9acd836
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/DAGJobStatus.java
@@ -0,0 +1,386 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.Progress;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+
+import com.google.common.base.Joiner;
+
+public class DAGJobStatus extends JobStatus {
+
+  private final String jobFile;
+  private final DAGStatus dagStatus;
+  private final ApplicationReport report;
+
+  public DAGJobStatus(ApplicationReport report, DAGStatus dagStatus, String jobFile) {
+    super();
+    this.dagStatus = dagStatus;
+    this.jobFile = jobFile;
+    this.report = report;
+  }
+
+  @Override
+  protected synchronized void setMapProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setCleanupProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setSetupProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setReduceProgress(float p) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setPriority(JobPriority jp) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setFinishTime(long finishTime) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setHistoryFile(String historyFile) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setTrackingUrl(String trackingUrl) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setRetired() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setState(State state) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setStartTime(long startTime) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setUsername(String userName) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setSchedulingInfo(String schedulingInfo) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setJobACLs(Map<JobACL, AccessControlList> acls) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setQueue(String queue) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected synchronized void setFailureInfo(String failureInfo) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized String getQueue() {
+    return report.getQueue();
+  }
+
+  @Override
+  public synchronized float getMapProgress() {
+    if(dagStatus.getVertexProgress() != null) {
+      return getProgress(MultiStageMRConfigUtil.getInitialMapVertexName());
+    }
+    if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
+      return 1.0f;
+    }
+    return 0.0f;
+  }
+
+  @Override
+  public synchronized float getCleanupProgress() {
+    if (dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
+        dagStatus.getState() == DAGStatus.State.FAILED ||
+        dagStatus.getState() == DAGStatus.State.KILLED ||
+        dagStatus.getState() == DAGStatus.State.ERROR) {
+      return 1.0f;
+    }
+    return 0.0f;
+  }
+
+  @Override
+  public synchronized float getSetupProgress() {
+    if (dagStatus.getState() == DAGStatus.State.RUNNING) {
+      return 1.0f;
+    }
+    return 0.0f;
+  }
+
+  @Override
+  public synchronized float getReduceProgress() {
+    if(dagStatus.getVertexProgress() != null) {
+      return getProgress(MultiStageMRConfigUtil.getFinalReduceVertexName());
+    }
+    if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
+      return 1.0f;
+    }
+    return 0.0f;
+  }
+
+  @Override
+  public synchronized State getState() {
+    switch (dagStatus.getState()) {
+    case SUBMITTED:
+    case INITING:
+      return State.PREP;
+    case RUNNING:
+      return State.RUNNING;
+    case SUCCEEDED:
+      return State.SUCCEEDED;
+    case KILLED:
+      return State.KILLED;
+    case FAILED:
+    case ERROR:
+      return State.FAILED;
+    default:
+      throw new TezUncheckedException("Unknown value of DAGState.State:"
+          + dagStatus.getState());
+    }
+  }
+
+  @Override
+  public synchronized long getStartTime() {
+    return report.getStartTime();
+  }
+
+  @Override
+  public JobID getJobID() {
+    return TypeConverter.fromYarn(report.getApplicationId());
+  }
+
+  @Override
+  public synchronized String getUsername() {
+    return report.getUser();
+  }
+
+  @Override
+  public synchronized String getSchedulingInfo() {
+    return report.getTrackingUrl();
+  }
+
+  @Override
+  public synchronized Map<JobACL, AccessControlList> getJobACLs() {
+    // TODO: no DAG-level ACL support yet; fall back to the parent's
+    // (empty) ACL map
+    return super.getJobACLs();
+  }
+
+  @Override
+  public synchronized JobPriority getPriority() {
+    // TEZ-147: return real priority
+    return JobPriority.NORMAL;
+  }
+
+  @Override
+  public synchronized String getFailureInfo() {
+    return Joiner.on(". ").join(dagStatus.getDiagnostics());
+  }
+
+  @Override
+  public synchronized boolean isJobComplete() {
+    return (dagStatus.getState() == DAGStatus.State.SUCCEEDED ||
+        dagStatus.getState() == DAGStatus.State.FAILED ||
+        dagStatus.getState() == DAGStatus.State.KILLED ||
+        dagStatus.getState() == DAGStatus.State.ERROR);
+  }
+
+  @Override
+  public synchronized void write(DataOutput out) throws IOException {
+    // FIXME
+  }
+
+  @Override
+  public synchronized void readFields(DataInput in) throws IOException {
+    // FIXME
+  }
+
+  @Override
+  public String getJobName() {
+    return report.getName();
+  }
+
+  @Override
+  public String getJobFile() {
+    return jobFile;
+  }
+
+  @Override
+  public synchronized String getTrackingUrl() {
+    return report.getTrackingUrl();
+  }
+
+  @Override
+  public synchronized long getFinishTime() {
+    return report.getFinishTime();
+  }
+
+  @Override
+  public synchronized boolean isRetired() {
+    // FIXME handle retired jobs?
+    return false;
+  }
+
+  @Override
+  public synchronized String getHistoryFile() {
+    // FIXME handle history in status
+    return null;
+  }
+
+  @Override
+  public int getNumUsedSlots() {
+    return report.getApplicationResourceUsageReport().getNumUsedContainers();
+  }
+
+  @Override
+  public void setNumUsedSlots(int n) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getNumReservedSlots() {
+    return report.getApplicationResourceUsageReport().
+        getNumReservedContainers();
+  }
+
+  @Override
+  public void setNumReservedSlots(int n) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getUsedMem() {
+    return report.getApplicationResourceUsageReport().
+        getUsedResources().getMemory();
+  }
+
+  @Override
+  public void setUsedMem(int m) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getReservedMem() {
+    return report.getApplicationResourceUsageReport().
+        getReservedResources().getMemory();
+  }
+
+  @Override
+  public void setReservedMem(int r) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int getNeededMem() {
+    return report.getApplicationResourceUsageReport().
+        getNeededResources().getMemory();
+  }
+
+  @Override
+  public void setNeededMem(int n) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public synchronized boolean isUber() {
+    return false;
+  }
+
+  @Override
+  public synchronized void setUber(boolean isUber) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer buffer = new StringBuffer();
+    buffer.append("job-id : " + getJobID());
+    buffer.append("uber-mode : " + isUber());
+    buffer.append("map-progress : " + getMapProgress());
+    buffer.append("reduce-progress : " + getReduceProgress());
+    buffer.append("cleanup-progress : " + getCleanupProgress());
+    buffer.append("setup-progress : " + getSetupProgress());
+    buffer.append("runstate : " + getState());
+    buffer.append("start-time : " + getStartTime());
+    buffer.append("user-name : " + getUsername());
+    buffer.append("priority : " + getPriority());
+    buffer.append("scheduling-info : " + getSchedulingInfo());
+    buffer.append("num-used-slots" + getNumUsedSlots());
+    buffer.append("num-reserved-slots" + getNumReservedSlots());
+    buffer.append("used-mem" + getUsedMem());
+    buffer.append("reserved-mem" + getReservedMem());
+    buffer.append("needed-mem" + getNeededMem());
+    return buffer.toString();
+  }
+
+  private float getProgress(String vertexName) {
+    Progress progress = dagStatus.getVertexProgress().get(vertexName);
+    if(progress == null) {
+      // no such stage. return 0 like MR app currently does.
+      return 0;
+    }
+    float totalTasks = (float) progress.getTotalTaskCount();
+    if(totalTasks != 0) {
+      return progress.getSucceededTaskCount()/totalTasks;
+    }
+    return 0;
+  }
+
+}
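
DAGJobStatus adapts a Tez DAGStatus plus a YARN ApplicationReport to the
MR JobStatus contract: map/reduce progress is succeeded/total tasks of
the initial-map and final-reduce vertices, and DAG states collapse onto
PREP/RUNNING/SUCCEEDED/KILLED/FAILED. A sketch of how a caller might
build one, assuming a DAGClient and YarnClient are already in hand (the
job-file path is illustrative):

    import org.apache.hadoop.mapreduce.JobStatus;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ApplicationReport;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.tez.dag.api.client.DAGClient;
    import org.apache.tez.dag.api.client.DAGStatus;
    import org.apache.tez.mapreduce.client.DAGJobStatus;

    public class DagJobStatusSketch {
      public static JobStatus toMrStatus(DAGClient dagClient,
          YarnClient yarnClient, ApplicationId appId) throws Exception {
        DAGStatus dagStatus = dagClient.getDAGStatus();
        ApplicationReport report = yarnClient.getApplicationReport(appId);
        // "/tmp/job.xml" stands in for the submitted job conf path
        JobStatus status = new DAGJobStatus(report, dagStatus, "/tmp/job.xml");
        System.out.println(status.getState() + " map=" +
            status.getMapProgress() + " reduce=" + status.getReduceProgress());
        return status;
      }
    }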

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/NotRunningJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/NotRunningJob.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/NotRunningJob.java
new file mode 100644
index 0000000..e178948
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/NotRunningJob.java
@@ -0,0 +1,240 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+public class NotRunningJob implements MRClientProtocol {
+
+  private RecordFactory recordFactory =
+    RecordFactoryProvider.getRecordFactory(null);
+
+  private final JobState jobState;
+  private final ApplicationReport applicationReport;
+
+
+  private ApplicationReport getUnknownApplicationReport() {
+    ApplicationId unknownAppId = recordFactory
+        .newRecordInstance(ApplicationId.class);
+    ApplicationAttemptId unknownAttemptId = recordFactory
+        .newRecordInstance(ApplicationAttemptId.class);
+
+    // Setting AppState to NEW and finalStatus to UNDEFINED as they are never
+    // used for a non-running job
+    return ApplicationReport.newInstance(unknownAppId, unknownAttemptId, "N/A",
+        "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A", "N/A",
+        0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A", 0.0f, "TEZ_MRR", null);
+  }
+
+  NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
+    this.applicationReport =
+        (applicationReport ==  null) ?
+            getUnknownApplicationReport() : applicationReport;
+    this.jobState = jobState;
+  }
+
+  @Override
+  public FailTaskAttemptResponse failTaskAttempt(
+      FailTaskAttemptRequest request) throws IOException {
+    FailTaskAttemptResponse resp =
+      recordFactory.newRecordInstance(FailTaskAttemptResponse.class);
+    return resp;
+  }
+
+  @Override
+  public GetCountersResponse getCounters(GetCountersRequest request)
+      throws IOException {
+    GetCountersResponse resp =
+      recordFactory.newRecordInstance(GetCountersResponse.class);
+    Counters counters = recordFactory.newRecordInstance(Counters.class);
+    counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
+    resp.setCounters(counters);
+    return resp;
+  }
+
+  @Override
+  public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
+      throws IOException {
+    GetDiagnosticsResponse resp =
+      recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
+    resp.addDiagnostics("");
+    return resp;
+  }
+
+  @Override
+  public GetJobReportResponse getJobReport(GetJobReportRequest request)
+      throws IOException {
+    JobReport jobReport =
+      recordFactory.newRecordInstance(JobReport.class);
+    jobReport.setJobId(request.getJobId());
+    jobReport.setJobState(jobState);
+    jobReport.setUser(applicationReport.getUser());
+    jobReport.setStartTime(applicationReport.getStartTime());
+    jobReport.setDiagnostics(applicationReport.getDiagnostics());
+    jobReport.setJobName(applicationReport.getName());
+    jobReport.setTrackingUrl(applicationReport.getTrackingUrl());
+    jobReport.setFinishTime(applicationReport.getFinishTime());
+
+    GetJobReportResponse resp =
+        recordFactory.newRecordInstance(GetJobReportResponse.class);
+    resp.setJobReport(jobReport);
+    return resp;
+  }
+
+  @Override
+  public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
+      GetTaskAttemptCompletionEventsRequest request)
+      throws IOException {
+    GetTaskAttemptCompletionEventsResponse resp =
+      recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
+    resp.addAllCompletionEvents(new ArrayList<TaskAttemptCompletionEvent>());
+    return resp;
+  }
+
+  @Override
+  public GetTaskAttemptReportResponse getTaskAttemptReport(
+      GetTaskAttemptReportRequest request) throws IOException {
+    //not invoked by anybody
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
+      throws IOException {
+    GetTaskReportResponse resp =
+      recordFactory.newRecordInstance(GetTaskReportResponse.class);
+    TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
+    report.setTaskId(request.getTaskId());
+    report.setTaskState(TaskState.NEW);
+    Counters counters = recordFactory.newRecordInstance(Counters.class);
+    counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
+    report.setCounters(counters);
+    report.addAllRunningAttempts(new ArrayList<TaskAttemptId>());
+    // attach the synthesized report; without this the response is empty
+    resp.setTaskReport(report);
+    return resp;
+  }
+
+  @Override
+  public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
+      throws IOException {
+    GetTaskReportsResponse resp =
+      recordFactory.newRecordInstance(GetTaskReportsResponse.class);
+    resp.addAllTaskReports(new ArrayList<TaskReport>());
+    return resp;
+  }
+
+  @Override
+  public KillJobResponse killJob(KillJobRequest request)
+      throws IOException {
+    KillJobResponse resp =
+      recordFactory.newRecordInstance(KillJobResponse.class);
+    return resp;
+  }
+
+  @Override
+  public KillTaskResponse killTask(KillTaskRequest request)
+      throws IOException {
+    KillTaskResponse resp =
+      recordFactory.newRecordInstance(KillTaskResponse.class);
+    return resp;
+  }
+
+  @Override
+  public KillTaskAttemptResponse killTaskAttempt(
+      KillTaskAttemptRequest request) throws IOException {
+    KillTaskAttemptResponse resp =
+      recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
+    return resp;
+  }
+
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request) throws IOException {
+    /* Should not be invoked by anyone. */
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws IOException {
+    /* Should not be invoked by anyone. */
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws IOException {
+    /* Should not be invoked by anyone. */
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public InetSocketAddress getConnectAddress() {
+    /* Should not be invoked by anyone.  Normally used to set token service */
+    throw new NotImplementedException();
+  }
+}
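
NotRunningJob guarantees MR clients always get an MRClientProtocol, even
when the AM is gone and the RM returns no report: a null report is
replaced by the synthesized "unknown" report above (NEW state, UNDEFINED
final status) and the query methods answer with empty counters, events
and reports. A sketch of the fallback, which must live in the same
package because the constructor is package-private:

    package org.apache.tez.mapreduce.client;

    import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
    import org.apache.hadoop.mapreduce.v2.api.records.JobState;
    import org.apache.hadoop.yarn.api.records.ApplicationReport;

    class NotRunningJobSketch {
      static MRClientProtocol protocolFor(ApplicationReport reportOrNull,
          JobState assumedState) {
        // a null report triggers getUnknownApplicationReport(), so callers
        // still see benign defaults instead of an NPE
        return new NotRunningJob(reportOrNull, assumedState);
      }
    }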


[2/3] TEZ-581. Rename MiniMRRTezCluster to MiniTezCluster. Move YARNRunner to tez-mapreduce project (bikas)

Posted by bi...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
new file mode 100644
index 0000000..d3b2c08
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/ResourceMgrDelegate.java
@@ -0,0 +1,232 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class ResourceMgrDelegate {
+  private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
+      
+  private YarnConfiguration conf;
+  private GetNewApplicationResponse application;
+  private ApplicationId applicationId;
+  private YarnClient client;
+  private InetSocketAddress rmAddress;
+
+  /**
+   * Delegate responsible for communicating with the Resource Manager's {@link ApplicationClientProtocol}.
+   * @param conf the configuration object.
+   */
+  public ResourceMgrDelegate(YarnConfiguration conf) {
+    super();
+    this.conf = conf;
+    client = YarnClient.createYarnClient();
+    client.init(conf);
+    this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_PORT);
+    client.start();
+  }
+
+  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+      InterruptedException {
+    try {
+      return TypeConverter.fromYarnNodes(client.getNodeReports());
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+    try {
+      Set<String> appTypes = new HashSet<String>(1);
+      appTypes.add(TezConfiguration.TEZ_APPLICATION_TYPE);
+      return TypeConverter.fromYarnApps(client.getApplications(appTypes),
+          this.conf);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+      InterruptedException {
+    // TODO: Implement getBlacklistedTrackers
+    LOG.warn("getBlacklistedTrackers - Not implemented yet");
+    return new TaskTrackerInfo[0];
+  }
+
+  public ClusterMetrics getClusterMetrics() throws IOException,
+      InterruptedException {
+    YarnClusterMetrics metrics;
+    try {
+      metrics = client.getYarnClusterMetrics();
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+    // Fabricate MR1-style metrics from YARN cluster metrics: one "tracker"
+    // per NodeManager, with rough 10 map / 2 reduce slots per node.
+    ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1,
+        metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
+        metrics.getNumNodeManagers(), 0, 0);
+    return oldMetrics;
+  }
+
+  @SuppressWarnings("rawtypes")
+  public Token getDelegationToken(Text renewer) throws IOException,
+      InterruptedException {
+    try {
+      // Remove rmAddress after YARN-868 is addressed
+      return ConverterUtils.convertFromYarn(
+        client.getRMDelegationToken(renewer), rmAddress);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public String getFilesystemName() throws IOException, InterruptedException {
+    return FileSystem.get(conf).getUri().toString();
+  }
+
+  public JobID getNewJobID() throws IOException, InterruptedException {
+    try {
+      this.application = 
+          client.createApplication().getNewApplicationResponse();
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+    this.applicationId = this.application.getApplicationId();
+    return TypeConverter.fromYarn(applicationId);
+  }
+
+  public QueueInfo getQueue(String queueName) throws IOException,
+  InterruptedException {
+    try {
+      return TypeConverter.fromYarn(
+          client.getQueueInfo(queueName), this.conf);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+      InterruptedException {
+    try {
+      return TypeConverter.fromYarnQueueUserAclsInfo(
+          client.getQueueAclsInfo());
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public QueueInfo[] getQueues() throws IOException, InterruptedException {
+    try {
+      return TypeConverter.fromYarnQueueInfo(client.getAllQueues(), this.conf);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+    try {
+      return TypeConverter.fromYarnQueueInfo(client.getRootQueueInfos(),
+          this.conf);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public QueueInfo[] getChildQueues(String parent) throws IOException,
+      InterruptedException {
+    try {
+      return TypeConverter.fromYarnQueueInfo(client.getChildQueueInfos(parent),
+        this.conf);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public String getStagingAreaDir() throws IOException, InterruptedException {
+//    Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR);
+    String user = 
+      UserGroupInformation.getCurrentUser().getShortUserName();
+    Path path = MRApps.getStagingAreaDir(conf, user);
+    LOG.debug("getStagingAreaDir: dir=" + path);
+    return path.toString();
+  }
+
+
+  public String getSystemDir() throws IOException, InterruptedException {
+    Path sysDir = new Path(MRJobConfig.JOB_SUBMIT_DIR);
+    //FileContext.getFileContext(conf).delete(sysDir, true);
+    return sysDir.toString();
+  }
+  
+
+  public long getTaskTrackerExpiryInterval() throws IOException,
+      InterruptedException {
+    return 0;
+  }
+  
+  public void setJobPriority(JobID arg0, String arg1) throws IOException,
+      InterruptedException {
+    return;
+  }
+
+
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return 0;
+  }
+
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+
+  public void killApplication(ApplicationId appId)
+      throws YarnException, IOException {
+    client.killApplication(appId);
+  }
+
+}
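
Note the getNewJobID flow: the delegate asks the RM to create a new
application, remembers its ApplicationId, and hands back the converted
JobID, so TypeConverter keeps job_<clusterTs>_<seq> and
application_<clusterTs>_<seq> aligned. A usage sketch (assumes a
reachable ResourceManager):

    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.tez.mapreduce.client.ResourceMgrDelegate;

    public class ResourceMgrDelegateSketch {
      public static void main(String[] args) throws Exception {
        ResourceMgrDelegate rm =
            new ResourceMgrDelegate(new YarnConfiguration());
        JobID jobId = rm.getNewJobID();              // RM allocates an app
        ApplicationId appId = rm.getApplicationId(); // the YARN id behind it
        System.out.println(jobId + " <-> " + appId);
      }
    }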

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
new file mode 100644
index 0000000..6b6dd35
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -0,0 +1,722 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class enables the current JobClient (0.22 hadoop) to run on YARN-TEZ.
+ */
+@SuppressWarnings({ "unchecked" })
+public class YARNRunner implements ClientProtocol {
+
+  private static final Log LOG = LogFactory.getLog(YARNRunner.class);
+
+  private ResourceMgrDelegate resMgrDelegate;
+  private ClientCache clientCache;
+  private Configuration conf;
+  private final FileContext defaultFileContext;
+
+  final public static FsPermission DAG_FILE_PERMISSION =
+      FsPermission.createImmutable((short) 0644);
+  final public static int UTF8_CHUNK_SIZE = 16 * 1024;
+
+  private final TezConfiguration tezConf;
+  private final TezClient tezClient;
+  private DAGClient dagClient;
+
+  /**
+   * YARNRunner encapsulates the client interface of YARN.
+   * @param conf the configuration object for the client
+   */
+  public YARNRunner(Configuration conf) {
+   this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
+  }
+
+  /**
+   * Similar to {@link #YARNRunner(Configuration)} but allows injecting a
+   * {@link ResourceMgrDelegate}. Enables mocking and testing.
+   * @param conf the configuration object for the client
+   * @param resMgrDelegate the resourcemanager client handle.
+   */
+  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
+   this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
+  }
+
+  /**
+   * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
+   * but allows injecting a {@link ClientCache}. Enables mocking and testing.
+   * @param conf the configuration object
+   * @param resMgrDelegate the resource manager delegate
+   * @param clientCache the client cache object.
+   */
+  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
+      ClientCache clientCache) {
+    this.conf = conf;
+    this.tezConf = new TezConfiguration(conf);
+    try {
+      this.resMgrDelegate = resMgrDelegate;
+      this.tezClient = new TezClient(tezConf);
+      this.clientCache = clientCache;
+      this.defaultFileContext = FileContext.getFileContext(this.conf);
+
+    } catch (UnsupportedFileSystemException ufe) {
+      throw new RuntimeException("Error in instantiating YARNRunner", ufe);
+    }
+  }
+
+  /**
+   * Used mostly for testing.
+   * @param resMgrDelegate the resource manager delegate to use.
+   */
+  @VisibleForTesting
+  @Private
+  public void setResourceMgrDelegate(ResourceMgrDelegate resMgrDelegate) {
+    this.resMgrDelegate = resMgrDelegate;
+  }
+
+  @Override
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Use Token.renew instead");
+  }
+
+  @Override
+  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getActiveTrackers();
+  }
+
+  @Override
+  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+    return resMgrDelegate.getAllJobs();
+  }
+
+  @Override
+  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getBlacklistedTrackers();
+  }
+
+  @Override
+  public ClusterMetrics getClusterMetrics() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getClusterMetrics();
+  }
+
+  @Override
+  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+      throws IOException, InterruptedException {
+    // The token is only used for serialization. So the type information
+    // mismatch should be fine.
+    return resMgrDelegate.getDelegationToken(renewer);
+  }
+
+  @Override
+  public String getFilesystemName() throws IOException, InterruptedException {
+    return resMgrDelegate.getFilesystemName();
+  }
+
+  @Override
+  public JobID getNewJobID() throws IOException, InterruptedException {
+    return resMgrDelegate.getNewJobID();
+  }
+
+  @Override
+  public QueueInfo getQueue(String queueName) throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getQueue(queueName);
+  }
+
+  @Override
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getQueueAclsForCurrentUser();
+  }
+
+  @Override
+  public QueueInfo[] getQueues() throws IOException, InterruptedException {
+    return resMgrDelegate.getQueues();
+  }
+
+  @Override
+  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+    return resMgrDelegate.getRootQueues();
+  }
+
+  @Override
+  public QueueInfo[] getChildQueues(String parent) throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getChildQueues(parent);
+  }
+
+  @Override
+  public String getStagingAreaDir() throws IOException, InterruptedException {
+    return resMgrDelegate.getStagingAreaDir();
+  }
+
+  @Override
+  public String getSystemDir() throws IOException, InterruptedException {
+    return resMgrDelegate.getSystemDir();
+  }
+
+  @Override
+  public long getTaskTrackerExpiryInterval() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getTaskTrackerExpiryInterval();
+  }
+
+  private Map<String, LocalResource> createJobLocalResources(
+      Configuration jobConf, String jobSubmitDir)
+      throws IOException {
+
+    // Setup LocalResources
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+
+    Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
+
+    URL yarnUrlForJobSubmitDir = ConverterUtils
+        .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
+            .resolvePath(
+                defaultFileContext.makeQualified(new Path(jobSubmitDir))));
+    LOG.debug("Creating setup context, jobSubmitDir url is "
+        + yarnUrlForJobSubmitDir);
+
+    localResources.put(MRJobConfig.JOB_CONF_FILE,
+        createApplicationResource(defaultFileContext,
+            jobConfPath, LocalResourceType.FILE));
+    if (jobConf.get(MRJobConfig.JAR) != null) {
+      Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
+      LocalResource rc = createApplicationResource(defaultFileContext,
+          jobJarPath,
+          LocalResourceType.FILE);
+      // FIXME fix pattern support
+      // String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
+      // JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
+      // rc.setPattern(pattern);
+      localResources.put(MRJobConfig.JOB_JAR, rc);
+    } else {
+      // The job jar may be absent. For example, with pipes the job jar is the
+      // hadoop mapreduce jar itself, which is already on the classpath.
+      LOG.info("Job jar is not present. "
+          + "Not adding any jar to the list of resources.");
+    }
+
+    // TODO gross hack
+    for (String s : new String[] {
+        MRJobConfig.JOB_SPLIT,
+        MRJobConfig.JOB_SPLIT_METAINFO}) {
+      localResources.put(s,
+          createApplicationResource(defaultFileContext,
+              new Path(jobSubmitDir, s), LocalResourceType.FILE));
+    }
+
+    MRApps.setupDistributedCache(jobConf, localResources);
+
+    return localResources;
+  }
+
+  // FIXME isn't this a nice mess of a client?
+  // read input, write splits, read splits again
+  private List<TaskLocationHint> getMapLocationHintsFromInputSplits(JobID jobId,
+      FileSystem fs, Configuration conf,
+      String jobSubmitDir) throws IOException {
+    TaskSplitMetaInfo[] splitsInfo =
+        SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf,
+            new Path(jobSubmitDir));
+    int splitsCount = splitsInfo.length;
+    List<TaskLocationHint> locationHints =
+        new ArrayList<TaskLocationHint>(splitsCount);
+    for (int i = 0; i < splitsCount; ++i) {
+      TaskLocationHint locationHint =
+          new TaskLocationHint(
+              new HashSet<String>(
+                  Arrays.asList(splitsInfo[i].getLocations())), null);
+      locationHints.add(locationHint);
+    }
+    return locationHints;
+  }
+
+  private void setupMapReduceEnv(Configuration jobConf,
+      Map<String, String> environment, boolean isMap) throws IOException {
+
+    if (isMap) {
+      warnForJavaLibPath(
+          jobConf.get(MRJobConfig.MAP_JAVA_OPTS,""),
+          "map",
+          MRJobConfig.MAP_JAVA_OPTS,
+          MRJobConfig.MAP_ENV);
+      warnForJavaLibPath(
+          jobConf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""),
+          "map",
+          MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
+          MRJobConfig.MAPRED_ADMIN_USER_ENV);
+    } else {
+      warnForJavaLibPath(
+          jobConf.get(MRJobConfig.REDUCE_JAVA_OPTS,""),
+          "reduce",
+          MRJobConfig.REDUCE_JAVA_OPTS,
+          MRJobConfig.REDUCE_ENV);
+      warnForJavaLibPath(
+          jobConf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""),
+          "reduce",
+          MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
+          MRJobConfig.MAPRED_ADMIN_USER_ENV);
+    }
+
+    MRHelpers.updateEnvironmentForMRTasks(jobConf, environment, isMap);
+  }
+
+  private Vertex createVertexForStage(Configuration stageConf,
+      Map<String, LocalResource> jobLocalResources,
+      List<TaskLocationHint> locations, int stageNum, int totalStages)
+      throws IOException {
+    // stageNum runs from 0 to totalStages - 1; stage 0 is the map stage.
+    boolean isMap = (stageNum == 0);
+
+    int numTasks = isMap ? stageConf.getInt(MRJobConfig.NUM_MAPS, 0)
+        : stageConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    String processorName = isMap ? MapProcessor.class.getName()
+        : ReduceProcessor.class.getName();
+    String vertexName = null;
+    if (isMap) {
+      vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+    } else {
+      if (stageNum == totalStages - 1) {
+        vertexName = MultiStageMRConfigUtil.getFinalReduceVertexName();
+      } else {
+        vertexName = MultiStageMRConfigUtil
+            .getIntermediateStageVertexName(stageNum);
+      }
+    }
+
+    Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf)
+        : MRHelpers.getReduceResource(stageConf);
+    byte[] vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
+    Vertex vertex = new Vertex(vertexName,
+        new ProcessorDescriptor(processorName).setUserPayload(vertexUserPayload),
+        numTasks, taskResource);
+    if (isMap) {
+      byte[] mapInputPayload = MRHelpers.createMRInputPayload(vertexUserPayload, null);
+      MRHelpers.addMRInput(vertex, mapInputPayload, null);
+    }
+    // The final stage writes the job output; for map-only jobs this is the
+    // map stage itself.
+    if (stageNum == totalStages - 1) {
+      MRHelpers.addMROutput(vertex, vertexUserPayload);
+    }
+
+    Map<String, String> taskEnv = new HashMap<String, String>();
+    setupMapReduceEnv(stageConf, taskEnv, isMap);
+
+    Map<String, LocalResource> taskLocalResources =
+        new TreeMap<String, LocalResource>();
+    // PRECOMMIT Remove split localization for reduce tasks if it's being set
+    // here
+    taskLocalResources.putAll(jobLocalResources);
+
+    String taskJavaOpts = isMap ? MRHelpers.getMapJavaOpts(stageConf)
+        : MRHelpers.getReduceJavaOpts(stageConf);
+
+    vertex.setTaskEnvironment(taskEnv)
+        .setTaskLocalResources(taskLocalResources)
+        .setTaskLocationsHint(locations)
+        .setJavaOpts(taskJavaOpts);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding vertex to DAG" + ", vertexName="
+          + vertex.getVertexName() + ", processor="
+          + vertex.getProcessorDescriptor().getClassName() + ", parallelism="
+          + vertex.getParallelism() + ", javaOpts=" + vertex.getJavaOpts()
+          + ", resources=" + vertex.getTaskResource()
+      // TODO Add localResources and Environment
+      );
+    }
+
+    return vertex;
+  }
+
+  private DAG createDAG(FileSystem fs, JobID jobId, Configuration[] stageConfs,
+      String jobSubmitDir, Credentials ts,
+      Map<String, LocalResource> jobLocalResources) throws IOException {
+
+    String jobName = stageConfs[0].get(MRJobConfig.JOB_NAME,
+        YarnConfiguration.DEFAULT_APPLICATION_NAME);
+    DAG dag = new DAG(jobName);
+
+    LOG.info("Number of stages: " + stageConfs.length);
+
+    List<TaskLocationHint> mapInputLocations =
+        getMapLocationHintsFromInputSplits(
+            jobId, fs, stageConfs[0], jobSubmitDir);
+    List<TaskLocationHint> reduceInputLocations = null;
+
+    Vertex[] vertices = new Vertex[stageConfs.length];
+    for (int i = 0; i < stageConfs.length; i++) {
+      vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
+          i == 0 ? mapInputLocations : reduceInputLocations, i,
+          stageConfs.length);
+    }
+
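+    // Wire the vertices into a linear chain: each stage's sorted output is
+    // shuffled (scatter-gather) into the next stage's merged input.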
+    for (int i = 0; i < vertices.length; i++) {
+      dag.addVertex(vertices[i]);
+      if (i > 0) {
+        EdgeProperty edgeProperty = new EdgeProperty(
+            DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+            SchedulingType.SEQUENTIAL,
+            new OutputDescriptor(OnFileSortedOutput.class.getName()),
+            new InputDescriptor(ShuffledMergedInputLegacy.class.getName()));
+
+        Edge edge = new Edge(vertices[i - 1], vertices[i], edgeProperty);
+        dag.addEdge(edge);
+      }
+    }
+    return dag;
+  }
+
+  private TezConfiguration getDAGAMConfFromMRConf() {
+    TezConfiguration finalConf = new TezConfiguration(this.tezConf);
+    Map<String, String> mrParamToDAGParamMap = DeprecatedKeys
+        .getMRToDAGParamMap();
+
+    for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
+      if (finalConf.get(entry.getKey()) != null) {
+        finalConf.set(entry.getValue(), finalConf.get(entry.getKey()));
+        finalConf.unset(entry.getKey());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("MR->DAG Translating MR key: " + entry.getKey()
+              + " to Tez key: " + entry.getValue() + " with value "
+              + finalConf.get(entry.getValue()));
+        }
+      }
+    }
+    return finalConf;
+  }
+
+  @Override
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
+  throws IOException, InterruptedException {
+
+    ApplicationId appId = resMgrDelegate.getApplicationId();
+
+    FileSystem fs = FileSystem.get(conf);
+    // Loads the job.xml written by the user.
+    JobConf jobConf = new JobConf(new TezConfiguration(conf));
+
+    // Extract individual raw MR configs.
+    Configuration[] stageConfs = MultiStageMRConfToTezTranslator
+        .getStageConfs(jobConf);
+
+    // Transform all confs to use Tez keys
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[0],
+        null);
+    for (int i = 1; i < stageConfs.length; i++) {
+      MultiStageMRConfToTezTranslator.translateVertexConfToTez(stageConfs[i],
+          stageConfs[i - 1]);
+    }
+
+    // create inputs to tezClient.submit()
+
+    // FIXME set up job resources
+    Map<String, LocalResource> jobLocalResources =
+        createJobLocalResources(stageConfs[0], jobSubmitDir);
+
+    // FIXME createDAG should take the tezConf as a parameter, instead of using
+    // MR keys.
+    DAG dag = createDAG(fs, jobId, stageConfs, jobSubmitDir, ts,
+        jobLocalResources);
+
+    List<String> vargs = new LinkedList<String>();
+    // admin command opts and user command opts
+    String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
+        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
+    warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
+        MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
+    vargs.add(mrAppMasterAdminOptions);
+
+    // Add AM user command opts
+    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
+        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
+    warnForJavaLibPath(mrAppMasterUserOptions, "app master",
+        MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
+    vargs.add(mrAppMasterUserOptions);
+
+    StringBuilder javaOpts = new StringBuilder();
+    for (String varg : vargs) {
+      javaOpts.append(varg).append(" ");
+    }
+
+    // Setup the CLASSPATH in environment
+    // i.e. add { Hadoop jars, job jar, CWD } to classpath.
+    Map<String, String> environment = new HashMap<String, String>();
+
+    // Setup the environment variables for AM
+    MRHelpers.updateEnvironmentForMRAM(conf, environment);
+
+    TezConfiguration dagAMConf = getDAGAMConfFromMRConf();
+    dagAMConf.set(TezConfiguration.TEZ_AM_JAVA_OPTS, javaOpts.toString());
+
+    // Submit to ResourceManager
+    try {
+      dagAMConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+          jobSubmitDir);
+      AMConfiguration amConfig = new AMConfiguration(
+          jobConf.get(JobContext.QUEUE_NAME,
+              YarnConfiguration.DEFAULT_QUEUE_NAME),
+          environment,
+          jobLocalResources, dagAMConf, ts);
+      tezClient.submitDAGApplication(appId, dag, amConfig);
+    } catch (TezException e) {
+      throw new IOException(e);
+    }
+
+    return getJobStatus(jobId);
+  }
+
+  private LocalResource createApplicationResource(FileContext fs, Path p,
+      LocalResourceType type) throws IOException {
+    LocalResource rsrc = Records.newRecord(LocalResource.class);
+    FileStatus rsrcStat = fs.getFileStatus(p);
+    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
+        .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
+    rsrc.setSize(rsrcStat.getLen());
+    rsrc.setTimestamp(rsrcStat.getModificationTime());
+    rsrc.setType(type);
+    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    return rsrc;
+  }
+
+  @Override
+  public void setJobPriority(JobID arg0, String arg1) throws IOException,
+      InterruptedException {
+    resMgrDelegate.setJobPriority(arg0, arg1);
+  }
+
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return resMgrDelegate.getProtocolVersion(arg0, arg1);
+  }
+
+  @Override
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Use Token.renew instead");
+  }
+
+
+  @Override
+  public Counters getJobCounters(JobID arg0) throws IOException,
+      InterruptedException {
+    return clientCache.getClient(arg0).getJobCounters(arg0);
+  }
+
+  @Override
+  public String getJobHistoryDir() throws IOException, InterruptedException {
+    return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+  }
+
+  @Override
+  public JobStatus getJobStatus(JobID jobID) throws IOException,
+      InterruptedException {
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    String jobFile = MRApps.getJobFile(conf, user, jobID);
+    DAGStatus dagStatus;
+    try {
+      if(dagClient == null) {
+        dagClient = tezClient.getDAGClient(TypeConverter.toYarn(jobID).getAppId());
+      }
+      dagStatus = dagClient.getDAGStatus();
+      return new DAGJobStatus(dagClient.getApplicationReport(), dagStatus, jobFile);
+    } catch (TezException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
+      int arg2) throws IOException, InterruptedException {
+    return clientCache.getClient(arg0).getTaskCompletionEvents(arg0, arg1, arg2);
+  }
+
+  @Override
+  public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException,
+      InterruptedException {
+    return clientCache.getClient(arg0.getJobID()).getTaskDiagnostics(arg0);
+  }
+
+  @Override
+  public TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
+  throws IOException, InterruptedException {
+    return clientCache.getClient(jobID)
+        .getTaskReports(jobID, taskType);
+  }
+
+  @Override
+  public void killJob(JobID arg0) throws IOException, InterruptedException {
+    /* if the job is still running (RUNNING or PREP), ask the RM to kill
+       the application */
+    JobStatus status = getJobStatus(arg0);
+    if (status.getState() == JobStatus.State.RUNNING ||
+        status.getState() == JobStatus.State.PREP) {
+      try {
+        resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+      } catch (YarnException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+  @Override
+  public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,
+      InterruptedException {
+    return clientCache.getClient(arg0.getJobID()).killTask(arg0, arg1);
+  }
+
+  @Override
+  public AccessControlList getQueueAdmins(String arg0) throws IOException {
+    return new AccessControlList("*");
+  }
+
+  @Override
+  public JobTrackerStatus getJobTrackerStatus() throws IOException,
+      InterruptedException {
+    return JobTrackerStatus.RUNNING;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion,
+        clientMethodsHash);
+  }
+
+  @Override
+  public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
+      throws IOException {
+    try {
+      return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private static void warnForJavaLibPath(String opts, String component,
+      String javaConf, String envConf) {
+    if (opts != null && opts.contains("-Djava.library.path")) {
+      LOG.warn("Usage of -Djava.library.path in " + javaConf + " can cause " +
+               "programs to no longer function if hadoop native libraries " +
+               "are used. These values should be set as part of the " +
+               "LD_LIBRARY_PATH in the " + component + " JVM env using " +
+               envConf + " config settings.");
+    }
+  }
+
+}
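
For context on how a vanilla MapReduce job reaches the YARNRunner above:
Hadoop's job client picks a ClientProtocolProvider based on
mapreduce.framework.name (see YarnTezClientProtocolProvider below). A minimal
driver sketch, assuming the tez jars are on the client classpath; MyDriver is
a hypothetical driver class, everything else comes from the code above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.tez.mapreduce.hadoop.MRConfig;

    Configuration conf = new Configuration();
    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
    Job job = Job.getInstance(conf);
    job.setJarByClass(MyDriver.class);  // MyDriver: hypothetical driver class
    // ... configure mapper/reducer/input/output as usual ...
    job.waitForCompletion(true);  // submission is routed through YARNRunner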

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java
new file mode 100644
index 0000000..c36dc9d
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YarnTezClientProtocolProvider.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+
+public class YarnTezClientProtocolProvider extends ClientProtocolProvider {
+
+  @Override
+  public ClientProtocol create(Configuration conf) throws IOException {
+    if (MRConfig.YARN_TEZ_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
+      return new YARNRunner(conf);
+    }
+    return null;
+  }
+
+  @Override
+  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
+      throws IOException {
+    return create(conf);
+  }
+
+  @Override
+  public void close(ClientProtocol clientProtocol) throws IOException {
+    // nothing to do
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
new file mode 100644
index 0000000..88816ca
--- /dev/null
+++ b/tez-mapreduce/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -0,0 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.tez.mapreduce.client.YarnTezClientProtocolProvider
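
The registration above is a standard java.util.ServiceLoader entry: the
MapReduce client iterates every registered ClientProtocolProvider and keeps
the first whose create(conf) returns non-null. A sketch of the discovery loop
this relies on (ServiceLoader semantics only, not the actual Hadoop source):

    import java.io.IOException;
    import java.util.ServiceLoader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
    import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;

    static ClientProtocol pickProvider(Configuration conf) throws IOException {
      for (ClientProtocolProvider provider
          : ServiceLoader.load(ClientProtocolProvider.class)) {
        ClientProtocol clientProtocol = provider.create(conf);
        if (clientProtocol != null) {
          // For mapreduce.framework.name=yarn-tez this is a YARNRunner.
          return clientProtocol;
        }
      }
      throw new IOException("Cannot initialize Cluster: no matching provider");
    }

Note that the fully qualified name in the registration must match the
provider's actual package (org.apache.tez.mapreduce.client), or ServiceLoader
will fail to load it.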

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tez-tests/pom.xml b/tez-tests/pom.xml
new file mode 100644
index 0000000..b3c247a
--- /dev/null
+++ b/tez-tests/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-tests</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce-examples</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
new file mode 100644
index 0000000..f98f392
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobs.java
@@ -0,0 +1,359 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.RandomTextWriterJob;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.mapreduce.examples.MRRSleepJob;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.test.MiniTezCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMRRJobs {
+
+  private static final Log LOG = LogFactory.getLog(TestMRRJobs.class);
+
+  protected static MiniTezCluster mrrTezCluster;
+  protected static MiniDFSCluster dfsCluster;
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+
+  private static final String TEST_ROOT_DIR = "target"
+      + Path.SEPARATOR + TestMRRJobs.class.getName() + "-tmpDir";
+
+  private static final String OUTPUT_ROOT_DIR = "/tmp" + Path.SEPARATOR +
+      TestMRRJobs.class.getSimpleName();
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    if (mrrTezCluster == null) {
+      mrrTezCluster = new MiniTezCluster(TestMRRJobs.class.getName(), 1,
+          1, 1);
+      // Use a distinct name so the static conf field is not shadowed.
+      Configuration clusterConf = new Configuration();
+      clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      clusterConf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
+      clusterConf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0L);
+      mrrTezCluster.init(clusterConf);
+      mrrTezCluster.start();
+    }
+
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+
+  @Test (timeout = 60000)
+  public void testMRRSleepJob() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    LOG.info("\n\n\nStarting testMRRSleepJob().");
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+                      " but didn't Match Job ID " + jobId ,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO use dag client to test counters and task progress?
+    // what about completed jobs?
+  }
+
+  @Test (timeout = 60000)
+  public void testRandomWriter() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testRandomWriter().");
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
+    mrrTezCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
+    mrrTezCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
+    Job job = randomWriterJob.createJob(mrrTezCluster.getConfig());
+    Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
+    FileOutputFormat.setOutputPath(job, outputDir);
+    job.setSpeculativeExecution(false);
+    job.setJarByClass(RandomTextWriterJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+                      " but didn't Match Job ID " + jobId ,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+
+    // Make sure there are three files in the output-dir
+
+    RemoteIterator<FileStatus> iterator =
+        FileContext.getFileContext(mrrTezCluster.getConfig()).listStatus(
+            outputDir);
+    int count = 0;
+    while (iterator.hasNext()) {
+      FileStatus file = iterator.next();
+      if (!file.getPath().getName()
+          .equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
+        count++;
+      }
+    }
+    Assert.assertEquals("Number of part files is wrong!", 3, count);
+
+  }
+
+
+  @Test (timeout = 60000)
+  public void testFailingJob() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testFailingJob().");
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+    job.getConfiguration().setBoolean(MRRSleepJob.MAP_FATAL_ERROR, true);
+    job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "*");
+
+    job.submit();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertFalse(succeeded);
+    Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO verify failed task diagnostics
+  }
+
+  @Test (timeout = 60000)
+  public void testFailingAttempt() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testFailingAttempt().");
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 1, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(3); // allow the induced failure to be retried
+    job.getConfiguration().setBoolean(MRRSleepJob.MAP_THROW_ERROR, true);
+    job.getConfiguration().set(MRRSleepJob.MAP_ERROR_TASK_IDS, "0");
+
+    job.submit();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO verify failed task diagnostics
+  }
+
+  @Test (timeout = 60000)
+  public void testMRRSleepJobWithCompression() throws IOException,
+      InterruptedException, ClassNotFoundException {
+    LOG.info("\n\n\nStarting testMRRSleepJobWithCompression().");
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniTezCluster.APPJAR
+               + " not found. Not running test.");
+      return;
+    }
+
+    Configuration sleepConf = new Configuration(mrrTezCluster.getConfig());
+
+    MRRSleepJob sleepJob = new MRRSleepJob();
+    sleepJob.setConf(sleepConf);
+
+    Job job = sleepJob.createJob(1, 1, 2, 1, 1,
+        1, 1, 1, 1, 1);
+
+    job.setJarByClass(MRRSleepJob.class);
+    job.setMaxMapAttempts(1); // speed up failures
+
+    // enable compression
+    job.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
+    job.getConfiguration().set(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
+        DefaultCodec.class.getName());
+
+    job.submit();
+    String trackingUrl = job.getTrackingURL();
+    String jobId = job.getJobID().toString();
+    boolean succeeded = job.waitForCompletion(true);
+    Assert.assertTrue(succeeded);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+    Assert.assertTrue("Tracking URL was " + trackingUrl +
+                      " but didn't Match Job ID " + jobId ,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+
+    // FIXME once counters and task progress can be obtained properly
+    // TODO use dag client to test counters and task progress?
+    // what about completed jobs?
+
+  }
+
+
+  /*
+  //@Test (timeout = 60000)
+  public void testMRRSleepJobWithSecurityOn() throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    LOG.info("\n\n\nStarting testMRRSleepJobWithSecurityOn().");
+
+    if (!(new File(MiniTezCluster.APPJAR)).exists()) {
+      return;
+    }
+
+    mrrTezCluster.getConfig().set(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    mrrTezCluster.getConfig().set(YarnConfiguration.RM_KEYTAB, "/etc/krb5.keytab");
+    mrrTezCluster.getConfig().set(YarnConfiguration.NM_KEYTAB, "/etc/krb5.keytab");
+    mrrTezCluster.getConfig().set(YarnConfiguration.RM_PRINCIPAL,
+        "rm/sightbusy-lx@LOCALHOST");
+    mrrTezCluster.getConfig().set(YarnConfiguration.NM_PRINCIPAL,
+        "nm/sightbusy-lx@LOCALHOST");
+
+    UserGroupInformation.setConfiguration(mrrTezCluster.getConfig());
+
+    // Keep it in here instead of after RM/NM as multiple user logins happen in
+    // the same JVM.
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
+    LOG.info("User name is " + user.getUserName());
+    for (Token<? extends TokenIdentifier> str : user.getTokens()) {
+      LOG.info("Token is " + str.encodeToUrlString());
+    }
+    user.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        MRRSleepJob sleepJob = new MRRSleepJob();
+        sleepJob.setConf(mrrTezCluster.getConfig());
+        Job job = sleepJob.createJob(3, 0, 10000, 1, 0, 0);
+        // //Job with reduces
+        // Job job = sleepJob.createJob(3, 2, 10000, 1, 10000, 1);
+        job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
+        job.submit();
+        String trackingUrl = job.getTrackingURL();
+        String jobId = job.getJobID().toString();
+        job.waitForCompletion(true);
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+        Assert.assertTrue("Tracking URL was " + trackingUrl +
+                          " but didn't Match Job ID " + jobId ,
+          trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
+        return null;
+      }
+    });
+
+    // TODO later:  add explicit "isUber()" checks of some sort
+  }
+  */
+
+}
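
The tests above all share one harness pattern; stripped to its skeleton it
looks like the sketch below (class names as in the tests, resource counts
illustrative, and it assumes the tez AppMaster jar referenced by
MiniTezCluster.APPJAR has been built):

    Configuration conf = new Configuration();
    MiniDFSCluster dfs = new MiniDFSCluster.Builder(conf)
        .numDataNodes(2).format(true).build();
    MiniTezCluster tez = new MiniTezCluster("harness", 1, 1, 1);
    Configuration clusterConf = new Configuration();
    clusterConf.set("fs.defaultFS", dfs.getFileSystem().getUri().toString());
    tez.init(clusterConf);
    tez.start();
    // ... submit jobs against new Configuration(tez.getConfig()) ...
    tez.stop();
    dfs.shutdown();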

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
new file mode 100644
index 0000000..1bc7b4d
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -0,0 +1,533 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.DAGStatus.State;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
+import org.apache.tez.mapreduce.examples.MRRSleepJob;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepInputFormat;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepMapper;
+import org.apache.tez.mapreduce.examples.MRRSleepJob.SleepReducer;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.test.MiniTezCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMRRJobsDAGApi {
+
+  private static final Log LOG = LogFactory.getLog(TestMRRJobsDAGApi.class);
+
+  protected static MiniTezCluster mrrTezCluster;
+  protected static MiniDFSCluster dfsCluster;
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+
+  private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestMRRJobsDAGApi.class.getName() + "-tmpDir";
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+          .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (mrrTezCluster == null) {
+      mrrTezCluster = new MiniTezCluster(TestMRRJobsDAGApi.class.getName(),
+          1, 1, 1);
+      // Use a distinct name so the static conf field is not shadowed.
+      Configuration clusterConf = new Configuration();
+      clusterConf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+      mrrTezCluster.init(clusterConf);
+      mrrTezCluster.start();
+    }
+
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+    // TODO Add cleanup code.
+  }
+
+  // Submits a simple 3-vertex (one map, two reduce stages) sleep job using
+  // the DAG submit API instead of the job client.
+  @Test(timeout = 60000)
+  public void testMRRSleepJobDagSubmit() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    State finalState = testMRRSleepJobDagSubmitCore(false, false, false, false);
+
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    // TODO Add additional checks for tracking URL etc. - once it's exposed by
+    // the DAG API.
+  }
+
+  // Submits the same 3-vertex sleep job via the DAG submit API, then kills it.
+  @Test(timeout = 60000)
+  public void testMRRSleepJobDagSubmitAndKill() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    State finalState = testMRRSleepJobDagSubmitCore(false, true, false, false);
+
+    Assert.assertEquals(DAGStatus.State.KILLED, finalState);
+    // TODO Add additional checks for tracking URL etc. - once it's exposed by
+    // the DAG API.
+  }
+
+  // Submits a DAG to AM via RPC after AM has started
+  @Test(timeout = 60000)
+  public void testMRRSleepJobViaSession() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    State finalState = testMRRSleepJobDagSubmitCore(true, false, false, false);
+
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+  }
+
+  // Submits multiple DAGs to the AM via RPC within a single session
+  @Test(timeout = 120000)
+  public void testMultipleMRRSleepJobViaSession() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    Map<String, String> commonEnv = createCommonEnv();
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
+        .valueOf(new Random().nextInt(100000))));
+    remoteFs.mkdirs(remoteStagingDir);
+    TezConfiguration tezConf = new TezConfiguration(
+        mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+
+    Map<String, LocalResource> amLocalResources =
+        new HashMap<String, LocalResource>();
+
+    AMConfiguration amConfig = new AMConfiguration(
+        "default", commonEnv, amLocalResources,
+        tezConf, null);
+    TezSessionConfiguration tezSessionConfig =
+        new TezSessionConfiguration(amConfig, tezConf);
+    TezSession tezSession = new TezSession("testsession", tezSessionConfig);
+    tezSession.start();
+    Assert.assertEquals(TezSessionStatus.INITIALIZING,
+        tezSession.getSessionStatus());
+
+    State finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+        tezSession, false);
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    Assert.assertEquals(TezSessionStatus.READY,
+        tezSession.getSessionStatus());
+    finalState = testMRRSleepJobDagSubmitCore(true, false, false,
+        tezSession, false);
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, finalState);
+    Assert.assertEquals(TezSessionStatus.READY,
+        tezSession.getSessionStatus());
+
+    ApplicationId appId = tezSession.getApplicationId();
+    tezSession.stop();
+    Assert.assertEquals(TezSessionStatus.SHUTDOWN,
+        tezSession.getSessionStatus());
+
+    YarnClient yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(mrrTezCluster.getConfig());
+    yarnClient.start();
+
+    while (true) {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      YarnApplicationState state = appReport.getYarnApplicationState();
+      if (state.equals(YarnApplicationState.FINISHED)
+          || state.equals(YarnApplicationState.FAILED)
+          || state.equals(YarnApplicationState.KILLED)) {
+        break;
+      }
+      // Avoid hammering the RM with a tight polling loop.
+      Thread.sleep(100);
+    }
+
+    ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+    Assert.assertEquals(YarnApplicationState.FINISHED,
+        appReport.getYarnApplicationState());
+    Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+        appReport.getFinalApplicationStatus());
+  }
+
+  // Submits the 3-vertex sleep job through a tez session, then kills it.
+  @Test(timeout = 60000)
+  public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    State finalState = testMRRSleepJobDagSubmitCore(true, true, false, false);
+
+    Assert.assertEquals(DAGStatus.State.KILLED, finalState);
+    // TODO Add additional checks for tracking URL etc. - once it's exposed by
+    // the DAG API.
+  }
+
+  // Create and close a tez session without submitting a job
+  @Test(timeout = 60000)
+  public void testTezSessionShutdown() throws IOException,
+  InterruptedException, TezException, ClassNotFoundException, YarnException {
+    testMRRSleepJobDagSubmitCore(true, false, true, false);
+  }
+
+  @Test(timeout = 60000)
+  public void testAMSplitGeneration() throws IOException, InterruptedException,
+      TezException, ClassNotFoundException, YarnException {
+    testMRRSleepJobDagSubmitCore(true, false, false, true);
+  }
+
+  public State testMRRSleepJobDagSubmitCore(
+      boolean dagViaRPC,
+      boolean killDagWhileRunning,
+      boolean closeSessionBeforeSubmit,
+      boolean genSplitsInAM) throws IOException,
+      InterruptedException, TezException, ClassNotFoundException,
+      YarnException {
+    return testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning,
+        closeSessionBeforeSubmit, null, genSplitsInAM);
+  }
+
+  private Map<String, String> createCommonEnv() {
+    Map<String, String> commonEnv = new HashMap<String, String>();
+    return commonEnv;
+  }
+
+  public State testMRRSleepJobDagSubmitCore(
+      boolean dagViaRPC,
+      boolean killDagWhileRunning,
+      boolean closeSessionBeforeSubmit,
+      TezSession reUseTezSession,
+      boolean genSplitsInAM) throws IOException,
+      InterruptedException, TezException, ClassNotFoundException,
+      YarnException {
+    LOG.info("\n\n\nStarting testMRRSleepJobDagSubmit().");
+
+    JobConf stage1Conf = new JobConf(mrrTezCluster.getConfig());
+    JobConf stage2Conf = new JobConf(mrrTezCluster.getConfig());
+    JobConf stage3Conf = new JobConf(mrrTezCluster.getConfig());
+
+    stage1Conf.setLong(MRRSleepJob.MAP_SLEEP_TIME, 1);
+    stage1Conf.setInt(MRRSleepJob.MAP_SLEEP_COUNT, 1);
+    stage1Conf.setInt(MRJobConfig.NUM_MAPS, 1);
+    stage1Conf.set(MRJobConfig.MAP_CLASS_ATTR, SleepMapper.class.getName());
+    stage1Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        IntWritable.class.getName());
+    stage1Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    stage1Conf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
+        SleepInputFormat.class.getName());
+    stage1Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
+        MRRSleepJobPartitioner.class.getName());
+
+    stage2Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
+    stage2Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
+    stage2Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    stage2Conf
+        .set(MRJobConfig.REDUCE_CLASS_ATTR, ISleepReducer.class.getName());
+    stage2Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        IntWritable.class.getName());
+    stage2Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    stage2Conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR,
+        MRRSleepJobPartitioner.class.getName());
+
+    JobConf stage22Conf = new JobConf(stage2Conf);
+    stage22Conf.setInt(MRJobConfig.NUM_REDUCES, 2);
+
+    stage3Conf.setLong(MRRSleepJob.REDUCE_SLEEP_TIME, 1);
+    stage3Conf.setInt(MRRSleepJob.REDUCE_SLEEP_COUNT, 1);
+    stage3Conf.setInt(MRJobConfig.NUM_REDUCES, 1);
+    stage3Conf.set(MRJobConfig.REDUCE_CLASS_ATTR, SleepReducer.class.getName());
+    stage3Conf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+        IntWritable.class.getName());
+    stage3Conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
+        IntWritable.class.getName());
+    stage3Conf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
+        NullOutputFormat.class.getName());
+
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage1Conf, null);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage2Conf,
+        stage1Conf);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage22Conf,
+        stage1Conf);
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(stage3Conf,
+        stage2Conf); // this also covers stage22Conf, which sets up the same keys
+
+    MRHelpers.doJobClientMagic(stage1Conf);
+    MRHelpers.doJobClientMagic(stage2Conf);
+    MRHelpers.doJobClientMagic(stage22Conf);
+    MRHelpers.doJobClientMagic(stage3Conf);
+
+    Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String
+        .valueOf(new Random().nextInt(100000))));
+    TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
+    InputSplitInfo inputSplitInfo = null;
+    if (!genSplitsInAM) {
+      inputSplitInfo = MRHelpers.generateInputSplits(stage1Conf,
+        remoteStagingDir);
+    }
+
+    byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+    byte[] stage1InputPayload = MRHelpers.createMRInputPayload(stage1Payload, null);
+    byte[] stage3Payload = MRHelpers.createUserPayloadFromConf(stage3Conf);
+    
+    DAG dag = new DAG("testMRRSleepJobDagSubmit");
+    int stage1NumTasks = genSplitsInAM ? -1 : inputSplitInfo.getNumTasks();
+    Class<? extends TezRootInputInitializer> inputInitializerClazz =
+        genSplitsInAM ? MRInputAMSplitGenerator.class : null;
+    Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(
+        MapProcessor.class.getName()).setUserPayload(stage1Payload),
+        stage1NumTasks, Resource.newInstance(256, 1));
+    MRHelpers.addMRInput(stage1Vertex, stage1InputPayload, inputInitializerClazz);
+    Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(
+        ReduceProcessor.class.getName()).setUserPayload(
+        MRHelpers.createUserPayloadFromConf(stage2Conf)),
+        1, Resource.newInstance(256, 1));
+    Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(
+        ReduceProcessor.class.getName()).setUserPayload(stage3Payload),
+        1, Resource.newInstance(256, 1));
+    MRHelpers.addMROutput(stage3Vertex, stage3Payload);
+
+    Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
+    Map<String, String> commonEnv = createCommonEnv();
+
+    if (!genSplitsInAM) {
+      // TODO Use utility method post TEZ-205.
+      Map<String, LocalResource> stage1LocalResources = new HashMap<String, LocalResource>();
+      stage1LocalResources.put(
+          inputSplitInfo.getSplitsFile().getName(),
+          createLocalResource(remoteFs, inputSplitInfo.getSplitsFile(),
+              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+      stage1LocalResources.put(
+          inputSplitInfo.getSplitsMetaInfoFile().getName(),
+          createLocalResource(remoteFs, inputSplitInfo.getSplitsMetaInfoFile(),
+              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+      stage1LocalResources.putAll(commonLocalResources);
+
+      stage1Vertex.setTaskLocalResources(stage1LocalResources);
+      stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
+    } else {
+      stage1Vertex.setTaskLocalResources(commonLocalResources);
+    }
+
+    stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(stage1Conf));
+    stage1Vertex.setTaskEnvironment(commonEnv);
+
+    // TODO env, resources
+
+    stage2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage2Conf));
+    stage2Vertex.setTaskLocalResources(commonLocalResources);
+    stage2Vertex.setTaskEnvironment(commonEnv);
+
+    stage3Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(stage3Conf));
+    stage3Vertex.setTaskLocalResources(commonLocalResources);
+    stage3Vertex.setTaskEnvironment(commonEnv);
+
+    dag.addVertex(stage1Vertex);
+    dag.addVertex(stage2Vertex);
+    dag.addVertex(stage3Vertex);
+
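+    // Connect map -> ireduce -> reduce with scatter-gather (shuffle) edges.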
+    Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
+        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL, new OutputDescriptor(
+        OnFileSortedOutput.class.getName()), new InputDescriptor(
+                ShuffledMergedInputLegacy.class.getName())));
+    Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
+        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL, new OutputDescriptor(
+        OnFileSortedOutput.class.getName()), new InputDescriptor(
+                ShuffledMergedInputLegacy.class.getName())));
+
+    dag.addEdge(edge1);
+    dag.addEdge(edge2);
+
+    Map<String, LocalResource> amLocalResources =
+        new HashMap<String, LocalResource>();
+    amLocalResources.putAll(commonLocalResources);
+
+    TezConfiguration tezConf = new TezConfiguration(
+            mrrTezCluster.getConfig());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
+        remoteStagingDir.toString());
+
+    TezClient tezClient = new TezClient(tezConf);
+    DAGClient dagClient = null;
+    TezSession tezSession = null;
+    boolean reuseSession = reUseTezSession != null;
+    TezSessionConfiguration tezSessionConfig;
+    AMConfiguration amConfig = new AMConfiguration(
+        "default", commonEnv, amLocalResources,
+        tezConf, null);
+    if(!dagViaRPC) {
+      // TODO Use utility method post TEZ-205 to figure out AM arguments etc.
+      dagClient = tezClient.submitDAGApplication(dag, amConfig);
+    } else {
+      if (reuseSession) {
+        tezSession = reUseTezSession;
+      } else {
+        tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
+        tezSession = new TezSession("testsession", tezSessionConfig);
+        tezSession.start();
+      }
+    }
+
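+    // Wait for the session AM to start, shut the session down before any DAG
+    // is submitted, and verify that the application finishes cleanly.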
+    if (dagViaRPC && closeSessionBeforeSubmit) {
+      YarnClient yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(mrrTezCluster.getConfig());
+      yarnClient.start();
+      boolean sentKillSession = false;
+      while(true) {
+        Thread.sleep(500L);
+        ApplicationReport appReport =
+            yarnClient.getApplicationReport(tezSession.getApplicationId());
+        if (appReport == null) {
+          continue;
+        }
+        YarnApplicationState appState = appReport.getYarnApplicationState();
+        if (!sentKillSession) {
+          if (appState == YarnApplicationState.RUNNING) {
+            tezSession.stop();
+            sentKillSession = true;
+          }
+        } else {
+          if (appState == YarnApplicationState.FINISHED
+              || appState == YarnApplicationState.KILLED
+              || appState == YarnApplicationState.FAILED) {
+            LOG.info("Application completed after sending session shutdown"
+                + ", yarnApplicationState=" + appState
+                + ", finalAppStatus=" + appReport.getFinalApplicationStatus());
+            Assert.assertEquals(YarnApplicationState.FINISHED,
+                appState);
+            Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
+                appReport.getFinalApplicationStatus());
+            break;
+          }
+        }
+      }
+      yarnClient.stop();
+      return null;
+    }
+
+    if(dagViaRPC) {
+      LOG.info("Submitting dag to tez session with appId="
+          + tezSession.getApplicationId());
+      dagClient = tezSession.submitDAG(dag);
+      Assert.assertEquals(TezSessionStatus.RUNNING,
+          tezSession.getSessionStatus());
+    }
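+    // Poll until the DAG completes, optionally killing the DAG or the session
+    // while it is still running.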
+    DAGStatus dagStatus = dagClient.getDAGStatus();
+    while (!dagStatus.isCompleted()) {
+      LOG.info("Waiting for job to complete. Sleeping for 500ms."
+          + " Current state: " + dagStatus.getState());
+      Thread.sleep(500L);
+      if(killDagWhileRunning
+          && dagStatus.getState() == DAGStatus.State.RUNNING) {
+        LOG.info("Killing running dag/session");
+        if (dagViaRPC) {
+          tezSession.stop();
+        } else {
+          dagClient.tryKillDAG();
+        }
+      }
+      dagStatus = dagClient.getDAGStatus();
+    }
+    if (dagViaRPC && !reuseSession) {
+      tezSession.stop();
+    }
+    return dagStatus.getState();
+  }
+
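+  // Builds a YARN LocalResource descriptor (URL, size, timestamp) for a file
+  // that already exists on the given FileSystem.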
+  private static LocalResource createLocalResource(FileSystem fc, Path file,
+      LocalResourceType type, LocalResourceVisibility visibility)
+      throws IOException {
+    FileStatus fstat = fc.getFileStatus(file);
+    URL resourceURL = ConverterUtils.getYarnUrlFromPath(fc.resolvePath(fstat
+        .getPath()));
+    long resourceSize = fstat.getLen();
+    long resourceModificationTime = fstat.getModificationTime();
+
+    return LocalResource.newInstance(resourceURL, type, visibility,
+        resourceSize, resourceModificationTime);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
new file mode 100644
index 0000000..8926ed8
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
@@ -0,0 +1,172 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Configures and starts the Tez-specific components in the YARN cluster.
+ *
+ * When using this mini cluster, the user is expected to submit work against
+ * the configuration returned by {@link #getConfig()}, which is also written
+ * out to the file returned by {@link #getConfigFilePath()}.
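+ *
+ * A minimal usage sketch (DAG construction, assertions and error handling
+ * elided; the tests in this change show the complete flow):
+ * <pre>
+ *   MiniTezCluster cluster = new MiniTezCluster("TestName");
+ *   cluster.init(new Configuration());
+ *   cluster.start();
+ *   TezConfiguration tezConf = new TezConfiguration(cluster.getConfig());
+ *   // ... build a DAG and submit it via TezClient/TezSession using tezConf ...
+ *   cluster.stop();
+ * </pre>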
+ */
+public class MiniTezCluster extends MiniYARNCluster {
+
+  public static final String APPJAR = JarFinder.getJar(DAGAppMaster.class);
+
+  private static final Log LOG = LogFactory.getLog(MiniTezCluster.class);
+
+  private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml";
+
+  private Path confFilePath;
+
+  public MiniTezCluster(String testName) {
+    this(testName, 1);
+  }
+
+  public MiniTezCluster(String testName, int noOfNMs) {
+    super(testName, noOfNMs, 4, 4);
+  }
+
+  public MiniTezCluster(String testName, int noOfNMs,
+      int numLocalDirs, int numLogDirs)  {
+    super(testName, noOfNMs, numLocalDirs, numLogDirs);
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
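+    // Route MR jobs submitted against this cluster through the Tez framework.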
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
+    if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+          "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());
+    }
+
+    if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) {
+      // Not explicitly configured; delete container resources immediately
+      // after completion so tests clean up quickly.
+      conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0L);
+    }
+
+    File appJarFile = new File(MiniTezCluster.APPJAR);
+
+    if (!appJarFile.exists()) {
+      String message = "TezAppJar " + MiniTezCluster.APPJAR
+          + " not found. Exiting.";
+      LOG.info(message);
+      throw new TezUncheckedException(message);
+    } else {
+      conf.set(TezConfiguration.TEZ_LIB_URIS, "file://" + appJarFile.getAbsolutePath());
+      LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS));
+    }
+
+    // Disable both physical and virtual memory monitoring for containers.
+    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
+
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");
+
+    try {
+      Path stagingPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
+      FileContext fc = FileContext.getFileContext(stagingPath.toUri(), conf);
+      if (fc.util().exists(stagingPath)) {
+        LOG.info(stagingPath + " exists! deleting...");
+        fc.delete(stagingPath, true);
+      }
+      LOG.info("mkdir: " + stagingPath);
+      fc.mkdir(stagingPath, null, true);
+
+      // Create the job history "done" directory as well.
+      String doneDir =
+          JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+      Path doneDirPath = fc.makeQualified(new Path(doneDir));
+      fc.mkdir(doneDirPath, null, true);
+    } catch (IOException e) {
+      throw new TezUncheckedException(
+          "Could not create staging/history directories.", e);
+    }
+    conf.set(MRConfig.MASTER_ADDRESS, "test");
+
+    // Configure the shuffle auxiliary service in the NodeManagers.
+    conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+        new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
+    conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+        ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
+        Service.class);
+
+    // Port 0 => pick a free ephemeral shuffle port so concurrent tests do not clash.
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+
+    conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
+        DefaultContainerExecutor.class, ContainerExecutor.class);
+
+    // Uber mode is a MapReduce-only concept; disable it explicitly here.
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    super.serviceStart();
+    File workDir = super.getTestWorkDir();
+    Configuration conf = super.getConfig();
+
+    confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG);
+    File confFile = new File(confFilePath.toString());
+    try {
+      confFile.createNewFile();
+      FileOutputStream confStream = new FileOutputStream(confFile);
+      try {
+        conf.writeXml(confStream);
+      } finally {
+        confStream.close();
+      }
+      confFile.deleteOnExit();
+    } catch (IOException e) {
+      throw new RuntimeException("Could not write cluster configuration to "
+          + confFile.getAbsolutePath(), e);
+    }
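+    // Put the work dir (which now holds yarn-site.xml) and the test classpath
+    // on the YARN application classpath so launched containers see this config.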
+    conf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        workDir.getAbsolutePath(), System.getProperty("java.class.path"));
+    LOG.info("Setting yarn-site.xml via YARN-APP-CP at: "
+        + conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH));
+  }
+
+  public Path getConfigFilePath() {
+    return confFilePath;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/360d30a9/tez-yarn-client/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-yarn-client/findbugs-exclude.xml b/tez-yarn-client/findbugs-exclude.xml
deleted file mode 100644
index 5b11308..0000000
--- a/tez-yarn-client/findbugs-exclude.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-<FindBugsFilter>
-
-</FindBugsFilter>